WebSocketInstance.kt 20 KB


  1. /*
  2. * Nextcloud Talk - Android Client
  3. *
  4. * SPDX-FileCopyrightText: 2023 Marcel Hibbe <dev@mhibbe.de>
  5. * SPDX-FileCopyrightText: 2017-2018 Mario Danic <mario@lovelyhq.com>
  6. * SPDX-License-Identifier: GPL-3.0-or-later
  7. */
  8. package com.nextcloud.talk.webrtc
  9. import android.content.Context
  10. import android.text.TextUtils
  11. import android.util.Log
  12. import autodagger.AutoInjector
  13. import com.bluelinelabs.logansquare.LoganSquare
  14. import com.nextcloud.talk.application.NextcloudTalkApplication
  15. import com.nextcloud.talk.application.NextcloudTalkApplication.Companion.sharedApplication
  16. import com.nextcloud.talk.data.user.model.User
  17. import com.nextcloud.talk.events.NetworkEvent
  18. import com.nextcloud.talk.events.WebSocketCommunicationEvent
  19. import com.nextcloud.talk.models.json.participants.Participant
  20. import com.nextcloud.talk.models.json.participants.Participant.ActorType
  21. import com.nextcloud.talk.models.json.signaling.NCSignalingMessage
  22. import com.nextcloud.talk.models.json.signaling.settings.FederationSettings
  23. import com.nextcloud.talk.models.json.websocket.BaseWebSocketMessage
  24. import com.nextcloud.talk.models.json.websocket.ByeWebSocketMessage
  25. import com.nextcloud.talk.models.json.websocket.CallOverallWebSocketMessage
  26. import com.nextcloud.talk.models.json.websocket.CallWebSocketMessage
  27. import com.nextcloud.talk.models.json.websocket.ErrorOverallWebSocketMessage
  28. import com.nextcloud.talk.models.json.websocket.EventOverallWebSocketMessage
  29. import com.nextcloud.talk.models.json.websocket.HelloResponseOverallWebSocketMessage
  30. import com.nextcloud.talk.models.json.websocket.JoinedRoomOverallWebSocketMessage
  31. import com.nextcloud.talk.signaling.SignalingMessageReceiver
  32. import com.nextcloud.talk.signaling.SignalingMessageSender
  33. import com.nextcloud.talk.utils.bundle.BundleKeys
  34. import okhttp3.OkHttpClient
  35. import okhttp3.Request
  36. import okhttp3.Response
  37. import okhttp3.WebSocket
  38. import okhttp3.WebSocketListener
  39. import okio.ByteString
  40. import org.greenrobot.eventbus.EventBus
  41. import org.greenrobot.eventbus.Subscribe
  42. import org.greenrobot.eventbus.ThreadMode
  43. import java.io.IOException
  44. import java.lang.Thread.sleep
  45. import javax.inject.Inject
  46. @AutoInjector(NextcloudTalkApplication::class)
  47. @Suppress("TooManyFunctions")
  48. class WebSocketInstance internal constructor(
  49. conversationUser: User,
  50. connectionUrl: String,
  51. webSocketTicket: String
  52. ) : WebSocketListener() {
  53. @JvmField
  54. @Inject
  55. var okHttpClient: OkHttpClient? = null
  56. @JvmField
  57. @Inject
  58. var eventBus: EventBus? = null
  59. @JvmField
  60. @Inject
  61. var context: Context? = null
  62. private val conversationUser: User
  63. private val webSocketTicket: String
  64. private var resumeId: String? = null
  65. var sessionId: String? = null
  66. private set
  67. private var hasMCU = false
  68. var isConnected: Boolean
  69. private set
  70. private val webSocketConnectionHelper: WebSocketConnectionHelper
  71. private var internalWebSocket: WebSocket? = null
  72. private val connectionUrl: String
  73. private var currentRoomToken: String? = null
  74. private var currentNormalBackendSession: String? = null
  75. private var currentFederation: FederationSettings? = null
  76. private var reconnecting = false
  77. private val usersHashMap: HashMap<String?, Participant>
  78. private var messagesQueue: MutableList<String> = ArrayList()
  79. private val signalingMessageReceiver = ExternalSignalingMessageReceiver()
  80. val signalingMessageSender = ExternalSignalingMessageSender()
  81. init {
  82. sharedApplication!!.componentApplication.inject(this)
  83. this.connectionUrl = connectionUrl
  84. this.conversationUser = conversationUser
  85. this.webSocketTicket = webSocketTicket
  86. webSocketConnectionHelper = WebSocketConnectionHelper()
  87. usersHashMap = HashMap()
  88. isConnected = false
  89. eventBus!!.register(this)
  90. restartWebSocket()
  91. }
  92. private fun sendHello() {
  93. try {
  94. if (TextUtils.isEmpty(resumeId)) {
  95. internalWebSocket!!.send(
  96. LoganSquare.serialize(
  97. webSocketConnectionHelper
  98. .getAssembledHelloModel(conversationUser, webSocketTicket)
  99. )
  100. )
  101. } else {
  102. internalWebSocket!!.send(
  103. LoganSquare.serialize(
  104. webSocketConnectionHelper
  105. .getAssembledHelloModelForResume(resumeId)
  106. )
  107. )
  108. }
  109. } catch (e: IOException) {
  110. Log.e(TAG, "Failed to serialize hello model")
  111. }
  112. }
  113. override fun onOpen(webSocket: WebSocket, response: Response) {
  114. Log.d(TAG, "Open webSocket")
  115. internalWebSocket = webSocket
  116. sendHello()
  117. }
  118. private fun closeWebSocket(webSocket: WebSocket) {
  119. webSocket.close(NORMAL_CLOSURE, null)
  120. webSocket.cancel()
  121. if (webSocket === internalWebSocket) {
  122. isConnected = false
  123. messagesQueue = ArrayList()
  124. }
  125. sleep(ONE_SECOND)
  126. restartWebSocket()
  127. }
  128. fun clearResumeId() {
  129. resumeId = ""
  130. }
  131. fun restartWebSocket() {
  132. reconnecting = true
  133. Log.d(TAG, "restartWebSocket: $connectionUrl")
  134. val request = Request.Builder().url(connectionUrl).build()
  135. okHttpClient!!.newWebSocket(request, this)
  136. }
  137. override fun onMessage(webSocket: WebSocket, text: String) {
  138. if (webSocket === internalWebSocket) {
  139. Log.d(TAG, "Receiving : $webSocket $text")
  140. try {
  141. val (messageType) = LoganSquare.parse(text, BaseWebSocketMessage::class.java)
  142. if (messageType != null) {
  143. when (messageType) {
  144. "hello" -> processHelloMessage(webSocket, text)
  145. "error" -> processErrorMessage(webSocket, text)
  146. "room" -> processJoinedRoomMessage(text)
  147. "event" -> processEventMessage(text)
  148. "message" -> processMessage(text)
  149. "bye" -> {
  150. isConnected = false
  151. resumeId = ""
  152. }
  153. else -> {}
  154. }
  155. } else {
  156. Log.e(TAG, "Received message with type: null")
  157. }
  158. } catch (e: IOException) {
  159. Log.e(TAG, "Failed to recognize WebSocket message", e)
  160. }
  161. }
  162. }
  163. @Throws(IOException::class)
  164. private fun processMessage(text: String) {
  165. val (_, callWebSocketMessage) = LoganSquare.parse(text, CallOverallWebSocketMessage::class.java)
  166. if (callWebSocketMessage != null) {
  167. val ncSignalingMessage = callWebSocketMessage.ncSignalingMessage
  168. if (ncSignalingMessage != null &&
  169. TextUtils.isEmpty(ncSignalingMessage.from) &&
  170. callWebSocketMessage.senderWebSocketMessage != null
  171. ) {
  172. ncSignalingMessage.from = callWebSocketMessage.senderWebSocketMessage!!.sessionId
  173. }
  174. signalingMessageReceiver.process(callWebSocketMessage)
  175. }
  176. }
  177. @Throws(IOException::class)
  178. private fun processEventMessage(text: String) {
  179. val eventOverallWebSocketMessage = LoganSquare.parse(text, EventOverallWebSocketMessage::class.java)
  180. if (eventOverallWebSocketMessage.eventMap != null) {
  181. val target = eventOverallWebSocketMessage.eventMap!!["target"] as String?
  182. if (target != null) {
  183. when (target) {
  184. Globals.TARGET_ROOM -> {
  185. if ("message" == eventOverallWebSocketMessage.eventMap!!["type"]) {
  186. processRoomMessageMessage(eventOverallWebSocketMessage)
  187. } else if ("join" == eventOverallWebSocketMessage.eventMap!!["type"]) {
  188. processRoomJoinMessage(eventOverallWebSocketMessage)
  189. } else if ("leave" == eventOverallWebSocketMessage.eventMap!!["type"]) {
  190. processRoomLeaveMessage(eventOverallWebSocketMessage)
  191. }
  192. signalingMessageReceiver.process(eventOverallWebSocketMessage.eventMap)
  193. }
  194. Globals.TARGET_PARTICIPANTS ->
  195. signalingMessageReceiver.process(eventOverallWebSocketMessage.eventMap)
  196. else ->
  197. Log.i(TAG, "Received unknown/ignored event target: $target")
  198. }
  199. } else {
  200. Log.w(TAG, "Received message with event target: null")
  201. }
  202. }
  203. }
  204. private fun processRoomMessageMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
  205. val messageHashMap = eventOverallWebSocketMessage.eventMap?.get("message") as Map<*, *>?
  206. if (messageHashMap != null && messageHashMap.containsKey("data")) {
  207. val dataHashMap = messageHashMap["data"] as Map<*, *>?
  208. if (dataHashMap != null && dataHashMap.containsKey("chat")) {
  209. val chatMap = dataHashMap["chat"] as Map<*, *>?
  210. if (chatMap != null && chatMap.containsKey("refresh") && chatMap["refresh"] as Boolean) {
  211. val refreshChatHashMap = HashMap<String, String?>()
  212. refreshChatHashMap[BundleKeys.KEY_ROOM_TOKEN] = messageHashMap["roomid"] as String?
  213. refreshChatHashMap[BundleKeys.KEY_INTERNAL_USER_ID] = (conversationUser.id!!).toString()
  214. eventBus!!.post(WebSocketCommunicationEvent("refreshChat", refreshChatHashMap))
  215. }
  216. } else if (dataHashMap != null && dataHashMap.containsKey("recording")) {
  217. val recordingMap = dataHashMap["recording"] as Map<*, *>?
  218. if (recordingMap != null && recordingMap.containsKey("status")) {
  219. val status = (recordingMap["status"] as Long?)!!.toInt()
  220. Log.d(TAG, "status is $status")
  221. val recordingHashMap = HashMap<String, String>()
  222. recordingHashMap[BundleKeys.KEY_RECORDING_STATE] = status.toString()
  223. eventBus!!.post(WebSocketCommunicationEvent("recordingStatus", recordingHashMap))
  224. }
  225. }
  226. }
  227. }
  228. private fun processRoomJoinMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
  229. val joinEventList = eventOverallWebSocketMessage.eventMap?.get("join") as List<HashMap<String, Any>>?
  230. var internalHashMap: HashMap<String, Any>
  231. var participant: Participant
  232. for (i in joinEventList!!.indices) {
  233. internalHashMap = joinEventList[i]
  234. val userMap = internalHashMap["user"] as HashMap<String, Any>?
  235. participant = Participant()
  236. val userId = internalHashMap["userid"] as String?
  237. if (userId != null) {
  238. participant.actorType = ActorType.USERS
  239. participant.actorId = userId
  240. } else {
  241. participant.actorType = ActorType.GUESTS
  242. // FIXME seems to be not given by the HPB: participant.setActorId();
  243. }
  244. if (userMap != null) {
  245. // There is no "user" attribute for guest participants.
  246. participant.displayName = userMap["displayname"] as String?
  247. }
  248. usersHashMap[internalHashMap["sessionid"] as String?] = participant
  249. }
  250. }
  251. private fun processRoomLeaveMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
  252. val leaveEventList = eventOverallWebSocketMessage.eventMap?.get("leave") as List<String>?
  253. for (i in leaveEventList!!.indices) {
  254. usersHashMap.remove(leaveEventList[i])
  255. }
  256. }
  257. fun getUserMap(): HashMap<String?, Participant> {
  258. return usersHashMap
  259. }
  260. @Throws(IOException::class)
  261. private fun processJoinedRoomMessage(text: String) {
  262. val (_, roomWebSocketMessage) = LoganSquare.parse(text, JoinedRoomOverallWebSocketMessage::class.java)
  263. if (roomWebSocketMessage != null) {
  264. currentRoomToken = roomWebSocketMessage.roomId
  265. if (roomWebSocketMessage.roomPropertiesWebSocketMessage != null && !TextUtils.isEmpty(currentRoomToken)) {
  266. sendRoomJoinedEvent()
  267. }
  268. }
  269. }
  270. @Throws(IOException::class)
  271. private fun processErrorMessage(webSocket: WebSocket, text: String) {
  272. Log.e(TAG, "Received error: $text")
  273. val (_, message) = LoganSquare.parse(text, ErrorOverallWebSocketMessage::class.java)
  274. if (message != null) {
  275. if ("no_such_session" == message.code) {
  276. Log.d(TAG, "WebSocket " + webSocket.hashCode() + " resumeID " + resumeId + " expired")
  277. resumeId = ""
  278. currentRoomToken = ""
  279. currentNormalBackendSession = ""
  280. restartWebSocket()
  281. } else if ("hello_expected" == message.code) {
  282. restartWebSocket()
  283. }
  284. }
  285. }
  286. @Throws(IOException::class)
  287. private fun processHelloMessage(webSocket: WebSocket, text: String) {
  288. isConnected = true
  289. reconnecting = false
  290. val oldResumeId = resumeId
  291. val (_, helloResponseWebSocketMessage1) = LoganSquare.parse(
  292. text,
  293. HelloResponseOverallWebSocketMessage::class.java
  294. )
  295. if (helloResponseWebSocketMessage1 != null) {
  296. resumeId = helloResponseWebSocketMessage1.resumeId
  297. sessionId = helloResponseWebSocketMessage1.sessionId
  298. hasMCU = helloResponseWebSocketMessage1.serverHasMCUSupport()
  299. }
  300. for (i in messagesQueue.indices) {
  301. webSocket.send(messagesQueue[i])
  302. }
  303. messagesQueue = ArrayList()
  304. val helloHashMap = HashMap<String, String?>()
  305. if (!TextUtils.isEmpty(oldResumeId)) {
  306. helloHashMap["oldResumeId"] = oldResumeId
  307. } else {
  308. currentRoomToken = ""
  309. currentNormalBackendSession = ""
  310. }
  311. if (!TextUtils.isEmpty(currentRoomToken)) {
  312. helloHashMap[Globals.ROOM_TOKEN] = currentRoomToken
  313. }
  314. eventBus!!.post(WebSocketCommunicationEvent("hello", helloHashMap))
  315. }
  316. private fun sendRoomJoinedEvent() {
  317. val joinRoomHashMap = HashMap<String, String?>()
  318. joinRoomHashMap[Globals.ROOM_TOKEN] = currentRoomToken
  319. eventBus!!.post(WebSocketCommunicationEvent("roomJoined", joinRoomHashMap))
  320. }
  321. override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
  322. Log.d(TAG, "Receiving bytes : " + bytes.hex())
  323. }
  324. override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
  325. Log.d(TAG, "onClosing : $code / $reason")
  326. }
  327. override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
  328. Log.d(TAG, "onClosed : $code / $reason")
  329. isConnected = false
  330. }
  331. override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
  332. Log.e(TAG, "Error : WebSocket " + webSocket.hashCode(), t)
  333. closeWebSocket(webSocket)
  334. }
  335. fun hasMCU(): Boolean {
  336. return hasMCU
  337. }
  338. @Suppress("Detekt.ComplexMethod")
  339. fun joinRoomWithRoomTokenAndSession(
  340. roomToken: String,
  341. normalBackendSession: String?,
  342. federation: FederationSettings? = null
  343. ) {
  344. Log.d(TAG, "joinRoomWithRoomTokenAndSession")
  345. Log.d(TAG, " roomToken: $roomToken")
  346. Log.d(TAG, " session: $normalBackendSession")
  347. try {
  348. val message = LoganSquare.serialize(
  349. webSocketConnectionHelper.getAssembledJoinOrLeaveRoomModel(roomToken, normalBackendSession, federation)
  350. )
  351. if (roomToken == "") {
  352. Log.d(TAG, "sending 'leave room' via websocket")
  353. currentNormalBackendSession = ""
  354. currentFederation = null
  355. sendMessage(message)
  356. } else if (
  357. roomToken == currentRoomToken &&
  358. normalBackendSession == currentNormalBackendSession &&
  359. federation?.roomId == currentFederation?.roomId &&
  360. federation?.nextcloudServer == currentFederation?.nextcloudServer
  361. ) {
  362. Log.d(TAG, "roomToken & session are unchanged. Joining locally without to send websocket message")
  363. sendRoomJoinedEvent()
  364. } else {
  365. Log.d(TAG, "Sending join room message via websocket")
  366. currentNormalBackendSession = normalBackendSession
  367. currentFederation = federation
  368. sendMessage(message)
  369. }
  370. } catch (e: IOException) {
  371. Log.e(TAG, "Failed to serialize signaling message", e)
  372. }
  373. }
  374. private fun sendCallMessage(ncSignalingMessage: NCSignalingMessage) {
  375. try {
  376. val message = LoganSquare.serialize(
  377. webSocketConnectionHelper.getAssembledCallMessageModel(ncSignalingMessage)
  378. )
  379. sendMessage(message)
  380. } catch (e: IOException) {
  381. Log.e(TAG, "Failed to serialize signaling message", e)
  382. }
  383. }
  384. private fun sendMessage(message: String) {
  385. if (!isConnected || reconnecting) {
  386. messagesQueue.add(message)
  387. if (!reconnecting) {
  388. restartWebSocket()
  389. }
  390. } else {
  391. if (!internalWebSocket!!.send(message)) {
  392. messagesQueue.add(message)
  393. restartWebSocket()
  394. }
  395. }
  396. }
  397. fun sendBye() {
  398. if (isConnected) {
  399. try {
  400. val byeWebSocketMessage = ByeWebSocketMessage()
  401. byeWebSocketMessage.type = "bye"
  402. byeWebSocketMessage.bye = HashMap()
  403. internalWebSocket!!.send(LoganSquare.serialize(byeWebSocketMessage))
  404. } catch (e: IOException) {
  405. Log.e(TAG, "Failed to serialize bye message")
  406. }
  407. }
  408. }
  409. fun getDisplayNameForSession(session: String?): String? {
  410. val participant = usersHashMap[session]
  411. if (participant != null) {
  412. if (participant.displayName != null) {
  413. return participant.displayName
  414. }
  415. }
  416. return ""
  417. }
  418. @Subscribe(threadMode = ThreadMode.BACKGROUND)
  419. fun onMessageEvent(networkEvent: NetworkEvent) {
  420. if (networkEvent.networkConnectionEvent == NetworkEvent.NetworkConnectionEvent.NETWORK_CONNECTED &&
  421. !isConnected
  422. ) {
  423. restartWebSocket()
  424. }
  425. }
  426. fun getSignalingMessageReceiver(): SignalingMessageReceiver {
  427. return signalingMessageReceiver
  428. }
  429. /**
  430. * Temporary implementation of SignalingMessageReceiver until signaling related code is extracted to a Signaling
  431. * class.
  432. *
  433. *
  434. * All listeners are called in the WebSocket reader thread. This thread should be the same as long as the WebSocket
  435. * stays connected, but it may change whenever it is connected again.
  436. */
  437. private class ExternalSignalingMessageReceiver : SignalingMessageReceiver() {
  438. fun process(eventMap: Map<String, Any?>?) {
  439. processEvent(eventMap)
  440. }
  441. fun process(message: CallWebSocketMessage?) {
  442. if (message?.ncSignalingMessage?.type == "startedTyping" ||
  443. message?.ncSignalingMessage?.type == "stoppedTyping"
  444. ) {
  445. processCallWebSocketMessage(message)
  446. } else {
  447. processSignalingMessage(message?.ncSignalingMessage)
  448. }
  449. }
  450. }
  451. inner class ExternalSignalingMessageSender : SignalingMessageSender {
  452. override fun send(ncSignalingMessage: NCSignalingMessage) {
  453. sendCallMessage(ncSignalingMessage)
  454. }
  455. }
  456. companion object {
  457. private const val TAG = "WebSocketInstance"
  458. private const val NORMAL_CLOSURE = 1000
  459. private const val ONE_SECOND: Long = 1000
  460. }
  461. }