WebSocketInstance.kt 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485
  1. /*
  2. * Nextcloud Talk application
  3. *
  4. * @author Mario Danic
  5. * Copyright (C) 2017-2018 Mario Danic <mario@lovelyhq.com>
  6. *
  7. * This program is free software: you can redistribute it and/or modify
  8. * it under the terms of the GNU General Public License as published by
  9. * the Free Software Foundation, either version 3 of the License, or
  10. * at your option) any later version.
  11. *
  12. * This program is distributed in the hope that it will be useful,
  13. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  15. * GNU General Public License for more details.
  16. *
  17. * You should have received a copy of the GNU General Public License
  18. * along with this program. If not, see <http://www.gnu.org/licenses/>.
  19. */
  20. package com.nextcloud.talk.webrtc
  21. import android.content.Context
  22. import android.text.TextUtils
  23. import android.util.Log
  24. import autodagger.AutoInjector
  25. import com.bluelinelabs.logansquare.LoganSquare
  26. import com.nextcloud.talk.application.NextcloudTalkApplication
  27. import com.nextcloud.talk.application.NextcloudTalkApplication.Companion.sharedApplication
  28. import com.nextcloud.talk.data.user.model.User
  29. import com.nextcloud.talk.events.NetworkEvent
  30. import com.nextcloud.talk.events.WebSocketCommunicationEvent
  31. import com.nextcloud.talk.models.json.participants.Participant
  32. import com.nextcloud.talk.models.json.participants.Participant.ActorType
  33. import com.nextcloud.talk.models.json.signaling.NCSignalingMessage
  34. import com.nextcloud.talk.models.json.websocket.BaseWebSocketMessage
  35. import com.nextcloud.talk.models.json.websocket.ByeWebSocketMessage
  36. import com.nextcloud.talk.models.json.websocket.CallOverallWebSocketMessage
  37. import com.nextcloud.talk.models.json.websocket.CallWebSocketMessage
  38. import com.nextcloud.talk.models.json.websocket.ErrorOverallWebSocketMessage
  39. import com.nextcloud.talk.models.json.websocket.EventOverallWebSocketMessage
  40. import com.nextcloud.talk.models.json.websocket.HelloResponseOverallWebSocketMessage
  41. import com.nextcloud.talk.models.json.websocket.JoinedRoomOverallWebSocketMessage
  42. import com.nextcloud.talk.signaling.SignalingMessageReceiver
  43. import com.nextcloud.talk.signaling.SignalingMessageSender
  44. import com.nextcloud.talk.utils.bundle.BundleKeys
  45. import okhttp3.OkHttpClient
  46. import okhttp3.Request
  47. import okhttp3.Response
  48. import okhttp3.WebSocket
  49. import okhttp3.WebSocketListener
  50. import okio.ByteString
  51. import org.greenrobot.eventbus.EventBus
  52. import org.greenrobot.eventbus.Subscribe
  53. import org.greenrobot.eventbus.ThreadMode
  54. import java.io.IOException
  55. import javax.inject.Inject
  56. @AutoInjector(NextcloudTalkApplication::class)
  57. class WebSocketInstance internal constructor(
  58. conversationUser: User,
  59. connectionUrl: String,
  60. webSocketTicket: String
  61. ) : WebSocketListener() {
  62. @JvmField
  63. @Inject
  64. var okHttpClient: OkHttpClient? = null
  65. @JvmField
  66. @Inject
  67. var eventBus: EventBus? = null
  68. @JvmField
  69. @Inject
  70. var context: Context? = null
  71. private val conversationUser: User
  72. private val webSocketTicket: String
  73. private var resumeId: String? = null
  74. var sessionId: String? = null
  75. private set
  76. private var hasMCU = false
  77. var isConnected: Boolean
  78. private set
  79. private val webSocketConnectionHelper: WebSocketConnectionHelper
  80. private var internalWebSocket: WebSocket? = null
  81. private val connectionUrl: String
  82. private var currentRoomToken: String? = null
  83. private var reconnecting = false
  84. private val usersHashMap: HashMap<String?, Participant>
  85. private var messagesQueue: MutableList<String> = ArrayList()
  86. private val signalingMessageReceiver = ExternalSignalingMessageReceiver()
  87. val signalingMessageSender = ExternalSignalingMessageSender()
  88. init {
  89. sharedApplication!!.componentApplication.inject(this)
  90. this.connectionUrl = connectionUrl
  91. this.conversationUser = conversationUser
  92. this.webSocketTicket = webSocketTicket
  93. webSocketConnectionHelper = WebSocketConnectionHelper()
  94. usersHashMap = HashMap()
  95. isConnected = false
  96. eventBus!!.register(this)
  97. restartWebSocket()
  98. }
  99. private fun sendHello() {
  100. try {
  101. if (TextUtils.isEmpty(resumeId)) {
  102. internalWebSocket!!.send(
  103. LoganSquare.serialize(
  104. webSocketConnectionHelper
  105. .getAssembledHelloModel(conversationUser, webSocketTicket)
  106. )
  107. )
  108. } else {
  109. internalWebSocket!!.send(
  110. LoganSquare.serialize(
  111. webSocketConnectionHelper
  112. .getAssembledHelloModelForResume(resumeId)
  113. )
  114. )
  115. }
  116. } catch (e: IOException) {
  117. Log.e(TAG, "Failed to serialize hello model")
  118. }
  119. }
  120. override fun onOpen(webSocket: WebSocket, response: Response) {
  121. Log.d(TAG, "Open webSocket")
  122. internalWebSocket = webSocket
  123. sendHello()
  124. }
  125. private fun closeWebSocket(webSocket: WebSocket) {
  126. webSocket.close(NORMAL_CLOSURE, null)
  127. webSocket.cancel()
  128. if (webSocket === internalWebSocket) {
  129. isConnected = false
  130. messagesQueue = ArrayList()
  131. }
  132. restartWebSocket()
  133. }
  134. fun clearResumeId() {
  135. resumeId = ""
  136. }
  137. fun restartWebSocket() {
  138. reconnecting = true
  139. // TODO when improving logging, keep in mind this issue: https://github.com/nextcloud/talk-android/issues/1013
  140. Log.d(TAG, "restartWebSocket: $connectionUrl")
  141. val request = Request.Builder().url(connectionUrl).build()
  142. okHttpClient!!.newWebSocket(request, this)
  143. }
  144. override fun onMessage(webSocket: WebSocket, text: String) {
  145. if (webSocket === internalWebSocket) {
  146. Log.d(TAG, "Receiving : $webSocket $text")
  147. try {
  148. val (messageType) = LoganSquare.parse(text, BaseWebSocketMessage::class.java)
  149. if (messageType != null) {
  150. when (messageType) {
  151. "hello" -> processHelloMessage(webSocket, text)
  152. "error" -> processErrorMessage(webSocket, text)
  153. "room" -> processJoinedRoomMessage(text)
  154. "event" -> processEventMessage(text)
  155. "message" -> processMessage(text)
  156. "bye" -> {
  157. isConnected = false
  158. resumeId = ""
  159. }
  160. else -> {}
  161. }
  162. } else {
  163. Log.e(TAG, "Received message with type: null")
  164. }
  165. } catch (e: IOException) {
  166. Log.e(TAG, "Failed to recognize WebSocket message", e)
  167. }
  168. }
  169. }
  170. @Throws(IOException::class)
  171. private fun processMessage(text: String) {
  172. val (_, callWebSocketMessage) = LoganSquare.parse(text, CallOverallWebSocketMessage::class.java)
  173. if (callWebSocketMessage != null) {
  174. val ncSignalingMessage = callWebSocketMessage.ncSignalingMessage
  175. if (ncSignalingMessage != null &&
  176. TextUtils.isEmpty(ncSignalingMessage.from) &&
  177. callWebSocketMessage.senderWebSocketMessage != null
  178. ) {
  179. ncSignalingMessage.from = callWebSocketMessage.senderWebSocketMessage!!.sessionId
  180. }
  181. signalingMessageReceiver.process(callWebSocketMessage)
  182. }
  183. }
  184. @Throws(IOException::class)
  185. private fun processEventMessage(text: String) {
  186. val eventOverallWebSocketMessage = LoganSquare.parse(text, EventOverallWebSocketMessage::class.java)
  187. if (eventOverallWebSocketMessage.eventMap != null) {
  188. val target = eventOverallWebSocketMessage.eventMap!!["target"] as String?
  189. if (target != null) {
  190. when (target) {
  191. Globals.TARGET_ROOM -> {
  192. if ("message" == eventOverallWebSocketMessage.eventMap!!["type"]) {
  193. processRoomMessageMessage(eventOverallWebSocketMessage)
  194. } else if ("join" == eventOverallWebSocketMessage.eventMap!!["type"]) {
  195. processRoomJoinMessage(eventOverallWebSocketMessage)
  196. } else if ("leave" == eventOverallWebSocketMessage.eventMap!!["type"]) {
  197. processRoomLeaveMessage(eventOverallWebSocketMessage)
  198. }
  199. signalingMessageReceiver.process(eventOverallWebSocketMessage.eventMap)
  200. }
  201. Globals.TARGET_PARTICIPANTS ->
  202. signalingMessageReceiver.process(eventOverallWebSocketMessage.eventMap)
  203. else ->
  204. Log.i(TAG, "Received unknown/ignored event target: $target")
  205. }
  206. } else {
  207. Log.w(TAG, "Received message with event target: null")
  208. }
  209. }
  210. }
  211. private fun processRoomMessageMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
  212. val messageHashMap = eventOverallWebSocketMessage.eventMap?.get("message") as Map<*, *>?
  213. if (messageHashMap != null && messageHashMap.containsKey("data")) {
  214. val dataHashMap = messageHashMap["data"] as Map<*, *>?
  215. if (dataHashMap != null && dataHashMap.containsKey("chat")) {
  216. val chatMap = dataHashMap["chat"] as Map<*, *>?
  217. if (chatMap != null && chatMap.containsKey("refresh") && chatMap["refresh"] as Boolean) {
  218. val refreshChatHashMap = HashMap<String, String?>()
  219. refreshChatHashMap[BundleKeys.KEY_ROOM_TOKEN] = messageHashMap["roomid"] as String?
  220. refreshChatHashMap[BundleKeys.KEY_INTERNAL_USER_ID] = (conversationUser.id!!).toString()
  221. eventBus!!.post(WebSocketCommunicationEvent("refreshChat", refreshChatHashMap))
  222. }
  223. } else if (dataHashMap != null && dataHashMap.containsKey("recording")) {
  224. val recordingMap = dataHashMap["recording"] as Map<*, *>?
  225. if (recordingMap != null && recordingMap.containsKey("status")) {
  226. val status = (recordingMap["status"] as Long?)!!.toInt()
  227. Log.d(TAG, "status is $status")
  228. val recordingHashMap = HashMap<String, String>()
  229. recordingHashMap[BundleKeys.KEY_RECORDING_STATE] = status.toString()
  230. eventBus!!.post(WebSocketCommunicationEvent("recordingStatus", recordingHashMap))
  231. }
  232. }
  233. }
  234. }
  235. private fun processRoomJoinMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
  236. val joinEventList = eventOverallWebSocketMessage.eventMap?.get("join") as List<HashMap<String, Any>>?
  237. var internalHashMap: HashMap<String, Any>
  238. var participant: Participant
  239. for (i in joinEventList!!.indices) {
  240. internalHashMap = joinEventList[i]
  241. val userMap = internalHashMap["user"] as HashMap<String, Any>?
  242. participant = Participant()
  243. val userId = internalHashMap["userid"] as String?
  244. if (userId != null) {
  245. participant.actorType = ActorType.USERS
  246. participant.actorId = userId
  247. } else {
  248. participant.actorType = ActorType.GUESTS
  249. // FIXME seems to be not given by the HPB: participant.setActorId();
  250. }
  251. if (userMap != null) {
  252. // There is no "user" attribute for guest participants.
  253. participant.displayName = userMap["displayname"] as String?
  254. }
  255. usersHashMap[internalHashMap["sessionid"] as String?] = participant
  256. }
  257. }
  258. private fun processRoomLeaveMessage(eventOverallWebSocketMessage: EventOverallWebSocketMessage) {
  259. val leaveEventList = eventOverallWebSocketMessage.eventMap?.get("leave") as List<String>?
  260. for (i in leaveEventList!!.indices) {
  261. usersHashMap.remove(leaveEventList[i])
  262. }
  263. }
  264. fun getUserMap(): HashMap<String?, Participant> {
  265. return usersHashMap
  266. }
  267. @Throws(IOException::class)
  268. private fun processJoinedRoomMessage(text: String) {
  269. val (_, roomWebSocketMessage) = LoganSquare.parse(text, JoinedRoomOverallWebSocketMessage::class.java)
  270. if (roomWebSocketMessage != null) {
  271. currentRoomToken = roomWebSocketMessage.roomId
  272. if (
  273. roomWebSocketMessage.roomPropertiesWebSocketMessage != null &&
  274. !TextUtils.isEmpty(currentRoomToken)
  275. ) {
  276. sendRoomJoinedEvent()
  277. }
  278. }
  279. }
  280. @Throws(IOException::class)
  281. private fun processErrorMessage(webSocket: WebSocket, text: String) {
  282. Log.e(TAG, "Received error: $text")
  283. val (_, message) = LoganSquare.parse(text, ErrorOverallWebSocketMessage::class.java)
  284. if (message != null) {
  285. if ("no_such_session" == message.code) {
  286. Log.d(TAG, "WebSocket " + webSocket.hashCode() + " resumeID " + resumeId + " expired")
  287. resumeId = ""
  288. currentRoomToken = ""
  289. restartWebSocket()
  290. } else if ("hello_expected" == message.code) {
  291. restartWebSocket()
  292. }
  293. }
  294. }
  295. @Throws(IOException::class)
  296. private fun processHelloMessage(webSocket: WebSocket, text: String) {
  297. isConnected = true
  298. reconnecting = false
  299. val oldResumeId = resumeId
  300. val (_, helloResponseWebSocketMessage1) = LoganSquare.parse(
  301. text,
  302. HelloResponseOverallWebSocketMessage::class.java
  303. )
  304. if (helloResponseWebSocketMessage1 != null) {
  305. resumeId = helloResponseWebSocketMessage1.resumeId
  306. sessionId = helloResponseWebSocketMessage1.sessionId
  307. hasMCU = helloResponseWebSocketMessage1.serverHasMCUSupport()
  308. }
  309. for (i in messagesQueue.indices) {
  310. webSocket.send(messagesQueue[i])
  311. }
  312. messagesQueue = ArrayList()
  313. val helloHasHap = HashMap<String, String?>()
  314. if (!TextUtils.isEmpty(oldResumeId)) {
  315. helloHasHap["oldResumeId"] = oldResumeId
  316. } else {
  317. currentRoomToken = ""
  318. }
  319. if (!TextUtils.isEmpty(currentRoomToken)) {
  320. helloHasHap[Globals.ROOM_TOKEN] = currentRoomToken
  321. }
  322. eventBus!!.post(WebSocketCommunicationEvent("hello", helloHasHap))
  323. }
  324. private fun sendRoomJoinedEvent() {
  325. val joinRoomHashMap = HashMap<String, String?>()
  326. joinRoomHashMap[Globals.ROOM_TOKEN] = currentRoomToken
  327. eventBus!!.post(WebSocketCommunicationEvent("roomJoined", joinRoomHashMap))
  328. }
  329. override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
  330. Log.d(TAG, "Receiving bytes : " + bytes.hex())
  331. }
  332. override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
  333. Log.d(TAG, "onClosing : $code / $reason")
  334. }
  335. override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
  336. Log.d(TAG, "onClosed : $code / $reason")
  337. isConnected = false
  338. }
  339. override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
  340. Log.d(TAG, "Error : WebSocket " + webSocket.hashCode() + " onFailure: " + t.message)
  341. closeWebSocket(webSocket)
  342. }
  343. fun hasMCU(): Boolean {
  344. return hasMCU
  345. }
  346. fun joinRoomWithRoomTokenAndSession(roomToken: String, normalBackendSession: String?) {
  347. Log.d(TAG, "joinRoomWithRoomTokenAndSession")
  348. Log.d(TAG, " roomToken: $roomToken")
  349. Log.d(TAG, " session: $normalBackendSession")
  350. try {
  351. val message = LoganSquare.serialize(
  352. webSocketConnectionHelper.getAssembledJoinOrLeaveRoomModel(roomToken, normalBackendSession)
  353. )
  354. if (!isConnected || reconnecting) {
  355. messagesQueue.add(message)
  356. } else {
  357. if (roomToken == currentRoomToken) {
  358. sendRoomJoinedEvent()
  359. } else {
  360. internalWebSocket!!.send(message)
  361. }
  362. }
  363. } catch (e: IOException) {
  364. Log.e(TAG, e.message, e)
  365. }
  366. }
  367. private fun sendCallMessage(ncSignalingMessage: NCSignalingMessage) {
  368. try {
  369. val message = LoganSquare.serialize(
  370. webSocketConnectionHelper.getAssembledCallMessageModel(ncSignalingMessage)
  371. )
  372. if (!isConnected || reconnecting) {
  373. messagesQueue.add(message)
  374. } else {
  375. internalWebSocket!!.send(message)
  376. }
  377. } catch (e: IOException) {
  378. Log.e(TAG, "Failed to serialize signaling message", e)
  379. }
  380. }
  381. fun sendBye() {
  382. if (isConnected) {
  383. try {
  384. val byeWebSocketMessage = ByeWebSocketMessage()
  385. byeWebSocketMessage.type = "bye"
  386. byeWebSocketMessage.bye = HashMap()
  387. internalWebSocket!!.send(LoganSquare.serialize(byeWebSocketMessage))
  388. } catch (e: IOException) {
  389. Log.e(TAG, "Failed to serialize bye message")
  390. }
  391. }
  392. }
  393. fun getDisplayNameForSession(session: String?): String? {
  394. val participant = usersHashMap[session]
  395. if (participant != null) {
  396. if (participant.displayName != null) {
  397. return participant.displayName
  398. }
  399. }
  400. return ""
  401. }
  402. @Subscribe(threadMode = ThreadMode.BACKGROUND)
  403. fun onMessageEvent(networkEvent: NetworkEvent) {
  404. if (networkEvent.networkConnectionEvent == NetworkEvent.NetworkConnectionEvent.NETWORK_CONNECTED &&
  405. !isConnected
  406. ) {
  407. restartWebSocket()
  408. }
  409. }
  410. fun getSignalingMessageReceiver(): SignalingMessageReceiver {
  411. return signalingMessageReceiver
  412. }
  413. /**
  414. * Temporary implementation of SignalingMessageReceiver until signaling related code is extracted to a Signaling
  415. * class.
  416. *
  417. *
  418. * All listeners are called in the WebSocket reader thread. This thread should be the same as long as the WebSocket
  419. * stays connected, but it may change whenever it is connected again.
  420. */
  421. private class ExternalSignalingMessageReceiver : SignalingMessageReceiver() {
  422. fun process(eventMap: Map<String, Any?>?) {
  423. processEvent(eventMap)
  424. }
  425. fun process(message: CallWebSocketMessage?) {
  426. if (message?.ncSignalingMessage?.type == "startedTyping" ||
  427. message?.ncSignalingMessage?.type == "stoppedTyping"
  428. ) {
  429. processCallWebSocketMessage(message)
  430. } else {
  431. processSignalingMessage(message?.ncSignalingMessage)
  432. }
  433. }
  434. }
  435. inner class ExternalSignalingMessageSender : SignalingMessageSender {
  436. override fun send(ncSignalingMessage: NCSignalingMessage) {
  437. sendCallMessage(ncSignalingMessage)
  438. }
  439. }
  440. companion object {
  441. private const val TAG = "WebSocketInstance"
  442. private const val NORMAL_CLOSURE = 1000
  443. }
  444. }