stream.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. import socket
  2. import threading
  3. import logging
  4. from sleekxmpp.stanza import Iq
  5. from sleekxmpp.util import Queue
  6. from sleekxmpp.exceptions import XMPPError
  7. log = logging.getLogger(__name__)
  8. class IBBytestream(object):
  9. def __init__(self, xmpp, sid, block_size, jid, peer, window_size=1, use_messages=False):
  10. self.xmpp = xmpp
  11. self.sid = sid
  12. self.block_size = block_size
  13. self.window_size = window_size
  14. self.use_messages = use_messages
  15. if jid is None:
  16. jid = xmpp.boundjid
  17. self.self_jid = jid
  18. self.peer_jid = peer
  19. self.send_seq = -1
  20. self.recv_seq = -1
  21. self._send_seq_lock = threading.Lock()
  22. self._recv_seq_lock = threading.Lock()
  23. self.stream_started = threading.Event()
  24. self.stream_in_closed = threading.Event()
  25. self.stream_out_closed = threading.Event()
  26. self.recv_queue = Queue()
  27. self.send_window = threading.BoundedSemaphore(value=self.window_size)
  28. self.window_ids = set()
  29. self.window_empty = threading.Event()
  30. self.window_empty.set()
  31. def send(self, data):
  32. if not self.stream_started.is_set() or \
  33. self.stream_out_closed.is_set():
  34. raise socket.error
  35. data = data[0:self.block_size]
  36. self.send_window.acquire()
  37. with self._send_seq_lock:
  38. self.send_seq = (self.send_seq + 1) % 65535
  39. seq = self.send_seq
  40. if self.use_messages:
  41. msg = self.xmpp.Message()
  42. msg['to'] = self.peer_jid
  43. msg['from'] = self.self_jid
  44. msg['id'] = self.xmpp.new_id()
  45. msg['ibb_data']['sid'] = self.sid
  46. msg['ibb_data']['seq'] = seq
  47. msg['ibb_data']['data'] = data
  48. msg.send()
  49. self.send_window.release()
  50. else:
  51. iq = self.xmpp.Iq()
  52. iq['type'] = 'set'
  53. iq['to'] = self.peer_jid
  54. iq['from'] = self.self_jid
  55. iq['ibb_data']['sid'] = self.sid
  56. iq['ibb_data']['seq'] = seq
  57. iq['ibb_data']['data'] = data
  58. self.window_empty.clear()
  59. self.window_ids.add(iq['id'])
  60. iq.send(block=False, callback=self._recv_ack)
  61. return len(data)
  62. def sendall(self, data):
  63. sent_len = 0
  64. while sent_len < len(data):
  65. sent_len += self.send(data[sent_len:])
  66. def _recv_ack(self, iq):
  67. self.window_ids.remove(iq['id'])
  68. if not self.window_ids:
  69. self.window_empty.set()
  70. self.send_window.release()
  71. if iq['type'] == 'error':
  72. self.close()
  73. def _recv_data(self, stanza):
  74. with self._recv_seq_lock:
  75. new_seq = stanza['ibb_data']['seq']
  76. if new_seq != (self.recv_seq + 1) % 65535:
  77. self.close()
  78. raise XMPPError('unexpected-request')
  79. self.recv_seq = new_seq
  80. data = stanza['ibb_data']['data']
  81. if len(data) > self.block_size:
  82. self.close()
  83. raise XMPPError('not-acceptable')
  84. self.recv_queue.put(data)
  85. self.xmpp.event('ibb_stream_data', {'stream': self, 'data': data})
  86. if isinstance(stanza, Iq):
  87. stanza.reply()
  88. stanza.send()
  89. def recv(self, *args, **kwargs):
  90. return self.read(block=True)
  91. def read(self, block=True, timeout=None, **kwargs):
  92. if not self.stream_started.is_set() or \
  93. self.stream_in_closed.is_set():
  94. raise socket.error
  95. if timeout is not None:
  96. block = True
  97. try:
  98. return self.recv_queue.get(block, timeout)
  99. except:
  100. return None
  101. def close(self):
  102. iq = self.xmpp.Iq()
  103. iq['type'] = 'set'
  104. iq['to'] = self.peer_jid
  105. iq['from'] = self.self_jid
  106. iq['ibb_close']['sid'] = self.sid
  107. self.stream_out_closed.set()
  108. iq.send(block=False,
  109. callback=lambda x: self.stream_in_closed.set())
  110. self.xmpp.event('ibb_stream_end', self)
  111. def _closed(self, iq):
  112. self.stream_in_closed.set()
  113. self.stream_out_closed.set()
  114. iq.reply()
  115. iq.send()
  116. self.xmpp.event('ibb_stream_end', self)
  117. def makefile(self, *args, **kwargs):
  118. return self
  119. def connect(*args, **kwargs):
  120. return None
  121. def shutdown(self, *args, **kwargs):
  122. return None