123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492 |
/*
 * Nextcloud Talk - Android Client
 *
 * SPDX-FileCopyrightText: 2023 Marcel Hibbe <dev@mhibbe.de>
 * SPDX-FileCopyrightText: 2017-2018 Mario Danic <mario@lovelyhq.com>
 * SPDX-License-Identifier: GPL-3.0-or-later
 */
- package com.nextcloud.talk.webrtc
- import android.content.Context
- import android.text.TextUtils
- import android.util.Log
- import autodagger.AutoInjector
- import com.bluelinelabs.logansquare.LoganSquare
- import com.nextcloud.talk.application.NextcloudTalkApplication
- import com.nextcloud.talk.application.NextcloudTalkApplication.Companion.sharedApplication
- import com.nextcloud.talk.data.user.model.User
- import com.nextcloud.talk.events.NetworkEvent
- import com.nextcloud.talk.events.WebSocketCommunicationEvent
- import com.nextcloud.talk.models.json.participants.Participant
- import com.nextcloud.talk.models.json.participants.Participant.ActorType
- import com.nextcloud.talk.models.json.signaling.NCSignalingMessage
- import com.nextcloud.talk.models.json.websocket.BaseWebSocketMessage
- import com.nextcloud.talk.models.json.websocket.ByeWebSocketMessage
- import com.nextcloud.talk.models.json.websocket.CallOverallWebSocketMessage
- import com.nextcloud.talk.models.json.websocket.CallWebSocketMessage
- import com.nextcloud.talk.models.json.websocket.ErrorOverallWebSocketMessage
- import com.nextcloud.talk.models.json.websocket.EventOverallWebSocketMessage
- import com.nextcloud.talk.models.json.websocket.HelloResponseOverallWebSocketMessage
- import com.nextcloud.talk.models.json.websocket.JoinedRoomOverallWebSocketMessage
- import com.nextcloud.talk.signaling.SignalingMessageReceiver
- import com.nextcloud.talk.signaling.SignalingMessageSender
- import com.nextcloud.talk.utils.bundle.BundleKeys
- import okhttp3.OkHttpClient
- import okhttp3.Request
- import okhttp3.Response
- import okhttp3.WebSocket
- import okhttp3.WebSocketListener
- import okio.ByteString
- import org.greenrobot.eventbus.EventBus
- import org.greenrobot.eventbus.Subscribe
- import org.greenrobot.eventbus.ThreadMode
- import java.io.IOException
- import java.lang.Thread.sleep
- import javax.inject.Inject
/**
 * [WebSocketListener] that owns one signaling WebSocket connection for [conversationUser].
 *
 * The instance connects to [connectionUrl] as soon as it is constructed, sends a "hello" (or a resume
 * "hello" when a resume id is known) once the socket opens, dispatches incoming text messages by their
 * `type`, and posts [WebSocketCommunicationEvent]s ("hello", "roomJoined", "refreshChat",
 * "recordingStatus") on the injected [eventBus]. Outgoing messages sent while the socket is not connected
 * are queued and flushed after the next successful "hello" response.
 *
 * NOTE(review): the instance registers itself on the event bus in `init` and nothing in this class
 * unregisters it - confirm the owner handles the lifecycle.
 *
 * @param conversationUser user on whose behalf the connection is established
 * @param connectionUrl URL the WebSocket is opened against
 * @param webSocketTicket ticket passed to the hello model for a fresh (non-resumed) session
 */
@AutoInjector(NextcloudTalkApplication::class)
@Suppress("TooManyFunctions")
class WebSocketInstance internal constructor(
    private val conversationUser: User,
    private val connectionUrl: String,
    private val webSocketTicket: String
) : WebSocketListener() {

    @JvmField
    @Inject
    var okHttpClient: OkHttpClient? = null

    @JvmField
    @Inject
    var eventBus: EventBus? = null

    @JvmField
    @Inject
    var context: Context? = null

    /** Resume id from the last "hello" response; null/empty means the next hello starts a new session. */
    private var resumeId: String? = null

    /** Session id from the last "hello" response. */
    var sessionId: String? = null
        private set

    private var hasMCU = false

    /** True between a received "hello" response and the next close/failure/"bye". */
    var isConnected: Boolean = false
        private set

    private val webSocketConnectionHelper = WebSocketConnectionHelper()

    /** The socket whose messages are currently processed; messages from any other socket are ignored. */
    private var internalWebSocket: WebSocket? = null
    private var currentRoomToken: String? = null
    private var currentNormalBackendSession: String? = null
    private var reconnecting = false

    /** Participants of the current room, keyed by signaling session id. */
    private val usersHashMap: HashMap<String?, Participant> = HashMap()

    /** Serialized messages waiting for the connection to be (re-)established. */
    private var messagesQueue: MutableList<String> = ArrayList()
    private val signalingMessageReceiver = ExternalSignalingMessageReceiver()
    val signalingMessageSender = ExternalSignalingMessageSender()

    init {
        // The @Inject fields are populated by this call, so they are non-null from here on.
        sharedApplication!!.componentApplication.inject(this)
        eventBus!!.register(this)
        restartWebSocket()
    }

    /**
     * Sends the initial "hello" on the current socket: a fresh hello when no resume id is known,
     * otherwise a resume hello carrying [resumeId].
     */
    private fun sendHello() {
        try {
            val payload = if (TextUtils.isEmpty(resumeId)) {
                LoganSquare.serialize(
                    webSocketConnectionHelper.getAssembledHelloModel(conversationUser, webSocketTicket)
                )
            } else {
                LoganSquare.serialize(
                    webSocketConnectionHelper.getAssembledHelloModelForResume(resumeId)
                )
            }
            internalWebSocket?.send(payload)
        } catch (e: IOException) {
            Log.e(TAG, "Failed to serialize hello model", e)
        }
    }

    /** Adopts the freshly opened [webSocket] as the current socket and greets the server. */
    override fun onOpen(webSocket: WebSocket, response: Response) {
        Log.d(TAG, "Open webSocket")
        internalWebSocket = webSocket
        sendHello()
    }

    /**
     * Closes and cancels [webSocket], resets the connection state if it was the current socket, then
     * waits one second and reconnects.
     *
     * NOTE(review): the wait is a blocking [sleep] on the calling thread, and the reconnect happens even
     * when [webSocket] is not the current socket - confirm both are intended.
     */
    private fun closeWebSocket(webSocket: WebSocket) {
        webSocket.close(NORMAL_CLOSURE, null)
        webSocket.cancel()
        if (webSocket === internalWebSocket) {
            isConnected = false
            messagesQueue = ArrayList()
        }
        sleep(ONE_SECOND)
        restartWebSocket()
    }

    /** Forgets the resume id so that the next hello starts a new session. */
    fun clearResumeId() {
        resumeId = ""
    }

    /** Marks the instance as reconnecting and opens a new WebSocket to [connectionUrl]. */
    fun restartWebSocket() {
        reconnecting = true
        Log.d(TAG, "restartWebSocket: $connectionUrl")
        val request = Request.Builder().url(connectionUrl).build()
        okHttpClient!!.newWebSocket(request, this)
    }

    /**
     * Dispatches a text message received on the current socket according to its `type`.
     * Messages arriving on any other (stale) socket are ignored.
     */
    override fun onMessage(webSocket: WebSocket, text: String) {
        if (webSocket !== internalWebSocket) {
            return
        }
        Log.d(TAG, "Receiving : $webSocket $text")
        try {
            val (messageType) = LoganSquare.parse(text, BaseWebSocketMessage::class.java)
            when (messageType) {
                null -> Log.e(TAG, "Received message with type: null")
                "hello" -> processHelloMessage(webSocket, text)
                "error" -> processErrorMessage(webSocket, text)
                "room" -> processJoinedRoomMessage(text)
                "event" -> processEventMessage(text)
                "message" -> processMessage(text)
                "bye" -> {
                    isConnected = false
                    resumeId = ""
                }
                else -> Unit
            }
        } catch (e: IOException) {
            Log.e(TAG, "Failed to recognize WebSocket message", e)
        }
    }

    /**
     * Handles a "message" frame: fills in a missing `from` with the sender's session id and forwards
     * the call message to the signaling receiver.
     */
    @Throws(IOException::class)
    private fun processMessage(text: String) {
        val (_, callWebSocketMessage) = LoganSquare.parse(text, CallOverallWebSocketMessage::class.java)
        if (callWebSocketMessage == null) {
            return
        }
        val ncSignalingMessage = callWebSocketMessage.ncSignalingMessage
        val sender = callWebSocketMessage.senderWebSocketMessage
        if (ncSignalingMessage != null && TextUtils.isEmpty(ncSignalingMessage.from) && sender != null) {
            ncSignalingMessage.from = sender.sessionId
        }
        signalingMessageReceiver.process(callWebSocketMessage)
    }

    /**
     * Handles an "event" frame. Room events update local state (chat refresh / recording status /
     * participant map) before being forwarded; participant events are forwarded as-is; every other
     * target is logged and ignored.
     */
    @Throws(IOException::class)
    private fun processEventMessage(text: String) {
        val eventOverallWebSocketMessage = LoganSquare.parse(text, EventOverallWebSocketMessage::class.java)
        val eventMap = eventOverallWebSocketMessage.eventMap ?: return

        // Safe cast: a non-string target is treated like a missing one instead of throwing.
        val target = eventMap["target"] as? String
        if (target == null) {
            Log.w(TAG, "Received message with event target: null")
            return
        }

        when (target) {
            Globals.TARGET_ROOM -> {
                when (eventMap["type"]) {
                    "message" -> processRoomMessageMessage(eventOverallWebSocketMessage)
                    "join" -> processRoomJoinMessage(eventOverallWebSocketMessage)
                    "leave" -> processRoomLeaveMessage(eventOverallWebSocketMessage)
                }
                signalingMessageReceiver.process(eventMap)
            }
            Globals.TARGET_PARTICIPANTS ->
                signalingMessageReceiver.process(eventMap)
            else ->
                Log.i(TAG, "Received unknown/ignored event target: $target")
        }
    }

    /**
     * Handles a room "message" event. Posts "refreshChat" when `data.chat.refresh` is `true`, or
     * "recordingStatus" when `data.recording.status` is present. Malformed payloads are ignored rather
     * than raising cast/null exceptions.
     */
    private fun processRoomMessageMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
        val messageMap = eventOverallWebSocketMessage.eventMap?.get("message") as? Map<*, *> ?: return
        val dataMap = messageMap["data"] as? Map<*, *> ?: return

        when {
            // "chat" takes precedence: when the key exists, "recording" is not looked at.
            dataMap.containsKey("chat") -> {
                val chatMap = dataMap["chat"] as? Map<*, *>
                if (chatMap?.get("refresh") == true) {
                    val refreshChatHashMap = HashMap<String, String?>()
                    refreshChatHashMap[BundleKeys.KEY_ROOM_TOKEN] = messageMap["roomid"] as? String
                    refreshChatHashMap[BundleKeys.KEY_INTERNAL_USER_ID] = (conversationUser.id!!).toString()
                    eventBus!!.post(WebSocketCommunicationEvent("refreshChat", refreshChatHashMap))
                }
            }
            dataMap.containsKey("recording") -> {
                val recordingMap = dataMap["recording"] as? Map<*, *>
                if (recordingMap != null && recordingMap.containsKey("status")) {
                    // Accept any numeric representation; the parser may not always deliver a Long.
                    val status = (recordingMap["status"] as? Number)?.toInt()
                    if (status == null) {
                        Log.w(TAG, "Received recording event with non-numeric status")
                        return
                    }
                    Log.d(TAG, "status is $status")
                    val recordingHashMap = HashMap<String, String>()
                    recordingHashMap[BundleKeys.KEY_RECORDING_STATE] = status.toString()
                    eventBus!!.post(WebSocketCommunicationEvent("recordingStatus", recordingHashMap))
                }
            }
        }
    }

    /**
     * Handles a room "join" event by adding one [Participant] per joined session to the participant map.
     * A missing join list or non-map entries are skipped instead of crashing.
     */
    private fun processRoomJoinMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
        val joinEntries = eventOverallWebSocketMessage.eventMap?.get("join") as? List<*>
        if (joinEntries == null) {
            Log.w(TAG, "Received room join event without a join list")
            return
        }

        for (entry in joinEntries) {
            val joinedSession = entry as? Map<*, *> ?: continue
            val participant = Participant()

            val userId = joinedSession["userid"] as? String
            if (userId != null) {
                participant.actorType = ActorType.USERS
                participant.actorId = userId
            } else {
                participant.actorType = ActorType.GUESTS
                // FIXME seems to be not given by the HPB: participant.setActorId();
            }

            // There is no "user" attribute for guest participants.
            val userMap = joinedSession["user"] as? Map<*, *>
            if (userMap != null) {
                participant.displayName = userMap["displayname"] as? String
            }

            usersHashMap[joinedSession["sessionid"] as? String] = participant
        }
    }

    /**
     * Handles a room "leave" event by removing every listed session id from the participant map.
     * A missing list is ignored; non-string entries are skipped.
     */
    private fun processRoomLeaveMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
        val leftSessions = eventOverallWebSocketMessage.eventMap?.get("leave") as? List<*>
        if (leftSessions == null) {
            Log.w(TAG, "Received room leave event without a leave list")
            return
        }
        leftSessions.filterIsInstance<String>().forEach { sessionId -> usersHashMap.remove(sessionId) }
    }

    /** Returns the live (not copied) map of participants keyed by signaling session id. */
    fun getUserMap(): HashMap<String?, Participant> = usersHashMap

    /**
     * Handles a "room" frame: remembers the joined room id and announces "roomJoined" when the message
     * carries room properties and a non-empty room id.
     */
    @Throws(IOException::class)
    private fun processJoinedRoomMessage(text: String) {
        val (_, roomWebSocketMessage) = LoganSquare.parse(text, JoinedRoomOverallWebSocketMessage::class.java)
        if (roomWebSocketMessage == null) {
            return
        }
        currentRoomToken = roomWebSocketMessage.roomId
        if (roomWebSocketMessage.roomPropertiesWebSocketMessage != null && !TextUtils.isEmpty(currentRoomToken)) {
            sendRoomJoinedEvent()
        }
    }

    /**
     * Handles an "error" frame. "no_such_session" discards the resume/room state and reconnects;
     * "hello_expected" just reconnects; other codes are only logged.
     */
    @Throws(IOException::class)
    private fun processErrorMessage(webSocket: WebSocket, text: String) {
        Log.e(TAG, "Received error: $text")
        val (_, message) = LoganSquare.parse(text, ErrorOverallWebSocketMessage::class.java)
        when (message?.code) {
            "no_such_session" -> {
                Log.d(TAG, "WebSocket ${webSocket.hashCode()} resumeID $resumeId expired")
                resumeId = ""
                currentRoomToken = ""
                currentNormalBackendSession = ""
                restartWebSocket()
            }
            "hello_expected" -> restartWebSocket()
            else -> Unit
        }
    }

    /**
     * Handles the "hello" response: marks the connection as established, stores the new resume/session
     * ids, flushes queued messages and posts a "hello" event. The event carries the previous resume id
     * (if any) and the current room token (if any); without a previous resume id the room state is reset.
     */
    @Throws(IOException::class)
    private fun processHelloMessage(webSocket: WebSocket, text: String) {
        isConnected = true
        reconnecting = false
        val oldResumeId = resumeId

        val (_, helloResponse) = LoganSquare.parse(text, HelloResponseOverallWebSocketMessage::class.java)
        if (helloResponse != null) {
            resumeId = helloResponse.resumeId
            sessionId = helloResponse.sessionId
            hasMCU = helloResponse.serverHasMCUSupport()
        }

        // Swap the queue out before flushing so nothing appended during the flush is discarded.
        val pendingMessages = messagesQueue
        messagesQueue = ArrayList()
        for (pending in pendingMessages) {
            webSocket.send(pending)
        }

        val helloHashMap = HashMap<String, String?>()
        if (!TextUtils.isEmpty(oldResumeId)) {
            helloHashMap["oldResumeId"] = oldResumeId
        } else {
            currentRoomToken = ""
            currentNormalBackendSession = ""
        }
        if (!TextUtils.isEmpty(currentRoomToken)) {
            helloHashMap[Globals.ROOM_TOKEN] = currentRoomToken
        }
        eventBus!!.post(WebSocketCommunicationEvent("hello", helloHashMap))
    }

    /** Posts a "roomJoined" event carrying the current room token. */
    private fun sendRoomJoinedEvent() {
        val joinRoomHashMap = HashMap<String, String?>()
        joinRoomHashMap[Globals.ROOM_TOKEN] = currentRoomToken
        eventBus!!.post(WebSocketCommunicationEvent("roomJoined", joinRoomHashMap))
    }

    /** Binary frames are not processed; they are only logged. */
    override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
        Log.d(TAG, "Receiving bytes : ${bytes.hex()}")
    }

    override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
        Log.d(TAG, "onClosing : $code / $reason")
    }

    override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
        Log.d(TAG, "onClosed : $code / $reason")
        isConnected = false
    }

    /** Logs the failure, then closes the failed socket and reconnects via [closeWebSocket]. */
    override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
        Log.e(TAG, "Error : WebSocket ${webSocket.hashCode()}", t)
        closeWebSocket(webSocket)
    }

    /** Returns the MCU support flag reported by the last "hello" response (false before any hello). */
    fun hasMCU(): Boolean = hasMCU

    /**
     * Joins (or, for an empty [roomToken], leaves) a room on the signaling server.
     *
     * When both [roomToken] and [normalBackendSession] equal the values already in effect, no message
     * is sent and a "roomJoined" event is posted locally instead.
     *
     * @param roomToken token of the room to join; the empty string means "leave the current room"
     * @param normalBackendSession session id passed along in the join message
     */
    fun joinRoomWithRoomTokenAndSession(roomToken: String, normalBackendSession: String?) {
        Log.d(TAG, "joinRoomWithRoomTokenAndSession")
        Log.d(TAG, "   roomToken: $roomToken")
        Log.d(TAG, "   session: $normalBackendSession")
        try {
            val message = LoganSquare.serialize(
                webSocketConnectionHelper.getAssembledJoinOrLeaveRoomModel(roomToken, normalBackendSession)
            )
            when {
                roomToken.isEmpty() -> {
                    Log.d(TAG, "sending 'leave room' via websocket")
                    currentNormalBackendSession = ""
                    sendMessage(message)
                }
                roomToken == currentRoomToken && normalBackendSession == currentNormalBackendSession -> {
                    Log.d(TAG, "roomToken & session are unchanged. Joining locally without to send websocket message")
                    sendRoomJoinedEvent()
                }
                else -> {
                    Log.d(TAG, "Sending join room message via websocket")
                    currentNormalBackendSession = normalBackendSession
                    sendMessage(message)
                }
            }
        } catch (e: IOException) {
            Log.e(TAG, "Failed to serialize signaling message", e)
        }
    }

    /** Wraps [ncSignalingMessage] into a call message model, serializes it and sends it. */
    private fun sendCallMessage(ncSignalingMessage: NCSignalingMessage) {
        try {
            val message = LoganSquare.serialize(
                webSocketConnectionHelper.getAssembledCallMessageModel(ncSignalingMessage)
            )
            sendMessage(message)
        } catch (e: IOException) {
            Log.e(TAG, "Failed to serialize signaling message", e)
        }
    }

    /**
     * Sends [message] on the current socket, or queues it when the socket is not usable.
     *
     * While disconnected or reconnecting the message is queued (and a reconnect is started if none is
     * in progress). If the socket is missing or refuses the message, it is queued and a reconnect is
     * started.
     */
    private fun sendMessage(message: String) {
        if (!isConnected || reconnecting) {
            messagesQueue.add(message)
            if (!reconnecting) {
                restartWebSocket()
            }
            return
        }

        // A missing socket is treated like a failed send instead of throwing.
        val sent = internalWebSocket?.send(message) ?: false
        if (!sent) {
            messagesQueue.add(message)
            restartWebSocket()
        }
    }

    /** Sends a "bye" message on the current socket; does nothing when not connected. */
    fun sendBye() {
        if (!isConnected) {
            return
        }
        try {
            val byeWebSocketMessage = ByeWebSocketMessage()
            byeWebSocketMessage.type = "bye"
            byeWebSocketMessage.bye = HashMap()
            internalWebSocket?.send(LoganSquare.serialize(byeWebSocketMessage))
        } catch (e: IOException) {
            Log.e(TAG, "Failed to serialize bye message", e)
        }
    }

    /**
     * Returns the display name stored for [session], or the empty string when the session is unknown
     * or has no display name.
     */
    fun getDisplayNameForSession(session: String?): String? = usersHashMap[session]?.displayName ?: ""

    /** Reconnects when the network comes back while the WebSocket is not connected. */
    @Subscribe(threadMode = ThreadMode.BACKGROUND)
    fun onMessageEvent(networkEvent: NetworkEvent) {
        val networkIsBack =
            networkEvent.networkConnectionEvent == NetworkEvent.NetworkConnectionEvent.NETWORK_CONNECTED
        if (networkIsBack && !isConnected) {
            restartWebSocket()
        }
    }

    /** Returns the receiver through which incoming signaling messages are delivered to listeners. */
    fun getSignalingMessageReceiver(): SignalingMessageReceiver = signalingMessageReceiver

    /**
     * Temporary implementation of SignalingMessageReceiver until signaling related code is extracted to a Signaling
     * class.
     *
     * All listeners are called in the WebSocket reader thread. This thread should be the same as long as the WebSocket
     * stays connected, but it may change whenever it is connected again.
     */
    private class ExternalSignalingMessageReceiver : SignalingMessageReceiver() {
        /** Forwards a raw event map to the base class' event processing. */
        fun process(eventMap: Map<String, Any?>?) {
            processEvent(eventMap)
        }

        /**
         * Forwards a call message: typing notifications are processed as whole call messages, everything
         * else only by its signaling message.
         */
        fun process(message: CallWebSocketMessage?) {
            if (message?.ncSignalingMessage?.type == "startedTyping" ||
                message?.ncSignalingMessage?.type == "stoppedTyping"
            ) {
                processCallWebSocketMessage(message)
            } else {
                processSignalingMessage(message?.ncSignalingMessage)
            }
        }
    }

    /** [SignalingMessageSender] that sends signaling messages through this WebSocket instance. */
    inner class ExternalSignalingMessageSender : SignalingMessageSender {
        override fun send(ncSignalingMessage: NCSignalingMessage) {
            sendCallMessage(ncSignalingMessage)
        }
    }

    companion object {
        private const val TAG = "WebSocketInstance"

        /** WebSocket close code used when closing a socket from [closeWebSocket]. */
        private const val NORMAL_CLOSURE = 1000

        /** Delay in milliseconds before reconnecting after a failure. */
        private const val ONE_SECOND: Long = 1000
    }
}
|