// WebSocketInstance.kt (19 KB)
  1. /*
  2. * Nextcloud Talk - Android Client
  3. *
  4. * SPDX-FileCopyrightText: 2023 Marcel Hibbe <dev@mhibbe.de>
  5. * SPDX-FileCopyrightText: 2017-2018 Mario Danic <mario@lovelyhq.com>
  6. * SPDX-License-Identifier: GPL-3.0-or-later
  7. */
  8. package com.nextcloud.talk.webrtc
  9. import android.content.Context
  10. import android.text.TextUtils
  11. import android.util.Log
  12. import autodagger.AutoInjector
  13. import com.bluelinelabs.logansquare.LoganSquare
  14. import com.nextcloud.talk.application.NextcloudTalkApplication
  15. import com.nextcloud.talk.application.NextcloudTalkApplication.Companion.sharedApplication
  16. import com.nextcloud.talk.data.user.model.User
  17. import com.nextcloud.talk.events.NetworkEvent
  18. import com.nextcloud.talk.events.WebSocketCommunicationEvent
  19. import com.nextcloud.talk.models.json.participants.Participant
  20. import com.nextcloud.talk.models.json.participants.Participant.ActorType
  21. import com.nextcloud.talk.models.json.signaling.NCSignalingMessage
  22. import com.nextcloud.talk.models.json.websocket.BaseWebSocketMessage
  23. import com.nextcloud.talk.models.json.websocket.ByeWebSocketMessage
  24. import com.nextcloud.talk.models.json.websocket.CallOverallWebSocketMessage
  25. import com.nextcloud.talk.models.json.websocket.CallWebSocketMessage
  26. import com.nextcloud.talk.models.json.websocket.ErrorOverallWebSocketMessage
  27. import com.nextcloud.talk.models.json.websocket.EventOverallWebSocketMessage
  28. import com.nextcloud.talk.models.json.websocket.HelloResponseOverallWebSocketMessage
  29. import com.nextcloud.talk.models.json.websocket.JoinedRoomOverallWebSocketMessage
  30. import com.nextcloud.talk.signaling.SignalingMessageReceiver
  31. import com.nextcloud.talk.signaling.SignalingMessageSender
  32. import com.nextcloud.talk.utils.bundle.BundleKeys
  33. import okhttp3.OkHttpClient
  34. import okhttp3.Request
  35. import okhttp3.Response
  36. import okhttp3.WebSocket
  37. import okhttp3.WebSocketListener
  38. import okio.ByteString
  39. import org.greenrobot.eventbus.EventBus
  40. import org.greenrobot.eventbus.Subscribe
  41. import org.greenrobot.eventbus.ThreadMode
  42. import java.io.IOException
  43. import java.lang.Thread.sleep
  44. import javax.inject.Inject
  45. @AutoInjector(NextcloudTalkApplication::class)
  46. @Suppress("TooManyFunctions")
  47. class WebSocketInstance internal constructor(
  48. conversationUser: User,
  49. connectionUrl: String,
  50. webSocketTicket: String
  51. ) : WebSocketListener() {
  52. @JvmField
  53. @Inject
  54. var okHttpClient: OkHttpClient? = null
  55. @JvmField
  56. @Inject
  57. var eventBus: EventBus? = null
  58. @JvmField
  59. @Inject
  60. var context: Context? = null
  61. private val conversationUser: User
  62. private val webSocketTicket: String
  63. private var resumeId: String? = null
  64. var sessionId: String? = null
  65. private set
  66. private var hasMCU = false
  67. var isConnected: Boolean
  68. private set
  69. private val webSocketConnectionHelper: WebSocketConnectionHelper
  70. private var internalWebSocket: WebSocket? = null
  71. private val connectionUrl: String
  72. private var currentRoomToken: String? = null
  73. private var currentNormalBackendSession: String? = null
  74. private var reconnecting = false
  75. private val usersHashMap: HashMap<String?, Participant>
  76. private var messagesQueue: MutableList<String> = ArrayList()
  77. private val signalingMessageReceiver = ExternalSignalingMessageReceiver()
  78. val signalingMessageSender = ExternalSignalingMessageSender()
  79. init {
  80. sharedApplication!!.componentApplication.inject(this)
  81. this.connectionUrl = connectionUrl
  82. this.conversationUser = conversationUser
  83. this.webSocketTicket = webSocketTicket
  84. webSocketConnectionHelper = WebSocketConnectionHelper()
  85. usersHashMap = HashMap()
  86. isConnected = false
  87. eventBus!!.register(this)
  88. restartWebSocket()
  89. }
  90. private fun sendHello() {
  91. try {
  92. if (TextUtils.isEmpty(resumeId)) {
  93. internalWebSocket!!.send(
  94. LoganSquare.serialize(
  95. webSocketConnectionHelper
  96. .getAssembledHelloModel(conversationUser, webSocketTicket)
  97. )
  98. )
  99. } else {
  100. internalWebSocket!!.send(
  101. LoganSquare.serialize(
  102. webSocketConnectionHelper
  103. .getAssembledHelloModelForResume(resumeId)
  104. )
  105. )
  106. }
  107. } catch (e: IOException) {
  108. Log.e(TAG, "Failed to serialize hello model")
  109. }
  110. }
  111. override fun onOpen(webSocket: WebSocket, response: Response) {
  112. Log.d(TAG, "Open webSocket")
  113. internalWebSocket = webSocket
  114. sendHello()
  115. }
  116. private fun closeWebSocket(webSocket: WebSocket) {
  117. webSocket.close(NORMAL_CLOSURE, null)
  118. webSocket.cancel()
  119. if (webSocket === internalWebSocket) {
  120. isConnected = false
  121. messagesQueue = ArrayList()
  122. }
  123. sleep(ONE_SECOND)
  124. restartWebSocket()
  125. }
  126. fun clearResumeId() {
  127. resumeId = ""
  128. }
  129. fun restartWebSocket() {
  130. reconnecting = true
  131. Log.d(TAG, "restartWebSocket: $connectionUrl")
  132. val request = Request.Builder().url(connectionUrl).build()
  133. okHttpClient!!.newWebSocket(request, this)
  134. }
  135. override fun onMessage(webSocket: WebSocket, text: String) {
  136. if (webSocket === internalWebSocket) {
  137. Log.d(TAG, "Receiving : $webSocket $text")
  138. try {
  139. val (messageType) = LoganSquare.parse(text, BaseWebSocketMessage::class.java)
  140. if (messageType != null) {
  141. when (messageType) {
  142. "hello" -> processHelloMessage(webSocket, text)
  143. "error" -> processErrorMessage(webSocket, text)
  144. "room" -> processJoinedRoomMessage(text)
  145. "event" -> processEventMessage(text)
  146. "message" -> processMessage(text)
  147. "bye" -> {
  148. isConnected = false
  149. resumeId = ""
  150. }
  151. else -> {}
  152. }
  153. } else {
  154. Log.e(TAG, "Received message with type: null")
  155. }
  156. } catch (e: IOException) {
  157. Log.e(TAG, "Failed to recognize WebSocket message", e)
  158. }
  159. }
  160. }
  161. @Throws(IOException::class)
  162. private fun processMessage(text: String) {
  163. val (_, callWebSocketMessage) = LoganSquare.parse(text, CallOverallWebSocketMessage::class.java)
  164. if (callWebSocketMessage != null) {
  165. val ncSignalingMessage = callWebSocketMessage.ncSignalingMessage
  166. if (ncSignalingMessage != null &&
  167. TextUtils.isEmpty(ncSignalingMessage.from) &&
  168. callWebSocketMessage.senderWebSocketMessage != null
  169. ) {
  170. ncSignalingMessage.from = callWebSocketMessage.senderWebSocketMessage!!.sessionId
  171. }
  172. signalingMessageReceiver.process(callWebSocketMessage)
  173. }
  174. }
  175. @Throws(IOException::class)
  176. private fun processEventMessage(text: String) {
  177. val eventOverallWebSocketMessage = LoganSquare.parse(text, EventOverallWebSocketMessage::class.java)
  178. if (eventOverallWebSocketMessage.eventMap != null) {
  179. val target = eventOverallWebSocketMessage.eventMap!!["target"] as String?
  180. if (target != null) {
  181. when (target) {
  182. Globals.TARGET_ROOM -> {
  183. if ("message" == eventOverallWebSocketMessage.eventMap!!["type"]) {
  184. processRoomMessageMessage(eventOverallWebSocketMessage)
  185. } else if ("join" == eventOverallWebSocketMessage.eventMap!!["type"]) {
  186. processRoomJoinMessage(eventOverallWebSocketMessage)
  187. } else if ("leave" == eventOverallWebSocketMessage.eventMap!!["type"]) {
  188. processRoomLeaveMessage(eventOverallWebSocketMessage)
  189. }
  190. signalingMessageReceiver.process(eventOverallWebSocketMessage.eventMap)
  191. }
  192. Globals.TARGET_PARTICIPANTS ->
  193. signalingMessageReceiver.process(eventOverallWebSocketMessage.eventMap)
  194. else ->
  195. Log.i(TAG, "Received unknown/ignored event target: $target")
  196. }
  197. } else {
  198. Log.w(TAG, "Received message with event target: null")
  199. }
  200. }
  201. }
  202. private fun processRoomMessageMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
  203. val messageHashMap = eventOverallWebSocketMessage.eventMap?.get("message") as Map<*, *>?
  204. if (messageHashMap != null && messageHashMap.containsKey("data")) {
  205. val dataHashMap = messageHashMap["data"] as Map<*, *>?
  206. if (dataHashMap != null && dataHashMap.containsKey("chat")) {
  207. val chatMap = dataHashMap["chat"] as Map<*, *>?
  208. if (chatMap != null && chatMap.containsKey("refresh") && chatMap["refresh"] as Boolean) {
  209. val refreshChatHashMap = HashMap<String, String?>()
  210. refreshChatHashMap[BundleKeys.KEY_ROOM_TOKEN] = messageHashMap["roomid"] as String?
  211. refreshChatHashMap[BundleKeys.KEY_INTERNAL_USER_ID] = (conversationUser.id!!).toString()
  212. eventBus!!.post(WebSocketCommunicationEvent("refreshChat", refreshChatHashMap))
  213. }
  214. } else if (dataHashMap != null && dataHashMap.containsKey("recording")) {
  215. val recordingMap = dataHashMap["recording"] as Map<*, *>?
  216. if (recordingMap != null && recordingMap.containsKey("status")) {
  217. val status = (recordingMap["status"] as Long?)!!.toInt()
  218. Log.d(TAG, "status is $status")
  219. val recordingHashMap = HashMap<String, String>()
  220. recordingHashMap[BundleKeys.KEY_RECORDING_STATE] = status.toString()
  221. eventBus!!.post(WebSocketCommunicationEvent("recordingStatus", recordingHashMap))
  222. }
  223. }
  224. }
  225. }
  226. private fun processRoomJoinMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
  227. val joinEventList = eventOverallWebSocketMessage.eventMap?.get("join") as List<HashMap<String, Any>>?
  228. var internalHashMap: HashMap<String, Any>
  229. var participant: Participant
  230. for (i in joinEventList!!.indices) {
  231. internalHashMap = joinEventList[i]
  232. val userMap = internalHashMap["user"] as HashMap<String, Any>?
  233. participant = Participant()
  234. val userId = internalHashMap["userid"] as String?
  235. if (userId != null) {
  236. participant.actorType = ActorType.USERS
  237. participant.actorId = userId
  238. } else {
  239. participant.actorType = ActorType.GUESTS
  240. // FIXME seems to be not given by the HPB: participant.setActorId();
  241. }
  242. if (userMap != null) {
  243. // There is no "user" attribute for guest participants.
  244. participant.displayName = userMap["displayname"] as String?
  245. }
  246. usersHashMap[internalHashMap["sessionid"] as String?] = participant
  247. }
  248. }
  249. private fun processRoomLeaveMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
  250. val leaveEventList = eventOverallWebSocketMessage.eventMap?.get("leave") as List<String>?
  251. for (i in leaveEventList!!.indices) {
  252. usersHashMap.remove(leaveEventList[i])
  253. }
  254. }
  255. fun getUserMap(): HashMap<String?, Participant> {
  256. return usersHashMap
  257. }
  258. @Throws(IOException::class)
  259. private fun processJoinedRoomMessage(text: String) {
  260. val (_, roomWebSocketMessage) = LoganSquare.parse(text, JoinedRoomOverallWebSocketMessage::class.java)
  261. if (roomWebSocketMessage != null) {
  262. currentRoomToken = roomWebSocketMessage.roomId
  263. if (roomWebSocketMessage.roomPropertiesWebSocketMessage != null && !TextUtils.isEmpty(currentRoomToken)) {
  264. sendRoomJoinedEvent()
  265. }
  266. }
  267. }
  268. @Throws(IOException::class)
  269. private fun processErrorMessage(webSocket: WebSocket, text: String) {
  270. Log.e(TAG, "Received error: $text")
  271. val (_, message) = LoganSquare.parse(text, ErrorOverallWebSocketMessage::class.java)
  272. if (message != null) {
  273. if ("no_such_session" == message.code) {
  274. Log.d(TAG, "WebSocket " + webSocket.hashCode() + " resumeID " + resumeId + " expired")
  275. resumeId = ""
  276. currentRoomToken = ""
  277. currentNormalBackendSession = ""
  278. restartWebSocket()
  279. } else if ("hello_expected" == message.code) {
  280. restartWebSocket()
  281. }
  282. }
  283. }
  284. @Throws(IOException::class)
  285. private fun processHelloMessage(webSocket: WebSocket, text: String) {
  286. isConnected = true
  287. reconnecting = false
  288. val oldResumeId = resumeId
  289. val (_, helloResponseWebSocketMessage1) = LoganSquare.parse(
  290. text,
  291. HelloResponseOverallWebSocketMessage::class.java
  292. )
  293. if (helloResponseWebSocketMessage1 != null) {
  294. resumeId = helloResponseWebSocketMessage1.resumeId
  295. sessionId = helloResponseWebSocketMessage1.sessionId
  296. hasMCU = helloResponseWebSocketMessage1.serverHasMCUSupport()
  297. }
  298. for (i in messagesQueue.indices) {
  299. webSocket.send(messagesQueue[i])
  300. }
  301. messagesQueue = ArrayList()
  302. val helloHashMap = HashMap<String, String?>()
  303. if (!TextUtils.isEmpty(oldResumeId)) {
  304. helloHashMap["oldResumeId"] = oldResumeId
  305. } else {
  306. currentRoomToken = ""
  307. currentNormalBackendSession = ""
  308. }
  309. if (!TextUtils.isEmpty(currentRoomToken)) {
  310. helloHashMap[Globals.ROOM_TOKEN] = currentRoomToken
  311. }
  312. eventBus!!.post(WebSocketCommunicationEvent("hello", helloHashMap))
  313. }
  314. private fun sendRoomJoinedEvent() {
  315. val joinRoomHashMap = HashMap<String, String?>()
  316. joinRoomHashMap[Globals.ROOM_TOKEN] = currentRoomToken
  317. eventBus!!.post(WebSocketCommunicationEvent("roomJoined", joinRoomHashMap))
  318. }
  319. override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
  320. Log.d(TAG, "Receiving bytes : " + bytes.hex())
  321. }
  322. override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
  323. Log.d(TAG, "onClosing : $code / $reason")
  324. }
  325. override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
  326. Log.d(TAG, "onClosed : $code / $reason")
  327. isConnected = false
  328. }
  329. override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
  330. Log.e(TAG, "Error : WebSocket " + webSocket.hashCode(), t)
  331. closeWebSocket(webSocket)
  332. }
  333. fun hasMCU(): Boolean {
  334. return hasMCU
  335. }
  336. fun joinRoomWithRoomTokenAndSession(roomToken: String, normalBackendSession: String?) {
  337. Log.d(TAG, "joinRoomWithRoomTokenAndSession")
  338. Log.d(TAG, " roomToken: $roomToken")
  339. Log.d(TAG, " session: $normalBackendSession")
  340. try {
  341. val message = LoganSquare.serialize(
  342. webSocketConnectionHelper.getAssembledJoinOrLeaveRoomModel(roomToken, normalBackendSession)
  343. )
  344. if (roomToken == "") {
  345. Log.d(TAG, "sending 'leave room' via websocket")
  346. currentNormalBackendSession = ""
  347. sendMessage(message)
  348. } else if (roomToken == currentRoomToken && normalBackendSession == currentNormalBackendSession) {
  349. Log.d(TAG, "roomToken & session are unchanged. Joining locally without to send websocket message")
  350. sendRoomJoinedEvent()
  351. } else {
  352. Log.d(TAG, "Sending join room message via websocket")
  353. currentNormalBackendSession = normalBackendSession
  354. sendMessage(message)
  355. }
  356. } catch (e: IOException) {
  357. Log.e(TAG, "Failed to serialize signaling message", e)
  358. }
  359. }
  360. private fun sendCallMessage(ncSignalingMessage: NCSignalingMessage) {
  361. try {
  362. val message = LoganSquare.serialize(
  363. webSocketConnectionHelper.getAssembledCallMessageModel(ncSignalingMessage)
  364. )
  365. sendMessage(message)
  366. } catch (e: IOException) {
  367. Log.e(TAG, "Failed to serialize signaling message", e)
  368. }
  369. }
  370. private fun sendMessage(message: String) {
  371. if (!isConnected || reconnecting) {
  372. messagesQueue.add(message)
  373. if (!reconnecting) {
  374. restartWebSocket()
  375. }
  376. } else {
  377. if (!internalWebSocket!!.send(message)) {
  378. messagesQueue.add(message)
  379. restartWebSocket()
  380. }
  381. }
  382. }
  383. fun sendBye() {
  384. if (isConnected) {
  385. try {
  386. val byeWebSocketMessage = ByeWebSocketMessage()
  387. byeWebSocketMessage.type = "bye"
  388. byeWebSocketMessage.bye = HashMap()
  389. internalWebSocket!!.send(LoganSquare.serialize(byeWebSocketMessage))
  390. } catch (e: IOException) {
  391. Log.e(TAG, "Failed to serialize bye message")
  392. }
  393. }
  394. }
  395. fun getDisplayNameForSession(session: String?): String? {
  396. val participant = usersHashMap[session]
  397. if (participant != null) {
  398. if (participant.displayName != null) {
  399. return participant.displayName
  400. }
  401. }
  402. return ""
  403. }
  404. @Subscribe(threadMode = ThreadMode.BACKGROUND)
  405. fun onMessageEvent(networkEvent: NetworkEvent) {
  406. if (networkEvent.networkConnectionEvent == NetworkEvent.NetworkConnectionEvent.NETWORK_CONNECTED &&
  407. !isConnected
  408. ) {
  409. restartWebSocket()
  410. }
  411. }
  412. fun getSignalingMessageReceiver(): SignalingMessageReceiver {
  413. return signalingMessageReceiver
  414. }
  415. /**
  416. * Temporary implementation of SignalingMessageReceiver until signaling related code is extracted to a Signaling
  417. * class.
  418. *
  419. *
  420. * All listeners are called in the WebSocket reader thread. This thread should be the same as long as the WebSocket
  421. * stays connected, but it may change whenever it is connected again.
  422. */
  423. private class ExternalSignalingMessageReceiver : SignalingMessageReceiver() {
  424. fun process(eventMap: Map<String, Any?>?) {
  425. processEvent(eventMap)
  426. }
  427. fun process(message: CallWebSocketMessage?) {
  428. if (message?.ncSignalingMessage?.type == "startedTyping" ||
  429. message?.ncSignalingMessage?.type == "stoppedTyping"
  430. ) {
  431. processCallWebSocketMessage(message)
  432. } else {
  433. processSignalingMessage(message?.ncSignalingMessage)
  434. }
  435. }
  436. }
  437. inner class ExternalSignalingMessageSender : SignalingMessageSender {
  438. override fun send(ncSignalingMessage: NCSignalingMessage) {
  439. sendCallMessage(ncSignalingMessage)
  440. }
  441. }
  442. companion object {
  443. private const val TAG = "WebSocketInstance"
  444. private const val NORMAL_CLOSURE = 1000
  445. private const val ONE_SECOND: Long = 1000
  446. }
  447. }