transports.py 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777
  1. ## transports.py
  2. ##
  3. ## Copyright (C) 2003-2004 Alexey "Snake" Nezhdanov
  4. ##
  5. ## This program is free software; you can redistribute it and/or modify
  6. ## it under the terms of the GNU General Public License as published by
  7. ## the Free Software Foundation; either version 2, or (at your option)
  8. ## any later version.
  9. ##
  10. ## This program is distributed in the hope that it will be useful,
  11. ## but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. ## GNU General Public License for more details.
  14. # $Id$
  15. """
This module contains the low-level implementations of the xmpppy connect
methods or (in other words) transports for xmpp-stanzas.
Currently the following transports are provided:
direct TCP connect - TCPsocket class
proxied TCP connect - HTTPPROXYsocket class (CONNECT proxies)
TLS connection - TLS class. Can be used for SSL connections also.
BOSH (XMPP over HTTP) connection - Bosh class.
Transports are stackable, e.g. TLS uses HTTPPROXYsocket or TCPsocket as its lower-level transport.
The exception 'error' is also defined to allow capturing this module's specific exceptions.
  24. """
  25. import socket,ssl,select,base64,sys
  26. from . import dispatcher
  27. from .simplexml import ustr
  28. from .client import PlugIn
  29. from .protocol import *
  30. try:
  31. from httplib import HTTPConnection, HTTPSConnection, _CS_IDLE, BadStatusLine
  32. except ImportError:
  33. from http.client import HTTPConnection, HTTPSConnection, _CS_IDLE, BadStatusLine
  34. from errno import ECONNREFUSED
  35. import random
  36. import gzip
  37. from io import StringIO
  38. from six import ensure_binary
  39. try:
  40. from urllib2 import urlparse
  41. urlparse = urlparse.urlparse
  42. except ImportError:
  43. from urllib import parse
  44. urlparse = parse.urlparse
# Python 3 has no sys.exc_clear(); install a no-op so the calls made by the
# transports below keep working on both interpreter lines.
if not hasattr(sys, 'exc_clear'):
    def exc_clear(): pass
    setattr(sys, 'exc_clear', exc_clear)
# determine which DNS resolution library is available
# (dnspython is tried first; pydns is only tried when dnspython is missing)
HAVE_DNSPYTHON = False
HAVE_PYDNS = False
try:
    import dns.resolver # http://dnspython.org/
    HAVE_DNSPYTHON = True
except ImportError:
    try:
        import DNS # http://pydns.sf.net/
        HAVE_PYDNS = True
    except ImportError:
        pass
# Event names passed to the owner's Dispatcher.Event() when raw data is
# received from / sent to the wire.
DATA_RECEIVED='DATA RECEIVED'
DATA_SENT='DATA SENT'
  62. class error:
  63. """An exception to be raised in case of low-level errors in methods of 'transports' module."""
  64. def __init__(self,comment):
  65. """Cache the descriptive string"""
  66. self._comment=comment
  67. def __str__(self):
  68. """Serialise exception into pre-cached descriptive string."""
  69. return self._comment
  70. BUFLEN=1024
  71. class TCPsocket(PlugIn):
  72. """ This class defines direct TCP connection method. """
  73. def __init__(self, server=None, use_srv=True):
  74. """ Cache connection point 'server'. 'server' is the tuple of (host, port)
  75. absolutely the same as standard tcp socket uses. However library will lookup for
  76. ('_xmpp-client._tcp.' + host) SRV record in DNS and connect to the found (if it is)
  77. server instead
  78. """
  79. PlugIn.__init__(self)
  80. self.DBG_LINE='socket'
  81. self._exported_methods=[self.send,self.disconnect]
  82. self._server, self.use_srv = server, use_srv
  83. def srv_lookup(self, server):
  84. " SRV resolver. Takes server=(host, port) as argument. Returns new (host, port) pair "
  85. if HAVE_DNSPYTHON or HAVE_PYDNS:
  86. host, port = server
  87. possible_queries = ['_xmpp-client._tcp.' + host]
  88. for query in possible_queries:
  89. try:
  90. if HAVE_DNSPYTHON:
  91. answers = [x for x in dns.resolver.query(query, 'SRV')]
  92. # Sort by priority, according to RFC 2782.
  93. answers.sort(key=lambda a: a.priority)
  94. if answers:
  95. host = str(answers[0].target)
  96. port = int(answers[0].port)
  97. break
  98. elif HAVE_PYDNS:
  99. # ensure we haven't cached an old configuration
  100. DNS.DiscoverNameServers()
  101. response = DNS.Request().req(query, qtype='SRV')
  102. # Sort by priority, according to RFC 2782.
  103. answers = sorted(response.answers, key=lambda a: a['data'][0])
  104. if len(answers) > 0:
  105. # ignore the priority and weight for now
  106. _, _, port, host = answers[0]['data']
  107. del _
  108. port = int(port)
  109. break
  110. except:
  111. self.DEBUG('An error occurred while looking up %s' % query, 'warn')
  112. server = (host, port)
  113. else:
  114. self.DEBUG("Could not load one of the supported DNS libraries (dnspython or pydns). SRV records will not be queried and you may need to set custom hostname/port for some servers to be accessible.\n",'warn')
  115. # end of SRV resolver
  116. return server
  117. def plugin(self, owner):
  118. """ Fire up connection. Return non-empty string on success.
  119. Also registers self.disconnected method in the owner's dispatcher.
  120. Called internally. """
  121. if not self._server: self._server=(self._owner.Server,5222)
  122. if self.use_srv: server=self.srv_lookup(self._server)
  123. else: server=self._server
  124. if not self.connect(server): return
  125. self._owner.Connection=self
  126. self._owner.RegisterDisconnectHandler(self.disconnected)
  127. return 'ok'
  128. def getHost(self):
  129. """ Return the 'host' value that is connection is [will be] made to."""
  130. return self._server[0]
  131. def getPort(self):
  132. """ Return the 'port' value that is connection is [will be] made to."""
  133. return self._server[1]
  134. def connect(self,server=None):
  135. """ Try to connect to the given host/port. Does not lookup for SRV record.
  136. Returns non-empty string on success. """
  137. if not server: server=self._server
  138. try:
  139. for res in socket.getaddrinfo(server[0], int(server[1]), 0, socket.SOCK_STREAM):
  140. af, socktype, proto, canonname, sa = res
  141. try:
  142. self._sock = socket.socket(af, socktype, proto)
  143. self._sock.connect(sa)
  144. self._send=self._sock.sendall
  145. self._recv=self._sock.recv
  146. self.DEBUG("Successfully connected to remote host %s"%repr(server),'start')
  147. return 'ok'
  148. except socket.error as xxx_todo_changeme:
  149. errno = xxx_todo_changeme.args[0]
  150. strerror = xxx_todo_changeme.args[1]
  151. if self._sock is not None: self._sock.close()
  152. self.DEBUG("Failed to connect to remote host %s: %s (%s)"%(repr(server), strerror, errno),'error')
  153. except socket.gaierror as xxx_todo_changeme1:
  154. errno = xxx_todo_changeme1.args[0]
  155. strerror = xxx_todo_changeme1.args[1]
  156. self.DEBUG("Failed to lookup remote host %s: %s (%s)"%(repr(server), strerror, errno),'error')
  157. def plugout(self):
  158. """ Disconnect from the remote server and unregister self.disconnected method from
  159. the owner's dispatcher. """
  160. self._sock.close()
  161. if 'Connection' in self._owner.__dict__:
  162. del self._owner.Connection
  163. self._owner.UnregisterDisconnectHandler(self.disconnected)
  164. def receive(self):
  165. """ Reads all pending incoming data.
  166. In case of disconnection calls owner's disconnected() method and then raises IOError exception."""
  167. try: received = self._recv(BUFLEN)
  168. except socket.error as e:
  169. self._seen_data=0
  170. if self.check_pending(e, 'receiving', 'asking for a retry'):
  171. return ''
  172. self.DEBUG('Socket error while receiving data','error')
  173. sys.exc_clear()
  174. self._owner.disconnected()
  175. raise IOError("Disconnected from server")
  176. except: received = ''
  177. while self.pending_data(0):
  178. try: add = self._recv(BUFLEN)
  179. except socket.error as e:
  180. self._seen_data=0
  181. if self.check_pending(e, 'receiving', 'ignoring'):
  182. break
  183. self.DEBUG('Socket error while receiving data','error')
  184. sys.exc_clear()
  185. self._owner.disconnected()
  186. raise IOError("Disconnected from server")
  187. except: add=''
  188. received +=add
  189. if not add: break
  190. if len(received): # length of 0 means disconnect
  191. self._seen_data=1
  192. self.DEBUG(received,'got')
  193. if hasattr(self._owner, 'Dispatcher'):
  194. self._owner.Dispatcher.Event('', DATA_RECEIVED, received)
  195. else:
  196. self.DEBUG('Socket error while receiving data','error')
  197. self._owner.disconnected()
  198. raise IOError("Disconnected from server")
  199. return received
  200. def send(self,raw_data,retry_timeout=1):
  201. """ Writes raw outgoing data. Blocks until done.
  202. If supplied data is unicode string, encodes it to utf-8 before send."""
  203. #print('type:', type(raw_data))
  204. if type(raw_data)==type(''): raw_data = raw_data
  205. elif type(raw_data)!=type(''): raw_data = ustr(raw_data)
  206. raw_data=ensure_binary(raw_data,'utf-8')
  207. try:
  208. sent = 0
  209. while not sent:
  210. try:
  211. self._send(raw_data)
  212. sent = 1
  213. except socket.error as e:
  214. if self.check_pending(e, 'sending', 'waiting to retry'):
  215. continue
  216. raise
  217. # Avoid printing messages that are empty keepalive packets.
  218. if raw_data.strip():
  219. self.DEBUG(raw_data,'sent')
  220. if hasattr(self._owner, 'Dispatcher'): # HTTPPROXYsocket will send data before we have a Dispatcher
  221. self._owner.Dispatcher.Event('', DATA_SENT, raw_data)
  222. except:
  223. self.DEBUG("Socket error while sending data",'error')
  224. self._owner.disconnected()
  225. def pending_data(self,timeout=0):
  226. """ Returns true if there is a data ready to be read. """
  227. return select.select([self._sock],[],[],timeout)[0]
  228. def check_pending(self, ex, direction, action):
  229. if hasattr(socket, 'sslerror'):
  230. if ex[0] == socket.SSL_ERROR_WANT_READ:
  231. sys.exc_clear()
  232. self.DEBUG("SSL_WANT_READ while {direction} data, {action}".format(**locals()), 'warn')
  233. return True
  234. if ex[0] == socket.SSL_ERROR_WANT_WRITE:
  235. sys.exc_clear()
  236. self.DEBUG("SSL_WANT_WRITE while {direction} data, {action}".format(**locals()), 'warn')
  237. return True
  238. else:
  239. if isinstance(ex, ssl.SSLWantReadError):
  240. sys.exc_clear()
  241. self.DEBUG("SSL_WANT_READ while {direction} data, {action}".format(**locals()), 'warn')
  242. return True
  243. if isinstance(ex, ssl.SSLWantWriteError):
  244. sys.exc_clear()
  245. self.DEBUG("SSL_WANT_WRITE while {direction} data, {action}".format(**locals()), 'warn')
  246. return True
  247. def disconnect(self):
  248. """ Closes the socket. """
  249. self.DEBUG("Closing socket",'stop')
  250. self._sock.close()
  251. def disconnected(self):
  252. """ Called when a Network Error or disconnection occurs.
  253. Designed to be overidden. """
  254. self.DEBUG("Socket operation failed",'error')
  255. DBG_CONNECT_PROXY='CONNECTproxy'
  256. class HTTPPROXYsocket(TCPsocket):
  257. """ HTTP (CONNECT) proxy connection class. Uses TCPsocket as the base class
  258. redefines only connect method. Allows to use HTTP proxies like squid with
  259. (optionally) simple authentication (using login and password). """
  260. def __init__(self,proxy,server,use_srv=True):
  261. """ Caches proxy and target addresses.
  262. 'proxy' argument is a dictionary with mandatory keys 'host' and 'port' (proxy address)
  263. and optional keys 'user' and 'password' to use for authentication.
  264. 'server' argument is a tuple of host and port - just like TCPsocket uses. """
  265. TCPsocket.__init__(self,server,use_srv)
  266. self.DBG_LINE=DBG_CONNECT_PROXY
  267. self._proxy=proxy
  268. def plugin(self, owner):
  269. """ Starts connection. Used interally. Returns non-empty string on success."""
  270. owner.debug_flags.append(DBG_CONNECT_PROXY)
  271. return TCPsocket.plugin(self,owner)
  272. def connect(self,server=None):
  273. """ Starts connection. Connects to proxy, supplies login and password to it
  274. (if were specified while creating instance). Instructs proxy to make
  275. connection to the target server. Returns non-empty sting on success. """
  276. if not TCPsocket.connect(self,(self._proxy['host'],self._proxy['port'])): return
  277. self.DEBUG("Proxy server contacted, performing authentification",'start')
  278. if not server: server=self._server
  279. connector = ['CONNECT %s:%s HTTP/1.0'%server,
  280. 'Proxy-Connection: Keep-Alive',
  281. 'Pragma: no-cache',
  282. 'Host: %s:%s'%server,
  283. 'User-Agent: HTTPPROXYsocket/v0.1']
  284. if 'user' in self._proxy and 'password' in self._proxy:
  285. credentials = '%s:%s'%(self._proxy['user'],self._proxy['password'])
  286. credentials = base64.encodestring(credentials).strip()
  287. connector.append('Proxy-Authorization: Basic '+credentials)
  288. connector.append('\r\n')
  289. self.send('\r\n'.join(connector))
  290. try: reply = self.receive().replace('\r','')
  291. except IOError:
  292. self.DEBUG('Proxy suddenly disconnected','error')
  293. self._owner.disconnected()
  294. return
  295. try: proto,code,desc=reply.split('\n')[0].split(' ',2)
  296. except: raise error('Invalid proxy reply')
  297. if code!='200':
  298. self.DEBUG('Invalid proxy reply: %s %s %s'%(proto,code,desc),'error')
  299. self._owner.disconnected()
  300. return
  301. while reply.find('\n\n') == -1:
  302. try: reply += self.receive().replace('\r','')
  303. except IOError:
  304. self.DEBUG('Proxy suddenly disconnected','error')
  305. self._owner.disconnected()
  306. return
  307. self.DEBUG("Authentification successfull. XMPP server contacted.",'ok')
  308. return 'ok'
  309. def DEBUG(self,text,severity):
  310. """Overwrites DEBUG tag to allow debug output be presented as "CONNECTproxy"."""
  311. return self._owner.DEBUG(DBG_CONNECT_PROXY,text,severity)
  312. class TLS(PlugIn):
  313. """ TLS connection used to encrypts already estabilished tcp connection."""
  314. def PlugIn(self,owner,now=0):
  315. """ If the 'now' argument is true then starts using encryption immidiatedly.
  316. If 'now' in false then starts encryption as soon as TLS feature is
  317. declared by the server (if it were already declared - it is ok).
  318. """
  319. if 'TLS' in owner.__dict__: return # Already enabled.
  320. PlugIn.PlugIn(self,owner)
  321. DBG_LINE='TLS'
  322. if now: return self._startSSL()
  323. if self._owner.Dispatcher.Stream.features:
  324. try: self.FeaturesHandler(self._owner.Dispatcher,self._owner.Dispatcher.Stream.features)
  325. except NodeProcessed: pass
  326. else: self._owner.RegisterHandlerOnce('features',self.FeaturesHandler,xmlns=NS_STREAMS)
  327. self.starttls=None
  328. def plugout(self,now=0):
  329. """ Unregisters TLS handler's from owner's dispatcher. Take note that encription
  330. can not be stopped once started. You can only break the connection and start over."""
  331. self._owner.UnregisterHandler('features',self.FeaturesHandler,xmlns=NS_STREAMS)
  332. self._owner.UnregisterHandler('proceed',self.StartTLSHandler,xmlns=NS_TLS)
  333. self._owner.UnregisterHandler('failure',self.StartTLSHandler,xmlns=NS_TLS)
  334. def FeaturesHandler(self, conn, feats):
  335. """ Used to analyse server <features/> tag for TLS support.
  336. If TLS is supported starts the encryption negotiation. Used internally"""
  337. if not feats.getTag('starttls',namespace=NS_TLS):
  338. self.DEBUG("TLS unsupported by remote server.",'warn')
  339. return
  340. self.DEBUG("TLS supported by remote server. Requesting TLS start.",'ok')
  341. self._owner.RegisterHandlerOnce('proceed',self.StartTLSHandler,xmlns=NS_TLS)
  342. self._owner.RegisterHandlerOnce('failure',self.StartTLSHandler,xmlns=NS_TLS)
  343. self._owner.Connection.send('<starttls xmlns="%s"/>'%NS_TLS)
  344. raise NodeProcessed
  345. def pending_data(self,timeout=0):
  346. """ Returns true if there possible is a data ready to be read. """
  347. return self._tcpsock._seen_data or select.select([self._tcpsock._sslObj],[],[],timeout)[0]
  348. def _startSSL(self):
  349. """ Immidiatedly switch socket to TLS mode. Used internally."""
  350. """ Here we should switch pending_data to hint mode."""
  351. tcpsock=self._owner.Connection
  352. tcpsock._sslObj = ssl.wrap_socket(tcpsock._sock, None, None)
  353. tcpsock._sslIssuer = tcpsock._sslObj.getpeercert().get('issuer')
  354. tcpsock._sslServer = tcpsock._sslObj.getpeercert().get('server')
  355. tcpsock._recv = tcpsock._sslObj.read
  356. tcpsock._send = tcpsock._sslObj.write
  357. tcpsock._seen_data=1
  358. self._tcpsock=tcpsock
  359. tcpsock.pending_data=self.pending_data
  360. tcpsock._sslObj.setblocking(False)
  361. self.starttls='success'
  362. def StartTLSHandler(self, conn, starttls):
  363. """ Handle server reply if TLS is allowed to process. Behaves accordingly.
  364. Used internally."""
  365. if starttls.getNamespace()!=NS_TLS: return
  366. self.starttls=starttls.getName()
  367. if self.starttls=='failure':
  368. self.DEBUG("Got starttls response: "+self.starttls,'error')
  369. return
  370. self.DEBUG("Got starttls proceed response. Switching to TLS/SSL...",'ok')
  371. self._startSSL()
  372. self._owner.Dispatcher.PlugOut()
  373. dispatcher.Dispatcher().PlugIn(self._owner)
  374. POST='POST'
  375. OK = 200
  376. BAD_REQUEST = 400
  377. FORBIDDEN = 403
  378. NOT_FOUND = 404
  379. class Bosh(PlugIn):
  380. connection_cls = {
  381. 'http': HTTPConnection,
  382. 'https': HTTPSConnection,
  383. }
  384. default_headers = {
  385. 'Content-Type': 'text/xml; charset=utf-8',
  386. 'Connection': 'Keep-Alive',
  387. }
  388. def __init__(self, endpoint, server=None, port=None, use_srv=True, wait=80,
  389. hold=4, requests=5, headers=None, PIPELINE=True, GZIP=True):
  390. PlugIn.__init__(self)
  391. self.DBG_LINE = 'bosh'
  392. self._exported_methods = [
  393. self.send, self.receive, self.disconnect,
  394. ]
  395. url = urlparse(endpoint)
  396. self._http_host = url.hostname
  397. self._http_path = url.path
  398. if url.port:
  399. self._http_port = url.port
  400. elif url.scheme == 'https':
  401. self._http_port = 443
  402. else:
  403. self._http_port = 80
  404. self._http_proto = url.scheme
  405. self._server = server
  406. self._port = port
  407. self.use_srv = use_srv
  408. self.Sid = None
  409. self._rid = 0
  410. self.wait = 80
  411. self.hold = hold
  412. self.requests = requests
  413. self._pipeline = None
  414. self.PIPELINE = PIPELINE
  415. if self.PIPELINE:
  416. self._respobjs = []
  417. else:
  418. self._respobjs = {}
  419. self.headers = headers or self.default_headers
  420. self.GZIP = GZIP
  421. def srv_lookup(self, server):
  422. # XXX Lookup TXT records to determine BOSH endpoint:
  423. # _xmppconnect IN TXT "_xmpp-client-xbosh=https://bosh.jabber.org:5280/bind"
  424. pass
  425. def plugin(self, owner):
  426. # XXX Provide resonable defaults if non were given, lookup service
  427. # records from DNS TXT records (see srv_lookup)
  428. if not self.connect(self._http_host, self._http_port):
  429. return
  430. self._owner.Connection=self
  431. self._owner.RegisterDisconnectHandler(self.disconnect)
  432. return 'ok'
  433. def connect(self, server=None, port=None, timeout=3, conopts={}):
  434. conn = self._connect(server, port, timeout, conopts)
  435. if conn:
  436. if self.PIPELINE:
  437. self._pipeline == conn
  438. else:
  439. conn.close()
  440. return 'ok'
  441. def _connect(self, server=None, port=None, timeout=3, conopts={}):
  442. endat = time.time() + timeout
  443. while True:
  444. cls = self.connection_cls[self._http_proto]
  445. conn = cls(server, port, **conopts)
  446. try:
  447. conn.connect()
  448. except socket.error as e:
  449. if e.errno == ECONNREFUSED: # Connection refused
  450. if time.time() > endat:
  451. msg = "Failed to connect to remote host %s: %s (%s)" % (
  452. 'server', e.strerror, e.errno,
  453. )
  454. self.DEBUG(msg, 'error')
  455. raise
  456. else:
  457. conn.close()
  458. raise
  459. time.sleep(.5)
  460. else:
  461. break
  462. return conn
  463. def Connection(self, reset=False):
  464. if self.PIPELINE:
  465. if not self._pipeline or not self._pipeline.sock:
  466. self._pipeline = self._connect(
  467. self._http_host, self._http_port
  468. )
  469. return self._pipeline
  470. conn = self._connect(self._http_host, self._http_port)
  471. conn.connect()
  472. return conn
  473. def refreshpipeline(I):
  474. if self._pipeline and self._pipeline.sock:
  475. self._pipeline.sock.shutdown()
  476. self._pipeline.sock.close()
  477. self._pipeline = None
  478. self.Connect()
  479. def plugout(self):
  480. for soc in self._respobjs:
  481. soc.close()
  482. if 'Connection' in self._owner.__dict__:
  483. del(self._owner.Connection)
  484. self._owner.UnregisterDisconnectHandler(self.disconnected)
  485. def receive(self):
  486. resp = ''
  487. if self.PIPELINE:
  488. res, data = self._respobjs.pop(0)
  489. else:
  490. res, data = self._respobjs.pop(sock)
  491. try:
  492. res.begin()
  493. except BadStatusLine:
  494. resp = sock.recv(1024)
  495. if len(resp) == 0:
  496. # The TCP Connection has been dropped, Resend the
  497. # request.
  498. self.refreshpipeline()
  499. node = Node(node=data)
  500. self.Rid = node.getAttr('rid')
  501. self.send(data)
  502. return resp
  503. else:
  504. # The server sent some data but it was a legit bad
  505. # status line.
  506. raise
  507. if res.status == OK:
  508. # Response to valid client request.
  509. headers = dict(res.getheaders())
  510. if headers.get('content-encoding', None) == 'gzip':
  511. a = StringIO()
  512. a.write(res.read())
  513. a.seek(0)
  514. gz = gzip.GzipFile(fileobj=a)
  515. data = gz.read()
  516. else:
  517. data = res.read()
  518. self.DEBUG(data, 'got')
  519. elif res.status == BAD_REQUEST:
  520. # Inform client that the format of an HTTP header or binding
  521. # element is unacceptable.
  522. self.DEBUG("The server did not undertand the request")
  523. raise Exception("Disconnected from server", 'error')
  524. elif res.status == FORBIDDEN:
  525. # Inform the client that it bas borken the session rules
  526. # (polling too frequently, requesting too frequently, too
  527. # many simultanious requests.
  528. self.DEBUG("Forbidden due to policy-violation", 'error')
  529. raise Exception("Disconnected from server")
  530. elif res.status == NOTFOUND:
  531. # Inform the client that (1) 'sid' is not valide, (2) 'stream' is
  532. # not valid, (3) 'rid' is larger than the upper limit of the
  533. # expected window, (4) connection manager is unable to resend
  534. # respons (5) 'key' sequence if invalid.
  535. self.DEBUG("Invalid/Corrupt Stream", 'error')
  536. raise Exception("Disconnected from server")
  537. else:
  538. msg = "Recieved status not defined in XEP-1204: %s" % res.status
  539. self.DEBUG(msg, 'error')
  540. raise Exception("Disconnected from server")
  541. node = Node(node=data)
  542. if node.getName() != 'body':
  543. self.DEBUG("The server sent an invalid BOSH payload", 'error')
  544. raise IOError("Disconnected from server")
  545. if node.getAttr('type') == 'terminate':
  546. msg = "Connection manager terminated stream: %s" % (
  547. node.getAttr('condition')
  548. )
  549. self.DEBUG(msg, 'info')
  550. raise IOError("Disconnected from server")
  551. resp = self.bosh_to_xmlstream(node)
  552. if resp:
  553. self._owner.Dispatcher.Event('', DATA_RECEIVED, resp)
  554. else:
  555. self.send(data)
  556. return resp
  557. def bosh_to_xmlstream(self, node):
  558. if not self.Sid or self.restart:
  559. # Expect a stream features elemnt that needs to be opened by a
  560. # stream element.
  561. if self.restart:
  562. self.restart = False
  563. else:
  564. self.Sid = node.getAttr('sid')
  565. self.AuthId = node.getAttr('authid')
  566. self.wait = int(node.getAttr('wait') or self.wait)
  567. self.hold = int(node.getAttr('hold') or self.hold)
  568. self.requests = int(node.getAttr('requests') or self.requests)
  569. stream=Node('stream:stream', payload=node.getChildren())
  570. stream.setNamespace(self._owner.Namespace)
  571. stream.setAttr('version','1.0')
  572. stream.setAttr('xmlns:stream', NS_STREAMS)
  573. stream.setAttr('from', self._owner.Server)
  574. data = str(stream)[:-len('</stream:stream>')]
  575. resp = "<?xml version='1.0'?>%s"%str(data)
  576. elif node.getChildren():
  577. resp = ''.join(str(i) for i in node.getChildren())
  578. else:
  579. resp = ''
  580. return resp
  581. def xmlstream_to_bosh(self, stream):
  582. if stream.startswith("<?xml version='1.0'?><stream"):
  583. # The begining of an xml stream. This is expected to
  584. # happen two times through out the lifetime of the bosh
  585. # session. When we first open the session and once
  586. # after authentication.
  587. # Sanitize stream tag so that it is suitable for parsing.
  588. stream = stream.split('>',1)[1]
  589. stream = '%s/>'%str(stream)[:-1]
  590. stream = Node(node=stream)
  591. # XXX This hasn't been tested with old-style auth. Will
  592. # probably need to detec that and handle similarly.
  593. SASL = getattr(self._owner, 'SASL', None)
  594. if SASL and SASL.startsasl == 'success':
  595. # Send restart after authentication.
  596. body = Node('body')
  597. body.setAttr('xmpp:restart', 'true')
  598. body.setAttr('xmlns:xmpp', 'urn:xmpp:xbosh')
  599. self.restart = True
  600. else:
  601. # Opening a new BOSH session.
  602. self.restart = False
  603. body=Node('body')
  604. body.setNamespace(NS_HTTP_BIND)
  605. body.setAttr('hold', self.hold)
  606. body.setAttr('wait', self.wait)
  607. body.setAttr('ver', '1.6')
  608. body.setAttr('xmpp:version', stream.getAttr('version'))
  609. body.setAttr('to', stream.getAttr('to'))
  610. body.setAttr('xmlns:xmpp', 'urn:xmpp:xbosh')
  611. # XXX Ack support for request acknowledgements.
  612. if self._server != self._http_host:
  613. if self._port:
  614. route = '%s:%s' % self._server, self._port
  615. else:
  616. route = self._server
  617. body.setAttr('route', route)
  618. else:
  619. # Mid stream, wrap the xml stanza in a BOSH body wrapper
  620. if stream:
  621. if type(stream) == type('') or type(stream) == type(''):
  622. stream = Node(node=stream)
  623. stream = [stream]
  624. else:
  625. stream = []
  626. body = Node('body', payload=stream)
  627. body.setNamespace('http://jabber.org/protocol/httpbind')
  628. body.setAttr('content', 'text/xml; charset=utf-8')
  629. body.setAttr('xml:lang', 'en')
  630. body.setAttr('rid', self.Rid)
  631. if self.Sid:
  632. body.setAttr('sid', self.Sid)
  633. return str(body)
  634. def send(self, raw_data, headers={}):
  635. if type(raw_data) != type('') or type(raw_data) != type(''):
  636. raw_data = str(raw_data)
  637. bosh_data = self.xmlstream_to_bosh(raw_data)
  638. default = dict(self.headers)
  639. default['Host'] = self._http_host
  640. default['Content-Length'] = len(bosh_data)
  641. if self.GZIP:
  642. default['Accept-Encoding'] = 'gzip, deflate'
  643. headers = dict(default, **headers)
  644. conn = self.Connection()
  645. if self.PIPELINE:
  646. conn._HTTPConnection__state = _CS_IDLE
  647. self.DEBUG(bosh_data, 'sent')
  648. conn.request(POST, self._http_path, bosh_data, headers)
  649. respobj = conn.response_class(
  650. conn.sock, strict=conn.strict, method=conn._method,
  651. )
  652. if self.PIPELINE:
  653. self._respobjs.append(
  654. (respobj, bosh_data)
  655. )
  656. else:
  657. self._respobjs[conn.sock] = (respobj, bosh_data)
  658. if hasattr(self._owner, 'Dispatcher') and bosh_data.strip():
  659. self._owner.Dispatcher.Event('', DATA_SENT, bosh_data)
  660. return True
  661. def disconnect(self):
  662. self.DEBUG("Closing socket", 'stop')
  663. if self.PIPELINE:
  664. if self._pipeline and self._pipeline.sock:
  665. self._pipeline.sock.shutdown()
  666. self._pipeline.close()
  667. else:
  668. for sock in self._respobjs:
  669. sock.shutdown()
  670. sock.close()
  671. def disconnected(self):
  672. self.DEBUG("BOSH transport operation failed", 'error')
  673. def pending_data(self, timeout=0):
  674. pending = False
  675. if self.PIPELINE:
  676. if not self._pipeline or not self._pipeline.sock:
  677. return
  678. pending = select.select([self._pipeline.sock], [], [], timeout)[0]
  679. else:
  680. pending = select.select(list(self._respobjs.keys()), [], [], timeout,)[0]
  681. if not pending and self.accepts_more_requests():
  682. self.send('')
  683. return pending
  684. def accepts_more_requests(self):
  685. if not self.authenticated():
  686. return False
  687. if self.PIPELINE:
  688. return len(self._respobjs) < self.hold
  689. if len(self._respobjs) >= self.requests - 1:
  690. return False
  691. return len(self._respobjs) < self.hold
  692. def authenticated(self):
  693. return self._owner and '+' in self._owner.connected
  694. @property
  695. def Rid(self):
  696. """
  697. An auto incrementing response id.
  698. """
  699. if not self._rid:
  700. self._rid = random.randint(0, 10000000)
  701. else:
  702. self._rid += 1
  703. return str(self._rid)
  704. @Rid.setter
  705. def Rid(self, i):
  706. """
  707. Set the Rid's next value
  708. """
  709. self._rid = int(i) - 1
  710. def getPort(self):
  711. """
  712. Return the port of the backend server (behind the endpoint).
  713. """
  714. return self._port