filetransfer.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. ## filetransfer.py
  2. ##
  3. ## Copyright (C) 2004 Alexey "Snake" Nezhdanov
  4. ##
  5. ## This program is free software; you can redistribute it and/or modify
  6. ## it under the terms of the GNU General Public License as published by
  7. ## the Free Software Foundation; either version 2, or (at your option)
  8. ## any later version.
  9. ##
  10. ## This program is distributed in the hope that it will be useful,
  11. ## but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. ## GNU General Public License for more details.
  14. # $Id$
  15. """
  16. This module contains the IBB class, a simple implementation of XEP-0047 (In-Band Bytestreams).
  17. Note that this is only a transport for data: the transfer itself has to be negotiated beforehand
  18. (most probably via Stream Initiation). Unfortunately, SI is not implemented yet.
  19. """
  20. from .protocol import *
  21. from .dispatcher import PlugIn
  22. import base64
  23. class IBB(PlugIn):
  24. """ IBB used to transfer small-sized data chunk over estabilished xmpp connection.
  25. Data is split into small blocks (by default 3000 bytes each), encoded as base 64
  26. and sent to another entity that compiles these blocks back into the data chunk.
  27. This is very inefficiend but should work under any circumstances. Note that
  28. using IBB normally should be the last resort.
  29. """
  30. def __init__(self):
  31. """ Initialise internal variables. """
  32. PlugIn.__init__(self)
  33. self.DBG_LINE='ibb'
  34. self._exported_methods=[self.OpenStream]
  35. self._streams={}
  36. self._ampnode=Node(NS_AMP+' amp',payload=[Node('rule',{'condition':'deliver-at','value':'stored','action':'error'}),Node('rule',{'condition':'match-resource','value':'exact','action':'error'})])
  37. def plugin(self,owner):
  38. """ Register handlers for receiving incoming datastreams. Used internally. """
  39. self._owner.RegisterHandlerOnce('iq',self.StreamOpenReplyHandler) # Move to StreamOpen and specify stanza id
  40. self._owner.RegisterHandler('iq',self.IqHandler,ns=NS_IBB)
  41. self._owner.RegisterHandler('message',self.ReceiveHandler,ns=NS_IBB)
  42. def IqHandler(self,conn,stanza):
  43. """ Handles streams state change. Used internally. """
  44. typ=stanza.getType()
  45. self.DEBUG('IqHandler called typ->%s'%typ,'info')
  46. if typ=='set' and stanza.getTag('open',namespace=NS_IBB): self.StreamOpenHandler(conn,stanza)
  47. elif typ=='set' and stanza.getTag('close',namespace=NS_IBB): self.StreamCloseHandler(conn,stanza)
  48. elif typ=='result': self.StreamCommitHandler(conn,stanza)
  49. elif typ=='error': self.StreamOpenReplyHandler(conn,stanza)
  50. else: conn.send(Error(stanza,ERR_BAD_REQUEST))
  51. raise NodeProcessed
  52. def StreamOpenHandler(self,conn,stanza):
  53. """ Handles opening of new incoming stream. Used internally. """
  54. """
  55. <iq type='set'
  56. from='romeo@montague.net/orchard'
  57. to='juliet@capulet.com/balcony'
  58. id='inband_1'>
  59. <open sid='mySID'
  60. block-size='4096'
  61. xmlns='http://jabber.org/protocol/ibb'/>
  62. </iq>
  63. """
  64. err=None
  65. sid,blocksize=stanza.getTagAttr('open','sid'),stanza.getTagAttr('open','block-size')
  66. self.DEBUG('StreamOpenHandler called sid->%s blocksize->%s'%(sid,blocksize),'info')
  67. try: blocksize=int(blocksize)
  68. except: err=ERR_BAD_REQUEST
  69. if not sid or not blocksize: err=ERR_BAD_REQUEST
  70. elif sid in list(self._streams.keys()): err=ERR_UNEXPECTED_REQUEST
  71. if err: rep=Error(stanza,err)
  72. else:
  73. self.DEBUG("Opening stream: id %s, block-size %s"%(sid,blocksize),'info')
  74. rep=Protocol('iq',stanza.getFrom(),'result',stanza.getTo(),{'id':stanza.getID()})
  75. self._streams[sid]={'direction':'<'+str(stanza.getFrom()),'block-size':blocksize,'fp':open('/tmp/xmpp_file_'+sid,'w'),'seq':0,'syn_id':stanza.getID()}
  76. conn.send(rep)
  77. def OpenStream(self,sid,to,fp,blocksize=3000):
  78. """ Start new stream. You should provide stream id 'sid', the endpoind jid 'to',
  79. the file object containing info for send 'fp'. Also the desired blocksize can be specified.
  80. Take into account that recommended stanza size is 4k and IBB uses base64 encoding
  81. that increases size of data by 1/3."""
  82. if sid in list(self._streams.keys()): return
  83. if not JID(to).getResource(): return
  84. self._streams[sid]={'direction':'|>'+to,'block-size':blocksize,'fp':fp,'seq':0}
  85. self._owner.RegisterCycleHandler(self.SendHandler)
  86. syn=Protocol('iq',to,'set',payload=[Node(NS_IBB+' open',{'sid':sid,'block-size':blocksize})])
  87. self._owner.send(syn)
  88. self._streams[sid]['syn_id']=syn.getID()
  89. return self._streams[sid]
  90. def SendHandler(self,conn):
  91. """ Send next portion of data if it is time to do it. Used internally. """
  92. self.DEBUG('SendHandler called','info')
  93. for sid in list(self._streams.keys()):
  94. stream=self._streams[sid]
  95. if stream['direction'][:2]=='|>': cont=1
  96. elif stream['direction'][0]=='>':
  97. chunk=stream['fp'].read(stream['block-size'])
  98. if chunk:
  99. datanode=Node(NS_IBB+' data',{'sid':sid,'seq':stream['seq']},base64.encodestring(chunk))
  100. stream['seq']+=1
  101. if stream['seq']==65536: stream['seq']=0
  102. conn.send(Protocol('message',stream['direction'][1:],payload=[datanode,self._ampnode]))
  103. else:
  104. """ notify the other side about stream closing
  105. notify the local user about sucessfull send
  106. delete the local stream"""
  107. conn.send(Protocol('iq',stream['direction'][1:],'set',payload=[Node(NS_IBB+' close',{'sid':sid})]))
  108. conn.Event(self.DBG_LINE,'SUCCESSFULL SEND',stream)
  109. del self._streams[sid]
  110. self._owner.UnregisterCycleHandler(self.SendHandler)
  111. """
  112. <message from='romeo@montague.net/orchard' to='juliet@capulet.com/balcony' id='msg1'>
  113. <data xmlns='http://jabber.org/protocol/ibb' sid='mySID' seq='0'>
  114. qANQR1DBwU4DX7jmYZnncmUQB/9KuKBddzQH+tZ1ZywKK0yHKnq57kWq+RFtQdCJ
  115. WpdWpR0uQsuJe7+vh3NWn59/gTc5MDlX8dS9p0ovStmNcyLhxVgmqS8ZKhsblVeu
  116. IpQ0JgavABqibJolc3BKrVtVV1igKiX/N7Pi8RtY1K18toaMDhdEfhBRzO/XB0+P
  117. AQhYlRjNacGcslkhXqNjK5Va4tuOAPy2n1Q8UUrHbUd0g+xJ9Bm0G0LZXyvCWyKH
  118. kuNEHFQiLuCY6Iv0myq6iX6tjuHehZlFSh80b5BVV9tNLwNR5Eqz1klxMhoghJOA
  119. </data>
  120. <amp xmlns='http://jabber.org/protocol/amp'>
  121. <rule condition='deliver-at' value='stored' action='error'/>
  122. <rule condition='match-resource' value='exact' action='error'/>
  123. </amp>
  124. </message>
  125. """
  126. def ReceiveHandler(self,conn,stanza):
  127. """ Receive next portion of incoming datastream and store it write
  128. it to temporary file. Used internally.
  129. """
  130. sid,seq,data=stanza.getTagAttr('data','sid'),stanza.getTagAttr('data','seq'),stanza.getTagData('data')
  131. self.DEBUG('ReceiveHandler called sid->%s seq->%s'%(sid,seq),'info')
  132. try: seq=int(seq); data=base64.decodestring(data)
  133. except: seq=''; data=''
  134. err=None
  135. if not sid in list(self._streams.keys()): err=ERR_ITEM_NOT_FOUND
  136. else:
  137. stream=self._streams[sid]
  138. if not data: err=ERR_BAD_REQUEST
  139. elif seq!=stream['seq']: err=ERR_UNEXPECTED_REQUEST
  140. else:
  141. self.DEBUG('Successfull receive sid->%s %s+%s bytes'%(sid,stream['fp'].tell(),len(data)),'ok')
  142. stream['seq']+=1
  143. stream['fp'].write(data)
  144. if err:
  145. self.DEBUG('Error on receive: %s'%err,'error')
  146. conn.send(Error(Iq(to=stanza.getFrom(),frm=stanza.getTo(),payload=[Node(NS_IBB+' close')]),err,reply=0))
  147. def StreamCloseHandler(self,conn,stanza):
  148. """ Handle stream closure due to all data transmitted.
  149. Raise xmpppy event specifying successfull data receive. """
  150. sid=stanza.getTagAttr('close','sid')
  151. self.DEBUG('StreamCloseHandler called sid->%s'%sid,'info')
  152. if sid in list(self._streams.keys()):
  153. conn.send(stanza.buildReply('result'))
  154. conn.Event(self.DBG_LINE,'SUCCESSFULL RECEIVE',self._streams[sid])
  155. del self._streams[sid]
  156. else: conn.send(Error(stanza,ERR_ITEM_NOT_FOUND))
  157. def StreamBrokenHandler(self,conn,stanza):
  158. """ Handle stream closure due to all some error while receiving data.
  159. Raise xmpppy event specifying unsuccessfull data receive. """
  160. syn_id=stanza.getID()
  161. self.DEBUG('StreamBrokenHandler called syn_id->%s'%syn_id,'info')
  162. for sid in list(self._streams.keys()):
  163. stream=self._streams[sid]
  164. if stream['syn_id']==syn_id:
  165. if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream)
  166. else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream)
  167. del self._streams[sid]
  168. def StreamOpenReplyHandler(self,conn,stanza):
  169. """ Handle remote side reply about is it agree or not to receive our datastream.
  170. Used internally. Raises xmpppy event specfiying if the data transfer
  171. is agreed upon."""
  172. syn_id=stanza.getID()
  173. self.DEBUG('StreamOpenReplyHandler called syn_id->%s'%syn_id,'info')
  174. for sid in list(self._streams.keys()):
  175. stream=self._streams[sid]
  176. if stream['syn_id']==syn_id:
  177. if stanza.getType()=='error':
  178. if stream['direction'][0]=='<': conn.Event(self.DBG_LINE,'ERROR ON RECEIVE',stream)
  179. else: conn.Event(self.DBG_LINE,'ERROR ON SEND',stream)
  180. del self._streams[sid]
  181. elif stanza.getType()=='result':
  182. if stream['direction'][0]=='|':
  183. stream['direction']=stream['direction'][1:]
  184. conn.Event(self.DBG_LINE,'STREAM COMMITTED',stream)
  185. else: conn.send(Error(stanza,ERR_UNEXPECTED_REQUEST))