Browse Source

remove flow and use rx

...to stay in line with currently used concepts

Signed-off-by: Andy Scherzinger <info@andy-scherzinger.de>
Andy Scherzinger 2 years ago
parent
commit
565903b956

+ 1 - 0
app/build.gradle

@@ -245,6 +245,7 @@ dependencies {
     kapt 'io.requery:requery-processor:1.6.1'
 
     implementation "androidx.room:room-runtime:${roomVersion}"
+    implementation "androidx.room:room-rxjava2:${roomVersion}"
     kapt "androidx.room:room-compiler:${roomVersion}" // For Kotlin use kapt instead of annotationProcessor
     implementation "androidx.room:room-ktx:${roomVersion}"
 

+ 94 - 80
app/src/main/java/com/nextcloud/talk/activities/MainActivity.kt

@@ -46,10 +46,11 @@ import com.nextcloud.talk.controllers.ServerSelectionController
 import com.nextcloud.talk.controllers.SettingsController
 import com.nextcloud.talk.controllers.WebViewLoginController
 import com.nextcloud.talk.controllers.base.providers.ActionBarProvider
-import com.nextcloud.talk.data.user.UsersRepository
+import com.nextcloud.talk.data.user.model.UserNgEntity
 import com.nextcloud.talk.databinding.ActivityMainBinding
 import com.nextcloud.talk.models.database.UserEntity
 import com.nextcloud.talk.models.json.conversations.RoomOverall
+import com.nextcloud.talk.users.UserManager
 import com.nextcloud.talk.utils.ApiUtils
 import com.nextcloud.talk.utils.ConductorRemapping.remapChatController
 import com.nextcloud.talk.utils.SecurityUtils
@@ -65,8 +66,6 @@ import io.reactivex.schedulers.Schedulers
 import io.requery.Persistable
 import io.requery.android.sqlcipher.SqlCipherDatabaseSource
 import io.requery.reactivex.ReactiveEntityStore
-import kotlinx.coroutines.GlobalScope
-import kotlinx.coroutines.launch
 import org.parceler.Parcels
 import javax.inject.Inject
 
@@ -84,7 +83,7 @@ class MainActivity : BaseActivity(), ActionBarProvider {
     lateinit var ncApi: NcApi
 
     @Inject
-    lateinit var usersRepository: UsersRepository
+    lateinit var userManager: UserManager
 
     private var router: Router? = null
 
@@ -119,9 +118,14 @@ class MainActivity : BaseActivity(), ActionBarProvider {
                 if (!appPreferences.isDbRoomMigrated) {
                     appPreferences.isDbRoomMigrated = true
                 }
-                GlobalScope.launch {
-                    usersRepository.getUsers().collect {
-                        if (it.isNotEmpty()) {
+
+                userManager.users.subscribe(object : Observer<List<UserNgEntity>> {
+                    override fun onSubscribe(d: Disposable) {
+                        // unused atm
+                    }
+
+                    override fun onNext(users: List<UserNgEntity>) {
+                        if (users.isNotEmpty()) {
                             runOnUiThread {
                                 setDefaultRootController()
                             }
@@ -131,7 +135,15 @@ class MainActivity : BaseActivity(), ActionBarProvider {
                             }
                         }
                     }
-                }
+
+                    override fun onError(e: Throwable) {
+                        // unused atm
+                    }
+
+                    override fun onComplete() {
+                        // unused atm
+                    }
+                })
             } else {
                 launchLoginScreen()
             }
@@ -191,15 +203,27 @@ class MainActivity : BaseActivity(), ActionBarProvider {
     }
 
     fun resetConversationsList() {
-        GlobalScope.launch {
-            usersRepository.getUsers().collect {
-                if (it.isNotEmpty()) {
+        userManager.users.subscribe(object : Observer<List<UserNgEntity>> {
+            override fun onSubscribe(d: Disposable) {
+                // unused atm
+            }
+
+            override fun onNext(users: List<UserNgEntity>) {
+                if (users.isNotEmpty()) {
                     runOnUiThread {
                         setDefaultRootController()
                     }
                 }
             }
-        }
+
+            override fun onError(e: Throwable) {
+                // unused atm
+            }
+
+            override fun onComplete() {
+                // unused atm
+            }
+        })
     }
 
     fun openSettings() {
@@ -237,22 +261,15 @@ class MainActivity : BaseActivity(), ActionBarProvider {
                 "vnd.android.cursor.item/vnd.com.nextcloud.talk2.chat" -> {
                     val user = userId.substringBeforeLast("@")
                     val baseUrl = userId.substringAfterLast("@")
-                    GlobalScope.launch {
-                        usersRepository.getActiveUser().collect {
-                            if (it?.baseUrl?.endsWith(baseUrl) == true) {
-                                runOnUiThread {
-                                    startConversation(user)
-                                }
-                            } else {
-                                runOnUiThread {
-                                    Snackbar.make(
-                                        binding.controllerContainer,
-                                        R.string.nc_phone_book_integration_account_not_found,
-                                        Snackbar.LENGTH_LONG
-                                    ).show()
-                                }
-                            }
-                        }
+
+                    if (userManager.currentUser?.baseUrl?.endsWith(baseUrl) == true) {
+                        startConversation(user)
+                    } else {
+                        Snackbar.make(
+                            binding.controllerContainer,
+                            R.string.nc_phone_book_integration_account_not_found,
+                            Snackbar.LENGTH_LONG
+                        ).show()
                     }
                 }
             }
@@ -262,18 +279,40 @@ class MainActivity : BaseActivity(), ActionBarProvider {
     private fun startConversation(userId: String) {
         val roomType = "1"
 
-        GlobalScope.launch {
-            usersRepository.getActiveUser().collect { currentUser ->
-                if (currentUser != null) {
-                    val apiVersion = ApiUtils.getConversationApiVersion(currentUser, intArrayOf(ApiUtils.APIv4, 1))
-                    val credentials = ApiUtils.getCredentials(currentUser.username, currentUser.token)
-                    val retrofitBucket = ApiUtils.getRetrofitBucketForCreateRoom(
-                        apiVersion, currentUser.baseUrl, roomType,
-                        null, userId, null
-                    )
-                    ncApi.createRoom(
+        val currentUser = userManager.currentUser
+
+        val apiVersion = ApiUtils.getConversationApiVersion(currentUser, intArrayOf(ApiUtils.APIv4, 1))
+        val credentials = ApiUtils.getCredentials(currentUser?.username, currentUser?.token)
+        val retrofitBucket = ApiUtils.getRetrofitBucketForCreateRoom(
+            apiVersion, currentUser?.baseUrl, roomType,
+            null, userId, null
+        )
+
+        ncApi.createRoom(
+            credentials,
+            retrofitBucket.url, retrofitBucket.queryMap
+        )
+            .subscribeOn(Schedulers.io())
+            .observeOn(AndroidSchedulers.mainThread())
+            .subscribe(object : Observer<RoomOverall> {
+                override fun onSubscribe(d: Disposable) {
+                    // unused atm
+                }
+
+                override fun onNext(roomOverall: RoomOverall) {
+                    val bundle = Bundle()
+                    bundle.putParcelable(KEY_USER_ENTITY, currentUser)
+                    bundle.putString(KEY_ROOM_TOKEN, roomOverall.ocs!!.data!!.token)
+                    bundle.putString(KEY_ROOM_ID, roomOverall.ocs!!.data!!.roomId)
+
+                    // FIXME once APIv2 or later is used only, the createRoom already returns all the data
+                    ncApi.getRoom(
                         credentials,
-                        retrofitBucket.url, retrofitBucket.queryMap
+                        ApiUtils.getUrlForRoom(
+                            apiVersion,
+                            currentUser?.baseUrl,
+                            roomOverall.ocs!!.data!!.token
+                        )
                     )
                         .subscribeOn(Schedulers.io())
                         .observeOn(AndroidSchedulers.mainThread())
@@ -283,46 +322,14 @@ class MainActivity : BaseActivity(), ActionBarProvider {
                             }
 
                             override fun onNext(roomOverall: RoomOverall) {
-                                val bundle = Bundle()
-                                bundle.putParcelable(KEY_USER_ENTITY, currentUser)
-                                bundle.putString(KEY_ROOM_TOKEN, roomOverall.ocs!!.data!!.token)
-                                bundle.putString(KEY_ROOM_ID, roomOverall.ocs!!.data!!.roomId)
-
-                                // FIXME once APIv2 or later is used only, the createRoom already returns all the data
-                                ncApi.getRoom(
-                                    credentials,
-                                    ApiUtils.getUrlForRoom(
-                                        apiVersion,
-                                        currentUser.baseUrl,
-                                        roomOverall.ocs!!.data!!.token
-                                    )
+                                bundle.putParcelable(
+                                    KEY_ACTIVE_CONVERSATION,
+                                    Parcels.wrap(roomOverall.ocs!!.data)
+                                )
+                                remapChatController(
+                                    router!!, currentUser!!.id,
+                                    roomOverall.ocs!!.data!!.token!!, bundle, true
                                 )
-                                    .subscribeOn(Schedulers.io())
-                                    .observeOn(AndroidSchedulers.mainThread())
-                                    .subscribe(object : Observer<RoomOverall> {
-                                        override fun onSubscribe(d: Disposable) {
-                                            // unused atm
-                                        }
-
-                                        override fun onNext(roomOverall: RoomOverall) {
-                                            bundle.putParcelable(
-                                                KEY_ACTIVE_CONVERSATION,
-                                                Parcels.wrap(roomOverall.ocs!!.data)
-                                            )
-                                            remapChatController(
-                                                router!!, currentUser.id,
-                                                roomOverall.ocs!!.data!!.token!!, bundle, true
-                                            )
-                                        }
-
-                                        override fun onError(e: Throwable) {
-                                            // unused atm
-                                        }
-
-                                        override fun onComplete() {
-                                            // unused atm
-                                        }
-                                    })
                             }
 
                             override fun onError(e: Throwable) {
@@ -334,8 +341,15 @@ class MainActivity : BaseActivity(), ActionBarProvider {
                             }
                         })
                 }
-            }
-        }
+
+                override fun onError(e: Throwable) {
+                    // unused atm
+                }
+
+                override fun onComplete() {
+                    // unused atm
+                }
+            })
     }
 
     @RequiresApi(api = Build.VERSION_CODES.M)

+ 13 - 30
app/src/main/java/com/nextcloud/talk/controllers/SettingsController.kt

@@ -69,7 +69,6 @@ import com.nextcloud.talk.application.NextcloudTalkApplication.Companion.setAppT
 import com.nextcloud.talk.application.NextcloudTalkApplication.Companion.sharedApplication
 import com.nextcloud.talk.controllers.base.NewBaseController
 import com.nextcloud.talk.controllers.util.viewBinding
-import com.nextcloud.talk.data.user.UsersRepository
 import com.nextcloud.talk.data.user.model.UserNgEntity
 import com.nextcloud.talk.databinding.ControllerSettingsBinding
 import com.nextcloud.talk.jobs.AccountRemovalWorker
@@ -99,10 +98,6 @@ import io.reactivex.Observer
 import io.reactivex.android.schedulers.AndroidSchedulers
 import io.reactivex.disposables.Disposable
 import io.reactivex.schedulers.Schedulers
-import kotlinx.coroutines.MainScope
-import kotlinx.coroutines.cancel
-import kotlinx.coroutines.flow.first
-import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
 import net.orange_box.storebox.listeners.OnPreferenceValueChangedListener
 import okhttp3.MediaType.Companion.toMediaTypeOrNull
@@ -123,9 +118,6 @@ class SettingsController : NewBaseController(R.layout.controller_settings) {
     @Inject
     lateinit var userUtils: UserUtils
 
-    @Inject
-    lateinit var userRepository: UsersRepository
-
     @Inject
     lateinit var currentUserProvider: CurrentUserProviderNew
 
@@ -143,18 +135,13 @@ class SettingsController : NewBaseController(R.layout.controller_settings) {
     private var profileQueryDisposable: Disposable? = null
     private var dbQueryDisposable: Disposable? = null
 
-    val scope = MainScope()
-
     override val title: String
         get() =
             resources!!.getString(R.string.nc_settings)
 
-    private suspend fun getCurrentUser() {
-        val user = currentUserProvider.currentUser.first()
-        Log.e(TAG, "User: $user")
-        currentUser = user
-        credentials =
-            ApiUtils.getCredentials(currentUser!!.username, currentUser!!.token)
+    private fun getCurrentUser() {
+        currentUser = currentUserProvider.currentUser
+        credentials = ApiUtils.getCredentials(currentUser!!.username, currentUser!!.token)
     }
 
     override fun onViewBound(view: View) {
@@ -203,20 +190,18 @@ class SettingsController : NewBaseController(R.layout.controller_settings) {
 
             setupClientCertView()
         }
+
+        Log.i(TAG, "Current user: " + currentUser?.displayName)
     }
 
     private fun setupPhoneBookIntegration() {
-        scope.launch {
-            userRepository.getActiveUser().collect {
-                if (CapabilitiesNgUtil.isPhoneBookIntegrationAvailable(it)) {
-                    activity!!.runOnUiThread {
-                        binding.settingsPhoneBookIntegration.visibility = View.VISIBLE
-                    }
-                } else {
-                    activity!!.runOnUiThread {
-                        binding.settingsPhoneBookIntegration.visibility = View.GONE
-                    }
-                }
+        if (CapabilitiesNgUtil.isPhoneBookIntegrationAvailable(currentUser)) {
+            activity!!.runOnUiThread {
+                binding.settingsPhoneBookIntegration.visibility = View.VISIBLE
+            }
+        } else {
+            activity!!.runOnUiThread {
+                binding.settingsPhoneBookIntegration.visibility = View.GONE
             }
         }
     }
@@ -327,7 +312,7 @@ class SettingsController : NewBaseController(R.layout.controller_settings) {
         var port = -1
         val uri: URI
         try {
-            uri = URI(this@SettingsController.currentUser!!.baseUrl)
+            uri = URI(currentUser!!.baseUrl)
             host = uri.host
             port = uri.port
         } catch (e: URISyntaxException) {
@@ -759,8 +744,6 @@ class SettingsController : NewBaseController(R.layout.controller_settings) {
         appPreferences?.unregisterReadPrivacyChangeListener(readPrivacyChangeListener)
         appPreferences?.unregisterPhoneBookIntegrationChangeListener(phoneBookIntegrationChangeListener)
 
-        scope.cancel()
-
         super.onDestroy()
     }
 

+ 106 - 50
app/src/main/java/com/nextcloud/talk/data/user/UsersDao.kt

@@ -22,6 +22,7 @@
 
 package com.nextcloud.talk.data.user
 
+import android.util.Log
 import androidx.room.Dao
 import androidx.room.Insert
 import androidx.room.OnConflictStrategy
@@ -29,8 +30,9 @@ import androidx.room.Query
 import androidx.room.Transaction
 import androidx.room.Update
 import com.nextcloud.talk.data.user.model.UserNgEntity
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.flowOf
+import io.reactivex.Observable
+import io.reactivex.Observer
+import io.reactivex.disposables.Disposable
 import java.lang.Boolean.FALSE
 import java.lang.Boolean.TRUE
 
@@ -39,107 +41,161 @@ import java.lang.Boolean.TRUE
 abstract class UsersDao {
     // get active user
     @Query("SELECT * FROM User where current = 1")
-    abstract fun getActiveUser(): Flow<UserNgEntity?>
+    abstract fun getActiveUser(): Observable<UserNgEntity?>
+
+    @Query("SELECT * FROM User where current = 1")
+    abstract fun getActiveUserSynchronously(): UserNgEntity?
 
     @Query("SELECT * FROM User WHERE current = 1")
-    abstract fun getActiveUserLiveData(): Flow<UserNgEntity?>
+    abstract fun getActiveUserLiveData(): Observable<UserNgEntity?>
 
     @Query("SELECT * FROM User ORDER BY current DESC")
-    abstract fun getUsersLiveData(): Flow<List<UserNgEntity>>
+    abstract fun getUsersLiveData(): Observable<List<UserNgEntity>>
 
     @Query("SELECT * FROM User WHERE current != 1 ORDER BY current DESC")
-    abstract fun getUsersLiveDataWithoutActive(): Flow<List<UserNgEntity>>
+    abstract fun getUsersLiveDataWithoutActive(): Observable<List<UserNgEntity>>
 
     @Query("DELETE FROM User WHERE id = :id")
-    abstract suspend fun deleteUserWithId(id: Long)
+    abstract fun deleteUserWithId(id: Long)
 
     @Update
-    abstract suspend fun updateUser(user: UserNgEntity): Int
+    abstract fun updateUser(user: UserNgEntity): Int
 
     @Insert(onConflict = OnConflictStrategy.REPLACE)
-    abstract suspend fun saveUser(user: UserNgEntity): Long
+    abstract fun saveUser(user: UserNgEntity): Long
 
     @Insert(onConflict = OnConflictStrategy.REPLACE)
-    abstract suspend fun saveUsers(vararg users: UserNgEntity): List<Long>
+    abstract fun saveUsers(vararg users: UserNgEntity): List<Long>
 
     // get all users not scheduled for deletion
     @Query("SELECT * FROM User where current != 0")
-    abstract fun getUsers(): Flow<List<UserNgEntity>>
+    abstract fun getUsers(): Observable<List<UserNgEntity>>
 
     @Query("SELECT * FROM User where id = :id")
-    abstract fun getUserWithId(id: Long): Flow<UserNgEntity?>
+    abstract fun getUserWithId(id: Long): Observable<UserNgEntity?>
 
     @Query("SELECT * FROM User where id = :id")
-    abstract fun getUserWithIdLiveData(id: Long): Flow<UserNgEntity?>
+    abstract fun getUserWithIdLiveData(id: Long): Observable<UserNgEntity?>
 
     @Query("SELECT * FROM User where id = :id AND scheduledForDeletion != 1")
-    abstract fun getUserWithIdNotScheduledForDeletion(id: Long): Flow<UserNgEntity?>
+    abstract fun getUserWithIdNotScheduledForDeletion(id: Long): Observable<UserNgEntity?>
 
     @Query("SELECT * FROM User where userId = :userId")
-    abstract fun getUserWithUserId(userId: String): Flow<UserNgEntity?>
+    abstract fun getUserWithUserId(userId: String): Observable<UserNgEntity?>
 
     @Query("SELECT * FROM User where userId != :userId")
-    abstract fun getUsersWithoutUserId(userId: Long): Flow<List<UserNgEntity>>
+    abstract fun getUsersWithoutUserId(userId: Long): Observable<List<UserNgEntity>>
 
     @Query("SELECT * FROM User where current = 0")
-    abstract fun getUsersScheduledForDeletion(): Flow<List<UserNgEntity>>
+    abstract fun getUsersScheduledForDeletion(): Observable<List<UserNgEntity>>
 
     @Query("SELECT * FROM User where scheduledForDeletion = 0")
-    abstract fun getUsersNotScheduledForDeletion(): Flow<List<UserNgEntity>>
+    abstract fun getUsersNotScheduledForDeletion(): Observable<List<UserNgEntity>>
 
     @Query("SELECT * FROM User WHERE username = :username AND baseUrl = :server")
-    abstract fun getUserWithUsernameAndServer(username: String, server: String): Flow<UserNgEntity?>
+    abstract fun getUserWithUsernameAndServer(username: String, server: String): Observable<UserNgEntity?>
 
     @Transaction
-    open suspend fun setUserAsActiveWithId(id: Long): Flow<Boolean> {
+    open suspend fun setUserAsActiveWithId(id: Long): Boolean {
         val users = getUsers()
-        users.collect {
-            for (user in it) {
-                // removed from clause: && UserStatus.ACTIVE == user.status
-                if (user.id != id) {
-                    user.current = TRUE
-                    updateUser(user)
-                } // removed from clause: && UserStatus.ACTIVE != user.status
-                else if (user.id == id) {
-                    user.current = TRUE
-                    updateUser(user)
+        var result = TRUE
+
+        users.subscribe(object : Observer<List<UserNgEntity>> {
+            override fun onSubscribe(d: Disposable) {
+                // unused atm
+            }
+
+            override fun onNext(users: List<UserNgEntity>) {
+                for (user in users) {
+                    // removed from clause: && UserStatus.ACTIVE == user.status
+                    if (user.id != id) {
+                        user.current = TRUE
+                        updateUser(user)
+                    } // removed from clause: && UserStatus.ACTIVE != user.status
+                    else if (user.id == id) {
+                        user.current = TRUE
+                        updateUser(user)
+                    }
                 }
             }
-        }
 
-        return flowOf(TRUE)
+            override fun onError(e: Throwable) {
+                Log.e(TAG, "Error setting user active", e)
+                result = FALSE
+            }
+
+            override fun onComplete() {
+                // unused atm
+            }
+        })
+
+        return result
     }
 
     @Transaction
-    open suspend fun markUserForDeletion(id: Long): Flow<Boolean> {
+    open suspend fun markUserForDeletion(id: Long): Boolean {
         val users = getUsers()
-        users.collect {
-            for (user in it) {
-                if (user.id == id) {
-                    // TODO currently we only have a boolean, no intermediate states
-                    user.current = FALSE
-                    updateUser(user)
-                    break
+
+        users.subscribe(object : Observer<List<UserNgEntity>> {
+            override fun onSubscribe(d: Disposable) {
+                // unused atm
+            }
+
+            override fun onNext(users: List<UserNgEntity>) {
+                for (user in users) {
+                    if (user.id == id) {
+                        // TODO currently we only have a boolean, no intermediate states
+                        user.current = FALSE
+                        updateUser(user)
+                        break
+                    }
                 }
             }
-        }
+
+            override fun onError(e: Throwable) {
+                // unused atm
+            }
+
+            override fun onComplete() {
+                // unused atm
+            }
+        })
 
         return setAnyUserAsActive()
     }
 
     @Transaction
-    open suspend fun setAnyUserAsActive(): Flow<Boolean> {
+    open suspend fun setAnyUserAsActive(): Boolean {
         val users = getUsers()
         var result = FALSE
-        users.collect {
-            for (user in it) {
-                user.current = TRUE
-                updateUser(user)
-                result = true
-                break
+
+        users.subscribe(object : Observer<List<UserNgEntity>> {
+            override fun onSubscribe(d: Disposable) {
+                // unused atm
             }
-        }
 
-        return flowOf(result)
+            override fun onNext(users: List<UserNgEntity>) {
+                for (user in users) {
+                    user.current = TRUE
+                    updateUser(user)
+                    result = TRUE
+                    break
+                }
+            }
+
+            override fun onError(e: Throwable) {
+                // unused atm
+            }
+
+            override fun onComplete() {
+                // unused atm
+            }
+        })
+
+        return result
+    }
+
+    companion object {
+        const val TAG = "UsersDao"
     }
 }

+ 21 - 20
app/src/main/java/com/nextcloud/talk/data/user/UsersRepository.kt

@@ -23,27 +23,28 @@
 package com.nextcloud.talk.data.user
 
 import com.nextcloud.talk.data.user.model.UserNgEntity
-import kotlinx.coroutines.flow.Flow
+import io.reactivex.Observable
 
 @Suppress("TooManyFunctions")
 interface UsersRepository {
-    fun getActiveUserLiveData(): Flow<UserNgEntity?>
-    fun getActiveUser(): Flow<UserNgEntity?>
-    fun getUsers(): Flow<List<UserNgEntity>>
-    fun getUserWithId(id: Long): Flow<UserNgEntity?>
-    fun getUserWithIdLiveData(id: Long): Flow<UserNgEntity?>
-    fun getUserWithIdNotScheduledForDeletion(id: Long): Flow<UserNgEntity?>
-    fun getUserWithUserId(userId: String): Flow<UserNgEntity?>
-    fun getUsersWithoutUserId(userId: Long): Flow<List<UserNgEntity>>
-    fun getUsersLiveData(): Flow<List<UserNgEntity>>
-    fun getUsersLiveDataWithoutActive(): Flow<List<UserNgEntity>>
-    fun getUsersScheduledForDeletion(): Flow<List<UserNgEntity>>
-    fun getUsersNotScheduledForDeletion(): Flow<List<UserNgEntity>>
-    fun getUserWithUsernameAndServer(username: String, server: String): Flow<UserNgEntity?>
-    suspend fun updateUser(user: UserNgEntity): Int
-    suspend fun insertUser(user: UserNgEntity): Long
-    suspend fun setUserAsActiveWithId(id: Long): Flow<Boolean>
-    suspend fun deleteUserWithId(id: Long)
-    suspend fun setAnyUserAsActive(): Flow<Boolean>
-    suspend fun markUserForDeletion(id: Long): Flow<Boolean>
+    fun getActiveUserLiveData(): Observable<UserNgEntity?>
+    fun getActiveUser(): Observable<UserNgEntity?>
+    fun getActiveUserSynchronously(): UserNgEntity?
+    fun getUsers(): Observable<List<UserNgEntity>>
+    fun getUserWithId(id: Long): Observable<UserNgEntity?>
+    fun getUserWithIdLiveData(id: Long): Observable<UserNgEntity?>
+    fun getUserWithIdNotScheduledForDeletion(id: Long): Observable<UserNgEntity?>
+    fun getUserWithUserId(userId: String): Observable<UserNgEntity?>
+    fun getUsersWithoutUserId(userId: Long): Observable<List<UserNgEntity>>
+    fun getUsersLiveData(): Observable<List<UserNgEntity>>
+    fun getUsersLiveDataWithoutActive(): Observable<List<UserNgEntity>>
+    fun getUsersScheduledForDeletion(): Observable<List<UserNgEntity>>
+    fun getUsersNotScheduledForDeletion(): Observable<List<UserNgEntity>>
+    fun getUserWithUsernameAndServer(username: String, server: String): Observable<UserNgEntity?>
+    fun updateUser(user: UserNgEntity): Int
+    fun insertUser(user: UserNgEntity): Long
+    suspend fun setUserAsActiveWithId(id: Long): Boolean
+    fun deleteUserWithId(id: Long)
+    suspend fun setAnyUserAsActive(): Boolean
+    suspend fun markUserForDeletion(id: Long): Boolean
 }

+ 25 - 22
app/src/main/java/com/nextcloud/talk/data/user/UsersRepositoryImpl.kt

@@ -23,84 +23,87 @@
 package com.nextcloud.talk.data.user
 
 import com.nextcloud.talk.data.user.model.UserNgEntity
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.distinctUntilChanged
+import io.reactivex.Observable
 
 @Suppress("TooManyFunctions")
 class UsersRepositoryImpl(private val usersDao: UsersDao) : UsersRepository {
-    override fun getActiveUserLiveData(): Flow<UserNgEntity?> {
-        return usersDao.getActiveUserLiveData().distinctUntilChanged()
+    override fun getActiveUserLiveData(): Observable<UserNgEntity?> {
+        return usersDao.getActiveUserLiveData()
     }
 
-    override fun getActiveUser(): Flow<UserNgEntity?> {
+    override fun getActiveUser(): Observable<UserNgEntity?> {
         return usersDao.getActiveUser()
     }
 
-    override fun getUsers(): Flow<List<UserNgEntity>> {
+    override fun getActiveUserSynchronously(): UserNgEntity? {
+        return usersDao.getActiveUserSynchronously()
+    }
+
+    override fun getUsers(): Observable<List<UserNgEntity>> {
         return usersDao.getUsers()
     }
 
-    override fun getUserWithId(id: Long): Flow<UserNgEntity?> {
+    override fun getUserWithId(id: Long): Observable<UserNgEntity?> {
         return usersDao.getUserWithId(id)
     }
 
-    override fun getUserWithIdLiveData(id: Long): Flow<UserNgEntity?> {
+    override fun getUserWithIdLiveData(id: Long): Observable<UserNgEntity?> {
         return usersDao.getUserWithIdLiveData(id).distinctUntilChanged()
     }
 
-    override fun getUserWithIdNotScheduledForDeletion(id: Long): Flow<UserNgEntity?> {
+    override fun getUserWithIdNotScheduledForDeletion(id: Long): Observable<UserNgEntity?> {
         return usersDao.getUserWithIdNotScheduledForDeletion(id)
     }
 
-    override fun getUserWithUserId(userId: String): Flow<UserNgEntity?> {
+    override fun getUserWithUserId(userId: String): Observable<UserNgEntity?> {
         return usersDao.getUserWithUserId(userId)
     }
 
-    override fun getUsersWithoutUserId(userId: Long): Flow<List<UserNgEntity>> {
+    override fun getUsersWithoutUserId(userId: Long): Observable<List<UserNgEntity>> {
         return usersDao.getUsersWithoutUserId(userId)
     }
 
-    override fun getUsersLiveData(): Flow<List<UserNgEntity>> {
+    override fun getUsersLiveData(): Observable<List<UserNgEntity>> {
         return usersDao.getUsersLiveData().distinctUntilChanged()
     }
 
-    override fun getUsersLiveDataWithoutActive(): Flow<List<UserNgEntity>> {
+    override fun getUsersLiveDataWithoutActive(): Observable<List<UserNgEntity>> {
         return usersDao.getUsersLiveDataWithoutActive().distinctUntilChanged()
     }
 
-    override fun getUsersScheduledForDeletion(): Flow<List<UserNgEntity>> {
+    override fun getUsersScheduledForDeletion(): Observable<List<UserNgEntity>> {
         return usersDao.getUsersScheduledForDeletion()
     }
 
-    override fun getUsersNotScheduledForDeletion(): Flow<List<UserNgEntity>> {
+    override fun getUsersNotScheduledForDeletion(): Observable<List<UserNgEntity>> {
         return usersDao.getUsersNotScheduledForDeletion()
     }
 
-    override fun getUserWithUsernameAndServer(username: String, server: String): Flow<UserNgEntity?> {
+    override fun getUserWithUsernameAndServer(username: String, server: String): Observable<UserNgEntity?> {
         return usersDao.getUserWithUsernameAndServer(username, server)
     }
 
-    override suspend fun updateUser(user: UserNgEntity): Int {
+    override fun updateUser(user: UserNgEntity): Int {
         return usersDao.updateUser(user)
     }
 
-    override suspend fun insertUser(user: UserNgEntity): Long {
+    override fun insertUser(user: UserNgEntity): Long {
         return usersDao.saveUser(user)
     }
 
-    override suspend fun setUserAsActiveWithId(id: Long): Flow<Boolean> {
+    override suspend fun setUserAsActiveWithId(id: Long): Boolean {
         return usersDao.setUserAsActiveWithId(id)
     }
 
-    override suspend fun deleteUserWithId(id: Long) {
+    override fun deleteUserWithId(id: Long) {
         usersDao.deleteUserWithId(id)
     }
 
-    override suspend fun setAnyUserAsActive(): Flow<Boolean> {
+    override suspend fun setAnyUserAsActive(): Boolean {
         return usersDao.setAnyUserAsActive()
     }
 
-    override suspend fun markUserForDeletion(id: Long): Flow<Boolean> {
+    override suspend fun markUserForDeletion(id: Long): Boolean {
         return usersDao.markUserForDeletion(id)
     }
 }

+ 196 - 74
app/src/main/java/com/nextcloud/talk/users/UserManager.kt

@@ -29,144 +29,262 @@ import com.nextcloud.talk.models.ExternalSignalingServer
 import com.nextcloud.talk.models.json.capabilities.Capabilities
 import com.nextcloud.talk.models.json.push.PushConfigurationState
 import com.nextcloud.talk.utils.database.user.CurrentUserProviderNew
-import kotlinx.coroutines.flow.Flow
-import kotlinx.coroutines.flow.flowOf
+import io.reactivex.Observable
+import io.reactivex.Observer
+import io.reactivex.disposables.Disposable
 import java.lang.Boolean.FALSE
+import java.lang.Boolean.TRUE
 
 @Suppress("TooManyFunctions")
 class UserManager internal constructor(private val userRepository: UsersRepository) : CurrentUserProviderNew {
-    suspend fun anyUserExists(): Boolean {
-        var result = FALSE
-        userRepository.getUsers().collect {
-            result = it.isNotEmpty()
-        }
-
-        return result
-    }
-
-    suspend fun hasMultipleUsers(): Flow<Boolean> {
-        var result = FALSE
-        userRepository.getUsers().collect {
-            result = it.size > 1
-        }
-
-        return flowOf(result)
-    }
-
-    val users: Flow<List<UserNgEntity>>
+    val users: Observable<List<UserNgEntity>>
         get() = userRepository.getUsers()
 
-    val usersScheduledForDeletion: Flow<List<UserNgEntity>>
+    val usersScheduledForDeletion: Observable<List<UserNgEntity>>
         get() = userRepository.getUsersScheduledForDeletion()
 
-    private suspend fun setAnyUserAndSetAsActive(): Flow<UserNgEntity?> {
+    private fun setAnyUserAndSetAsActive(): Observable<UserNgEntity?> {
         val results = userRepository.getUsersNotScheduledForDeletion()
 
         var result: UserNgEntity? = null
 
-        results.collect {
-            if (it.isNotEmpty()) {
-                val user = it[0]
-                user.current = true
-                userRepository.updateUser(user)
-                result = user
+        results.subscribe(object : Observer<List<UserNgEntity>> {
+            override fun onSubscribe(d: Disposable) {
+                // unused atm
             }
-        }
 
-        return flowOf(result)
+            override fun onNext(users: List<UserNgEntity>) {
+                if (users.isNotEmpty()) {
+                    val user = users[0]
+                    user.current = true
+                    userRepository.updateUser(user)
+                    result = user
+                }
+            }
+
+            override fun onError(e: Throwable) {
+                // unused atm
+            }
+
+            override fun onComplete() {
+                // unused atm
+            }
+        })
+
+        return Observable.just(result)
     }
 
-    override val currentUser: Flow<UserNgEntity?>
+    override val currentUser: UserNgEntity?
         get() {
-            return userRepository.getActiveUser()
+            return userRepository.getActiveUserSynchronously()
         }
 
-    suspend fun deleteUser(internalId: Long) {
+    fun deleteUser(internalId: Long) {
         userRepository.deleteUserWithId(internalId)
     }
 
-    suspend fun deleteUserWithId(internalId: Long) {
+    fun deleteUserWithId(internalId: Long) {
         userRepository.deleteUserWithId(internalId)
     }
 
-    fun getUserById(userId: String): Flow<UserNgEntity?> {
+    fun getUserById(userId: String): Observable<UserNgEntity?> {
         return userRepository.getUserWithUserId(userId)
     }
 
-    fun getUserWithId(id: Long): Flow<UserNgEntity?> {
+    fun getUserWithId(id: Long): Observable<UserNgEntity?> {
         return userRepository.getUserWithId(id)
     }
 
-    suspend fun disableAllUsersWithoutId(userId: Long) {
+    fun disableAllUsersWithoutId(userId: Long) {
         val results = userRepository.getUsersWithoutUserId(userId)
 
-        results.collect {
-            if (it.isNotEmpty()) {
-                for (entity in it) {
-                    entity.current = false
-                    userRepository.updateUser(entity)
+        results.subscribe(object : Observer<List<UserNgEntity>> {
+            override fun onSubscribe(d: Disposable) {
+                // unused atm
+            }
+
+            override fun onNext(users: List<UserNgEntity>) {
+                if (users.isNotEmpty()) {
+                    for (entity in users) {
+                        entity.current = false
+                        userRepository.updateUser(entity)
+                    }
                 }
             }
-        }
+
+            override fun onError(e: Throwable) {
+                // unused atm
+            }
+
+            override fun onComplete() {
+                // unused atm
+            }
+        })
     }
 
-    suspend fun checkIfUserIsScheduledForDeletion(username: String, server: String): Flow<Boolean> {
+    fun checkIfUserIsScheduledForDeletion(username: String, server: String): Observable<Boolean> {
         val results = userRepository.getUserWithUsernameAndServer(username, server)
         var result = FALSE
-        results.collect {
-            result = it?.scheduledForDeletion ?: FALSE
-        }
 
-        return flowOf(result)
+        results.subscribe(object : Observer<UserNgEntity?> {
+            override fun onSubscribe(d: Disposable) {
+                // unused atm
+            }
+
+            override fun onNext(user: UserNgEntity) {
+                result = user.scheduledForDeletion
+            }
+
+            override fun onError(e: Throwable) {
+                // unused atm
+            }
+
+            override fun onComplete() {
+                // unused atm
+            }
+        })
+
+        return Observable.just(result)
     }
 
-    fun getUserWithInternalId(id: Long): Flow<UserNgEntity?> {
+    fun getUserWithInternalId(id: Long): Observable<UserNgEntity?> {
         return userRepository.getUserWithIdNotScheduledForDeletion(id)
     }
 
-    suspend fun getIfUserWithUsernameAndServer(username: String, server: String): Flow<Boolean> {
+    fun getIfUserWithUsernameAndServer(username: String, server: String): Observable<Boolean> {
         val results = userRepository.getUserWithUsernameAndServer(username, server)
         var result = FALSE
-        results.collect {
-            result = it != null
-        }
 
-        return flowOf(result)
+        results.subscribe(object : Observer<UserNgEntity?> {
+            override fun onSubscribe(d: Disposable) {
+                // unused atm
+            }
+
+            override fun onNext(users: UserNgEntity) {
+                result = TRUE
+            }
+
+            override fun onError(e: Throwable) {
+                // unused atm
+            }
+
+            override fun onComplete() {
+                // unused atm
+            }
+        })
+
+        return Observable.just(result)
     }
 
-    suspend fun scheduleUserForDeletionWithId(id: Long): Flow<Boolean> {
+    suspend fun scheduleUserForDeletionWithId(id: Long): Observable<Boolean> {
         val results = userRepository.getUserWithId(id)
         var result = FALSE
 
-        results.collect {
-            if (it != null) {
-                it.scheduledForDeletion = true
-                it.current = false
-                userRepository.updateUser(it)
+        results.subscribe(object : Observer<UserNgEntity?> {
+            override fun onSubscribe(d: Disposable) {
+                // unused atm
             }
-        }
 
-        setAnyUserAndSetAsActive().collect {
-            result = it != null
-        }
+            override fun onNext(user: UserNgEntity) {
+                user.scheduledForDeletion = true
+                user.current = false
+                userRepository.updateUser(user)
+            }
+
+            override fun onError(e: Throwable) {
+                // unused atm
+            }
+
+            override fun onComplete() {
+                // unused atm
+            }
+        })
+
+        results.subscribe(object : Observer<UserNgEntity?> {
+            override fun onSubscribe(d: Disposable) {
+                // unused atm
+            }
+
+            override fun onNext(user: UserNgEntity) {
+                user.scheduledForDeletion = true
+                user.current = false
+                userRepository.updateUser(user)
+            }
+
+            override fun onError(e: Throwable) {
+                // unused atm
+            }
+
+            override fun onComplete() {
+                // unused atm
+            }
+        })
+
+        setAnyUserAndSetAsActive().subscribe(object : Observer<UserNgEntity?> {
+            override fun onSubscribe(d: Disposable) {
+                // unused atm
+            }
+
+            override fun onNext(user: UserNgEntity) {
+                result = TRUE
+            }
+
+            override fun onError(e: Throwable) {
+                // unused atm
+            }
+
+            override fun onComplete() {
+                // unused atm
+            }
+        })
 
-        return flowOf(result)
+        return Observable.just(result)
     }
 
-    suspend fun createOrUpdateUser(
+    fun createOrUpdateUser(
         username: String?,
         userAttributes: UserAttributes,
-    ): Flow<UserNgEntity?> {
+    ): Observable<UserNgEntity?> {
         var user: UserNgEntity? = null
 
         if (userAttributes.id == null && username != null && userAttributes.serverUrl != null) {
-            userRepository.getUserWithUsernameAndServer(username, userAttributes.serverUrl).collect {
-                user = it
-            }
+            userRepository.getUserWithUsernameAndServer(username, userAttributes.serverUrl)
+                .subscribe(object : Observer<UserNgEntity?> {
+                    override fun onSubscribe(d: Disposable) {
+                        // unused atm
+                    }
+
+                    override fun onNext(userEntity: UserNgEntity) {
+                        user = userEntity
+                    }
+
+                    override fun onError(e: Throwable) {
+                        // unused atm
+                    }
+
+                    override fun onComplete() {
+                        // unused atm
+                    }
+                })
         } else if (userAttributes.id != null) {
-            userRepository.getUserWithId(userAttributes.id).collect {
-                user = it
-            }
+            userRepository.getUserWithId(userAttributes.id)
+                .subscribe(object : Observer<UserNgEntity?> {
+                    override fun onSubscribe(d: Disposable) {
+                        // unused atm
+                    }
+
+                    override fun onNext(userEntity: UserNgEntity) {
+                        user = userEntity
+                    }
+
+                    override fun onError(e: Throwable) {
+                        // unused atm
+                    }
+
+                    override fun onComplete() {
+                        // unused atm
+                    }
+                })
         }
 
         if (user == null) {
@@ -292,6 +410,10 @@ class UserManager internal constructor(private val userRepository: UsersReposito
         }
     }
 
+    companion object {
+        const val TAG = "UserManager"
+    }
+
     data class UserAttributes(
         val id: Long?,
         val serverUrl: String?,

+ 1 - 2
app/src/main/java/com/nextcloud/talk/utils/database/user/CurrentUserProviderNew.kt

@@ -21,8 +21,7 @@
 package com.nextcloud.talk.utils.database.user
 
 import com.nextcloud.talk.data.user.model.UserNgEntity
-import kotlinx.coroutines.flow.Flow
 
 interface CurrentUserProviderNew {
-    val currentUser: Flow<UserNgEntity?>
+    val currentUser: UserNgEntity?
 }