123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377 |
- ## transports.py
- ##
- ## Copyright (C) 2003-2005 Alexey "Snake" Nezhdanov
- ##
- ## This program is free software; you can redistribute it and/or modify
- ## it under the terms of the GNU General Public License as published by
- ## the Free Software Foundation; either version 2, or (at your option)
- ## any later version.
- ##
- ## This program is distributed in the hope that it will be useful,
- ## but WITHOUT ANY WARRANTY; without even the implied warranty of
- ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- ## GNU General Public License for more details.
- # $Id$
- """
- Main xmpppy mechanism. Provides library with methods to assign different handlers
- to different XMPP stanzas.
- Contains one tunable attribute: DefaultTimeout (25 seconds by default). It defines how long the
- Dispatcher.SendAndWaitForResponse method will wait for a reply stanza before giving up.
- """
- import time,sys
- from . import simplexml
- from .protocol import *
- from .client import PlugIn
# Default number of seconds that Dispatcher.WaitForResponse and
# Dispatcher.SendAndWaitForResponse wait for a reply before giving up.
DefaultTimeout=25
# Module-wide counter; Dispatcher.send increments it to generate an id for
# every Protocol stanza that does not already carry one.
ID=0
class Dispatcher(PlugIn):
    """ Subclass of PlugIn that handles the XMPP stream, i.e. it is aware of stream headers.

        Incoming stanzas are turned into protocol objects and passed to the handlers
        registered for their namespace, name, "type" attribute and child namespaces.
        Can be plugged out/in to restart the stream headers (used for SASL f.e.). """

    def __init__(self):
        PlugIn.__init__(self)
        # The original bound this to a throw-away local, which had no effect;
        # it is clearly meant to be the debug-channel name of this plugin.
        self.DBG_LINE = 'dispatcher'
        # handlers[xmlns][stanza name][key] -> list of {'func':..., 'system':...};
        # see RegisterProtocol / RegisterHandler for the exact layout.
        self.handlers = {}
        # stanza id -> None (WaitForResponse is waiting), the received stanza,
        # or a (callback, kwargs) tuple stored by SendAndCallForResponse.
        self._expected = {}
        self._defaultHandler = None
        # sys.exc_info() triples captured in dispatch(), re-raised from Process().
        self._pendingExceptions = []
        self._eventHandler = None
        self._cycleHandlers = []
        self._exported_methods = [
            self.Process, self.RegisterHandler, self.RegisterDefaultHandler,
            self.RegisterEventHandler, self.UnregisterCycleHandler, self.RegisterCycleHandler,
            self.RegisterHandlerOnce, self.UnregisterHandler, self.RegisterProtocol,
            self.WaitForResponse, self.SendAndWaitForResponse, self.send, self.disconnect,
            self.SendAndCallForResponse,
        ]

    def dumpHandlers(self):
        """ Return the set of user-registered callbacks in its internal format.
            Used within the library to carry the user handlers set over Dispatcher replugins. """
        return self.handlers

    def restoreHandlers(self, handlers):
        """ Restore the user-registered callbacks structure from a dump previously obtained
            via dumpHandlers.
            Used within the library to carry the user handlers set over Dispatcher replugins. """
        self.handlers = handlers

    def _init(self):
        """ Register default namespaces/protocols/handlers. Used internally. """
        self.RegisterNamespace('unknown')
        self.RegisterNamespace(NS_STREAMS)
        self.RegisterNamespace(self._owner.defaultNamespace)
        self.RegisterProtocol('iq', Iq)
        self.RegisterProtocol('presence', Presence)
        self.RegisterProtocol('message', Message)
        self.RegisterDefaultHandler(self.returnStanzaHandler)
        self.RegisterHandler('error', self.streamErrorHandler, xmlns=NS_STREAMS)

    def plugin(self, owner):
        """ Plug the Dispatcher instance into the Client class instance and send the initial
            stream header. Used internally. """
        self._init()
        # Remember the owner's previous "send" so that our own send() can delegate to it.
        for method in self._old_owners_methods:
            if method.__name__ == 'send':
                self._owner_send = method
                break
        self._owner.lastErrNode = None
        self._owner.lastErr = None
        self._owner.lastErrCode = None
        self.StreamInit()

    def plugout(self):
        """ Prepare the instance to be destructed. """
        self.Stream.dispatch = None
        self.Stream.DEBUG = None
        self.Stream.features = None
        self.Stream.destroy()

    def StreamInit(self):
        """ Create a fresh stream parser and send an initial stream header. """
        self.Stream = simplexml.NodeBuilder()
        self.Stream._dispatch_depth = 2
        self.Stream.dispatch = self.dispatch
        self.Stream.stream_header_received = self._check_stream_start
        self._owner.debug_flags.append(simplexml.DBG_NODEBUILDER)
        self.Stream.DEBUG = self._owner.DEBUG
        self.Stream.features = None
        self._metastream = Node('stream:stream')
        self._metastream.setNamespace(self._owner.Namespace)
        self._metastream.setAttr('version', '1.0')
        self._metastream.setAttr('xmlns:stream', NS_STREAMS)
        self._metastream.setAttr('to', self._owner.Server)
        # Drop the last two characters of the serialised node and append '>' so that
        # the stream element is sent as an opening tag only.
        self._owner.send("<?xml version='1.0'?>%s>" % str(self._metastream)[:-2])

    def _check_stream_start(self, ns, tag, attrs):
        """ Verify that the stream opens with <stream> in the NS_STREAMS namespace.
            Raises ValueError otherwise. """
        if ns != NS_STREAMS or tag != 'stream':
            raise ValueError('Incorrect stream start: (%s,%s). Terminating.' % (tag, ns))

    def _raisePendingException(self):
        """ Re-raise the oldest exception captured by dispatch(), if there is one.

            The stored exception instance is raised as-is with its original traceback.
            Re-instantiating its class with the instance as argument (as was done before)
            wraps the exception in itself and fails for multi-argument constructors. """
        if not self._pendingExceptions:
            return
        _exc_type, exc_value, exc_tb = self._pendingExceptions.pop()
        raise exc_value.with_traceback(exc_tb)

    def Process(self, timeout=0):
        """ Check the incoming stream for data waiting. "timeout" is passed to the
            connection's pending_data() call.
            Returns:
                1) length of processed data if some data were processed;
                2) '0' string if no data were processed but the link is alive;
                3) None if receiving from the connection raised IOError.
            Exceptions raised by stanza handlers during dispatching are re-raised from here. """
        # Iterate over a snapshot so that a handler may unregister itself safely.
        for handler in tuple(self._cycleHandlers):
            handler(self)
        self._raisePendingException()
        if self._owner.Connection.pending_data(timeout):
            try:
                data = self._owner.Connection.receive()
            except IOError:
                return None
            self.Stream.Parse(data)
            self._raisePendingException()
            if data:
                return len(data)
        return '0'  # It means that nothing is received but link is alive.

    def RegisterNamespace(self, xmlns, order='info'):
        """ Create internal structures for a newly registered namespace.
            You can register handlers for this namespace afterwards. The namespaces
            'unknown', NS_STREAMS and the owner's default namespace are registered by _init(). """
        self.DEBUG('Registering namespace "%s"' % xmlns, order)
        self.handlers[xmlns] = {}
        self.RegisterProtocol('unknown', Protocol, xmlns=xmlns)
        self.RegisterProtocol('default', Protocol, xmlns=xmlns)

    def RegisterProtocol(self, tag_name, Proto, xmlns=None, order='info'):
        """ Declare a top-level stanza name to the dispatcher.
            Needed to start registering handlers for such stanzas.
            Iq, message and presence protocols are registered by default. """
        if not xmlns:
            xmlns = self._owner.defaultNamespace
        self.DEBUG('Registering protocol "%s" as %s(%s)' % (tag_name, Proto, xmlns), order)
        # NOTE: the builtin `type` object itself is the key that holds the protocol
        # class; dispatch() reads it back the same way, and dumped handler tables
        # rely on it, so it must not be replaced by a string key.
        self.handlers[xmlns][tag_name] = {type: Proto, 'default': []}

    def RegisterNamespaceHandler(self, xmlns, handler, typ='', ns='', makefirst=0, system=0):
        """ Register a handler for processing all stanzas of the specified namespace. """
        self.RegisterHandler('default', handler, typ, ns, xmlns, makefirst, system)

    def RegisterHandler(self, name, handler, typ='', ns='', xmlns=None, makefirst=0, system=0):
        """ Register a user callback as handler for stanzas of the declared kind.
            The callback is called with two arguments: the dispatcher (session) instance
            and the incoming stanza.
            A callback that raises an exception whose class is named NodeProcessed stops
            non-"system" handlers and the default handler from being called for that stanza.
            Arguments:
              "name" - name of stanza. F.e. "iq".
              "handler" - user callback.
              "typ" - value of stanza's "type" attribute. If not specified any value matches.
              "ns" - namespace of a child that the stanza must contain.
              "xmlns" - namespace of the stanza itself; the owner's default namespace if omitted.
              "makefirst" - insert the handler at the beginning of the handlers list instead
                of adding it to the end. Note that more common handlers (i.e. w/o "typ"
                and "ns") will be called first nevertheless.
              "system" - call the handler even if NodeProcessed was raised already.
        """
        if not xmlns:
            xmlns = self._owner.defaultNamespace
        self.DEBUG('Registering handler %s for "%s" type->%s ns->%s(%s)' % (handler, name, typ, ns, xmlns), 'info')
        if not typ and not ns:
            typ = 'default'
        if xmlns not in self.handlers:
            self.RegisterNamespace(xmlns, 'warn')
        if name not in self.handlers[xmlns]:
            self.RegisterProtocol(name, Protocol, xmlns, 'warn')
        registered = self.handlers[xmlns][name].setdefault(typ + ns, [])
        entry = {'func': handler, 'system': system}
        if makefirst:
            registered.insert(0, entry)
        else:
            registered.append(entry)

    def RegisterHandlerOnce(self, name, handler, typ='', ns='', xmlns=None, makefirst=0, system=0):
        """ Unregister handler after first call (not implemented yet).
            Currently behaves exactly like RegisterHandler. """
        if not xmlns:
            xmlns = self._owner.defaultNamespace
        self.RegisterHandler(name, handler, typ, ns, xmlns, makefirst, system)

    def UnregisterHandler(self, name, handler, typ='', ns='', xmlns=None):
        """ Unregister handler. "typ" and "ns" must be specified exactly the same as
            with registering. Does nothing if no such handler is registered. """
        if not xmlns:
            xmlns = self._owner.defaultNamespace
        if xmlns not in self.handlers:
            return
        if not typ and not ns:
            typ = 'default'
        # An unknown stanza name or type/ns key is ignored, just like an unknown
        # namespace above, instead of raising KeyError.
        registered = self.handlers[xmlns].get(name, {}).get(typ + ns)
        if not registered:
            return
        for pack in registered:
            if handler == pack['func']:
                registered.remove(pack)
                break

    def RegisterDefaultHandler(self, handler):
        """ Specify the handler that will be used if no NodeProcessed exception was raised.
            This is returnStanzaHandler by default. """
        self._defaultHandler = handler

    def RegisterEventHandler(self, handler):
        """ Register the handler that will process events. F.e. "FILERECEIVED" event. """
        self._eventHandler = handler

    def returnStanzaHandler(self, conn, stanza):
        """ Return the stanza back to the sender with the ERR_FEATURE_NOT_IMPLEMENTED error set.
            Only stanzas of type 'get' or 'set' are answered. """
        if stanza.getType() in ['get', 'set']:
            conn.send(Error(stanza, ERR_FEATURE_NOT_IMPLEMENTED))

    def streamErrorHandler(self, conn, error):
        """ Turn a stream error stanza into an exception.
            Raises the class found in stream_exceptions under the error condition name,
            or StreamError if there is none, with (name, text) as argument. """
        name, text = 'error', error.getData()
        for tag in error.getChildren():
            if tag.getNamespace() == NS_XMPP_STREAMS:
                if tag.getName() == 'text':
                    text = tag.getData()
                else:
                    name = tag.getName()
        exc = stream_exceptions.get(name, StreamError)
        raise exc((name, text))

    def RegisterCycleHandler(self, handler):
        """ Register a handler that will be called on every Dispatcher.Process() call. """
        if handler not in self._cycleHandlers:
            self._cycleHandlers.append(handler)

    def UnregisterCycleHandler(self, handler):
        """ Unregister a handler that is called on every Dispatcher.Process() call. """
        if handler in self._cycleHandlers:
            self._cycleHandlers.remove(handler)

    def Event(self, realm, event, data):
        """ Raise some event. Takes three arguments:
            1) "realm" - scope of event. Usually a namespace.
            2) "event" - the event itself. F.e. "SUCESSFULL SEND".
            3) data that comes along with the event. Depends on the event. """
        if self._eventHandler:
            self._eventHandler(realm, event, data)

    def dispatch(self, stanza, session=None, direct=0):
        """ Main procedure that performs XMPP stanza recognition and calls the appropriate
            handlers for it. Called internally.
            Arguments:
              "stanza" - the received node.
              "session" - object whose Stream, DEBUG and _expected are used; self if omitted.
              "direct" - true when called recursively for the children of a <route/>
                stanza; skips the routing checks.
            Raises UnsupportedStanzaType for an unexpected top-level stanza in routed mode.
            Exceptions raised by handlers (other than NodeProcessed) are queued and
            re-raised later from Process(). """
        if not session:
            session = self
        session.Stream._mini_dom = None
        name = stanza.getName()

        if not direct and self._owner._route:
            if name == 'route':
                if stanza.getAttr('error') is None:
                    children = stanza.getChildren()
                    if len(children) == 1:
                        # Unwrap the single routed stanza and handle it in place.
                        stanza = children[0]
                        name = stanza.getName()
                    else:
                        for each in children:
                            self.dispatch(each, session, direct=1)
                        return
            elif name == 'presence':
                return
            elif name in ('features', 'bind'):
                pass
            else:
                raise UnsupportedStanzaType(name)

        if name == 'features':
            session.Stream.features = stanza

        xmlns = stanza.getNamespace()
        if xmlns not in self.handlers:
            # %-formatting instead of concatenation: tolerates a missing namespace.
            self.DEBUG("Unknown namespace: %s" % (xmlns,), 'warn')
            xmlns = 'unknown'
        if name not in self.handlers[xmlns]:
            self.DEBUG("Unknown stanza: %s" % (name,), 'warn')
            name = 'unknown'
        else:
            self.DEBUG("Got %s/%s stanza" % (xmlns, name), 'ok')

        # A plain Node is converted to the protocol class registered for it
        # (stored under the builtin `type` key, see RegisterProtocol).
        if stanza.__class__.__name__ == 'Node':
            stanza = self.handlers[xmlns][name][type](node=stanza)

        typ = stanza.getType()
        if not typ:
            typ = ''
        stanza.props = stanza.getProperties()
        stanza_id = stanza.getID()

        session.DEBUG("Dispatching %s stanza with type->%s props->%s id->%s" % (name, typ, stanza.props, stanza_id), 'ok')

        registered = self.handlers[xmlns][name]
        keys = ['default']                            # we will use all handlers:
        if typ in registered:
            keys.append(typ)                          # from very common...
        for prop in stanza.props:
            if prop in registered:
                keys.append(prop)
            if typ and typ + prop in registered:
                keys.append(typ + prop)               # ...to very particular

        # Namespace-wide handlers come first, then the stanza-specific ones.
        chain = self.handlers[xmlns]['default']['default']
        for key in keys:
            if key:
                chain = chain + registered[key]

        if stanza_id in session._expected:
            # Somebody is waiting for this id: only "system" handlers run below.
            user = 0
            expected = session._expected[stanza_id]
            if isinstance(expected, tuple):
                cb, args = expected
                session.DEBUG("Expected stanza arrived. Callback %s(%s) found!" % (cb, args), 'ok')
                try:
                    cb(session, stanza, **args)
                except Exception as exc:
                    if exc.__class__.__name__ != 'NodeProcessed':
                        raise
            else:
                session.DEBUG("Expected stanza arrived!", 'ok')
                session._expected[stanza_id] = stanza
        else:
            user = 1
        for handler in chain:
            if user or handler['system']:
                try:
                    handler['func'](session, stanza)
                except Exception as exc:
                    if exc.__class__.__name__ != 'NodeProcessed':
                        # Defer the failure; Process() re-raises it after parsing.
                        self._pendingExceptions.insert(0, sys.exc_info())
                        return
                    user = 0
        if user and self._defaultHandler:
            self._defaultHandler(session, stanza)

    def WaitForResponse(self, ID, timeout=DefaultTimeout):
        """ Block and wait until a stanza with the specific "id" attribute arrives.
            If no such stanza arrives within "timeout" seconds, or the connection is lost,
            return None and set the owner's lastErr to "Timeout" or "Disconnect".
            If the response carries an error then the owner's attributes
            lastErrNode, lastErr and lastErrCode are set accordingly. """
        self._expected[ID] = None
        abort_time = time.time() + timeout
        self.DEBUG("Waiting for ID:%s with timeout %s..." % (ID, timeout), 'wait')
        try:
            while not self._expected[ID]:
                if not self.Process(0.04):
                    self._owner.lastErr = "Disconnect"
                    return None
                if time.time() > abort_time:
                    self._owner.lastErr = "Timeout"
                    return None
            response = self._expected[ID]
        finally:
            # Always forget the id: a stale entry would leak and would make
            # dispatch() silently swallow a late reply.
            self._expected.pop(ID, None)
        if response.getErrorCode():
            self._owner.lastErrNode = response
            self._owner.lastErr = response.getError()
            self._owner.lastErrCode = response.getErrorCode()
        return response

    def SendAndWaitForResponse(self, stanza, timeout=DefaultTimeout):
        """ Put the stanza on the wire and wait for the recipient's response to it. """
        return self.WaitForResponse(self.send(stanza), timeout)

    def SendAndCallForResponse(self, stanza, func, args=None):
        """ Put the stanza on the wire and call back when the recipient replies.
            "func" is called as func(session, stanza, **args); additional keyword
            arguments for it can be given as the dictionary "args". """
        # None instead of a shared mutable {} default; normalised to a fresh dict here.
        if args is None:
            args = {}
        self._expected[self.send(stanza)] = (func, args)

    def send(self, stanza):
        """ Serialise the stanza and put it on the wire. Assign a unique ID to it before
            sending if it is a Protocol instance without one.
            A plain string is passed to the owner's send unchanged and its result returned.
            Returns the stanza ID (None for nodes that are not Protocol instances). """
        global ID
        if isinstance(stanza, str):
            return self._owner_send(stanza)
        if not isinstance(stanza, Protocol):
            _ID = None
        elif not stanza.getID():
            ID += 1
            _ID = repr(ID)
            stanza.setID(_ID)
        else:
            _ID = stanza.getID()
        if self._owner._registered_name and not stanza.getAttr('from'):
            stanza.setAttr('from', self._owner._registered_name)
        if self._owner._route and stanza.getName() != 'bind':
            # Routed mode: wrap the stanza into a <route/> addressed by domain.
            to = self._owner.Server
            if stanza.getTo() and stanza.getTo().getDomain():
                to = stanza.getTo().getDomain()
            frm = stanza.getFrom()
            # Guarded like the "to" side: the stanza may carry no sender at all.
            if frm and frm.getDomain():
                frm = frm.getDomain()
            stanza = Protocol('route', to=to, frm=frm, payload=[stanza])
        stanza.setNamespace(self._owner.Namespace)
        stanza.setParent(self._metastream)
        self._owner_send(stanza)
        return _ID

    def disconnect(self):
        """ Send a stream terminator and handle all incoming stanzas before stream closure. """
        self._owner_send('</stream:stream>')
        # Process() keeps returning a true value ('0' or a length) while the link is alive.
        while self.Process(1):
            pass
|