SocketConnection.swift 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201
  1. //
  2. // SocketConnection.swift
  3. // Broadcast Extension
  4. //
  5. // Created by Alex-Dan Bumbu on 22/03/2021.
  6. // Copyright © 2021 Atlassian Inc. All rights reserved.
  7. //
  8. // From https://github.com/jitsi/jitsi-meet-sdk-samples (Apache 2.0 license)
  9. // SPDX-FileCopyrightText: 2021 Alex-Dan Bumbu, Atlassian Inc. All rights reserved.
  10. // SPDX-License-Identifier: Apache-2.0
  11. import Foundation
  12. class SocketConnection: NSObject {
  13. var didOpen: (() -> Void)?
  14. var didClose: ((Error?) -> Void)?
  15. var streamHasSpaceAvailable: (() -> Void)?
  16. private let filePath: String
  17. private var socketHandle: Int32 = -1
  18. private var address: sockaddr_un?
  19. private var inputStream: InputStream?
  20. private var outputStream: OutputStream?
  21. private var networkQueue: DispatchQueue?
  22. private var shouldKeepRunning = false
  23. init?(filePath path: String) {
  24. filePath = path
  25. socketHandle = Darwin.socket(AF_UNIX, SOCK_STREAM, 0)
  26. guard socketHandle != -1 else {
  27. print("failure: create socket")
  28. return nil
  29. }
  30. }
  31. func open() -> Bool {
  32. print("open socket connection")
  33. guard FileManager.default.fileExists(atPath: filePath) else {
  34. print("failure: socket file missing")
  35. return false
  36. }
  37. guard setupAddress() == true else {
  38. return false
  39. }
  40. guard connectSocket() == true else {
  41. return false
  42. }
  43. setupStreams()
  44. inputStream?.open()
  45. outputStream?.open()
  46. return true
  47. }
  48. func close() {
  49. unscheduleStreams()
  50. inputStream?.delegate = nil
  51. outputStream?.delegate = nil
  52. inputStream?.close()
  53. outputStream?.close()
  54. inputStream = nil
  55. outputStream = nil
  56. }
  57. func writeToStream(buffer: UnsafePointer<UInt8>, maxLength length: Int) -> Int {
  58. outputStream?.write(buffer, maxLength: length) ?? 0
  59. }
  60. }
  61. extension SocketConnection: StreamDelegate {
  62. func stream(_ aStream: Stream, handle eventCode: Stream.Event) {
  63. switch eventCode {
  64. case .openCompleted:
  65. print("client stream open completed")
  66. if aStream == outputStream {
  67. didOpen?()
  68. }
  69. case .hasBytesAvailable:
  70. if aStream == inputStream {
  71. var buffer: UInt8 = 0
  72. let numberOfBytesRead = inputStream?.read(&buffer, maxLength: 1)
  73. if numberOfBytesRead == 0 && aStream.streamStatus == .atEnd {
  74. print("server socket closed")
  75. close()
  76. notifyDidClose(error: nil)
  77. }
  78. }
  79. case .hasSpaceAvailable:
  80. if aStream == outputStream {
  81. streamHasSpaceAvailable?()
  82. }
  83. case .errorOccurred:
  84. print("client stream error occured: \(String(describing: aStream.streamError))")
  85. close()
  86. notifyDidClose(error: aStream.streamError)
  87. default:
  88. break
  89. }
  90. }
  91. }
  92. private extension SocketConnection {
  93. func setupAddress() -> Bool {
  94. var addr = sockaddr_un()
  95. guard filePath.count < MemoryLayout.size(ofValue: addr.sun_path) else {
  96. print("failure: fd path is too long")
  97. return false
  98. }
  99. _ = withUnsafeMutablePointer(to: &addr.sun_path.0) { ptr in
  100. filePath.withCString {
  101. strncpy(ptr, $0, filePath.count)
  102. }
  103. }
  104. address = addr
  105. return true
  106. }
  107. func connectSocket() -> Bool {
  108. guard var addr = address else {
  109. return false
  110. }
  111. let status = withUnsafePointer(to: &addr) { ptr in
  112. ptr.withMemoryRebound(to: sockaddr.self, capacity: 1) {
  113. Darwin.connect(socketHandle, $0, socklen_t(MemoryLayout<sockaddr_un>.size))
  114. }
  115. }
  116. guard status == noErr else {
  117. print("failure: \(status)")
  118. return false
  119. }
  120. return true
  121. }
  122. func setupStreams() {
  123. var readStream: Unmanaged<CFReadStream>?
  124. var writeStream: Unmanaged<CFWriteStream>?
  125. CFStreamCreatePairWithSocket(kCFAllocatorDefault, socketHandle, &readStream, &writeStream)
  126. inputStream = readStream?.takeRetainedValue()
  127. inputStream?.delegate = self
  128. inputStream?.setProperty(kCFBooleanTrue, forKey: Stream.PropertyKey(kCFStreamPropertyShouldCloseNativeSocket as String))
  129. outputStream = writeStream?.takeRetainedValue()
  130. outputStream?.delegate = self
  131. outputStream?.setProperty(kCFBooleanTrue, forKey: Stream.PropertyKey(kCFStreamPropertyShouldCloseNativeSocket as String))
  132. scheduleStreams()
  133. }
  134. func scheduleStreams() {
  135. shouldKeepRunning = true
  136. networkQueue = DispatchQueue.global(qos: .userInitiated)
  137. networkQueue?.async { [weak self] in
  138. self?.inputStream?.schedule(in: .current, forMode: .common)
  139. self?.outputStream?.schedule(in: .current, forMode: .common)
  140. RunLoop.current.run()
  141. var isRunning = false
  142. repeat {
  143. isRunning = self?.shouldKeepRunning ?? false && RunLoop.current.run(mode: .default, before: .distantFuture)
  144. } while (isRunning)
  145. }
  146. }
  147. func unscheduleStreams() {
  148. networkQueue?.sync { [weak self] in
  149. self?.inputStream?.remove(from: .current, forMode: .common)
  150. self?.outputStream?.remove(from: .current, forMode: .common)
  151. }
  152. shouldKeepRunning = false
  153. }
  154. func notifyDidClose(error: Error?) {
  155. if didClose != nil {
  156. didClose?(error)
  157. }
  158. }
  159. }