adhoc.py 26 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701
  1. """
  2. SleekXMPP: The Sleek XMPP Library
  3. Copyright (C) 2011 Nathanael C. Fritz, Lance J.T. Stout
  4. This file is part of SleekXMPP.
  5. See the file LICENSE for copying permission.
  6. """
import logging
import time

from sleekxmpp import Iq
from sleekxmpp.exceptions import IqError, XMPPError
from sleekxmpp.xmlstream.handler import Callback
from sleekxmpp.xmlstream.matcher import StanzaPath
from sleekxmpp.xmlstream import register_stanza_plugin, JID
from sleekxmpp.plugins import BasePlugin
from sleekxmpp.plugins.xep_0050 import stanza
from sleekxmpp.plugins.xep_0050 import Command
from sleekxmpp.plugins.xep_0004 import Form
  18. log = logging.getLogger(__name__)
  19. class XEP_0050(BasePlugin):
  20. """
  21. XEP-0050: Ad-Hoc Commands
  22. XMPP's Adhoc Commands provides a generic workflow mechanism for
  23. interacting with applications. The result is similar to menu selections
  24. and multi-step dialogs in normal desktop applications. Clients do not
  25. need to know in advance what commands are provided by any particular
  26. application or agent. While adhoc commands provide similar functionality
  27. to Jabber-RPC, adhoc commands are used primarily for human interaction.
  28. Also see <http://xmpp.org/extensions/xep-0050.html>
  29. Configuration Values:
  30. threaded -- Indicates if command events should be threaded.
  31. Defaults to True.
  32. Events:
  33. command_execute -- Received a command with action="execute"
  34. command_next -- Received a command with action="next"
  35. command_complete -- Received a command with action="complete"
  36. command_cancel -- Received a command with action="cancel"
  37. Attributes:
  38. threaded -- Indicates if command events should be threaded.
  39. Defaults to True.
  40. commands -- A dictionary mapping JID/node pairs to command
  41. names and handlers.
  42. sessions -- A dictionary or equivalent backend mapping
  43. session IDs to dictionaries containing data
  44. relevant to a command's session.
  45. Methods:
  46. plugin_init -- Overrides base_plugin.plugin_init
  47. post_init -- Overrides base_plugin.post_init
  48. new_session -- Return a new session ID.
  49. prep_handlers -- Placeholder. May call with a list of handlers
  50. to prepare them for use with the session storage
  51. backend, if needed.
  52. set_backend -- Replace the default session storage with some
  53. external storage mechanism, such as a database.
  54. The provided backend wrapper must be able to
  55. act using the same syntax as a dictionary.
  56. add_command -- Add a command for use by external entities.
  57. get_commands -- Retrieve a list of commands provided by a
  58. remote agent.
  59. send_command -- Send a command request to a remote agent.
  60. start_command -- Command user API: initiate a command session
  61. continue_command -- Command user API: proceed to the next step
  62. cancel_command -- Command user API: cancel a command
  63. complete_command -- Command user API: finish a command
  64. terminate_command -- Command user API: delete a command's session
  65. """
  66. name = 'xep_0050'
  67. description = 'XEP-0050: Ad-Hoc Commands'
  68. dependencies = set(['xep_0030', 'xep_0004'])
  69. stanza = stanza
  70. default_config = {
  71. 'threaded': True,
  72. 'session_db': None
  73. }
  74. def plugin_init(self):
  75. """Start the XEP-0050 plugin."""
  76. self.sessions = self.session_db
  77. if self.sessions is None:
  78. self.sessions = {}
  79. self.commands = {}
  80. self.xmpp.register_handler(
  81. Callback("Ad-Hoc Execute",
  82. StanzaPath('iq@type=set/command'),
  83. self._handle_command))
  84. register_stanza_plugin(Iq, Command)
  85. register_stanza_plugin(Command, Form, iterable=True)
  86. self.xmpp.add_event_handler('command_execute',
  87. self._handle_command_start,
  88. threaded=self.threaded)
  89. self.xmpp.add_event_handler('command_next',
  90. self._handle_command_next,
  91. threaded=self.threaded)
  92. self.xmpp.add_event_handler('command_cancel',
  93. self._handle_command_cancel,
  94. threaded=self.threaded)
  95. self.xmpp.add_event_handler('command_complete',
  96. self._handle_command_complete,
  97. threaded=self.threaded)
  98. def plugin_end(self):
  99. self.xmpp.del_event_handler('command_execute',
  100. self._handle_command_start)
  101. self.xmpp.del_event_handler('command_next',
  102. self._handle_command_next)
  103. self.xmpp.del_event_handler('command_cancel',
  104. self._handle_command_cancel)
  105. self.xmpp.del_event_handler('command_complete',
  106. self._handle_command_complete)
  107. self.xmpp.remove_handler('Ad-Hoc Execute')
  108. self.xmpp['xep_0030'].del_feature(feature=Command.namespace)
  109. self.xmpp['xep_0030'].set_items(node=Command.namespace, items=tuple())
  110. def session_bind(self, jid):
  111. self.xmpp['xep_0030'].add_feature(Command.namespace)
  112. self.xmpp['xep_0030'].set_items(node=Command.namespace, items=tuple())
  113. def set_backend(self, db):
  114. """
  115. Replace the default session storage dictionary with
  116. a generic, external data storage mechanism.
  117. The replacement backend must be able to interact through
  118. the same syntax and interfaces as a normal dictionary.
  119. Arguments:
  120. db -- The new session storage mechanism.
  121. """
  122. self.sessions = db
  123. def prep_handlers(self, handlers, **kwargs):
  124. """
  125. Prepare a list of functions for use by the backend service.
  126. Intended to be replaced by the backend service as needed.
  127. Arguments:
  128. handlers -- A list of function pointers
  129. **kwargs -- Any additional parameters required by the backend.
  130. """
  131. pass
  132. # =================================================================
  133. # Server side (command provider) API
  134. def add_command(self, jid=None, node=None, name='', handler=None):
  135. """
  136. Make a new command available to external entities.
  137. Access control may be implemented in the provided handler.
  138. Command workflow is done across a sequence of command handlers. The
  139. first handler is given the initial Iq stanza of the request in order
  140. to support access control. Subsequent handlers are given only the
  141. payload items of the command. All handlers will receive the command's
  142. session data.
  143. Arguments:
  144. jid -- The JID that will expose the command.
  145. node -- The node associated with the command.
  146. name -- A human readable name for the command.
  147. handler -- A function that will generate the response to the
  148. initial command request, as well as enforcing any
  149. access control policies.
  150. """
  151. if jid is None:
  152. jid = self.xmpp.boundjid
  153. elif not isinstance(jid, JID):
  154. jid = JID(jid)
  155. item_jid = jid.full
  156. self.xmpp['xep_0030'].add_identity(category='automation',
  157. itype='command-list',
  158. name='Ad-Hoc commands',
  159. node=Command.namespace,
  160. jid=jid)
  161. self.xmpp['xep_0030'].add_item(jid=item_jid,
  162. name=name,
  163. node=Command.namespace,
  164. subnode=node,
  165. ijid=jid)
  166. self.xmpp['xep_0030'].add_identity(category='automation',
  167. itype='command-node',
  168. name=name,
  169. node=node,
  170. jid=jid)
  171. self.xmpp['xep_0030'].add_feature(Command.namespace, None, jid)
  172. self.commands[(item_jid, node)] = (name, handler)
  173. def new_session(self):
  174. """Return a new session ID."""
  175. return str(time.time()) + '-' + self.xmpp.new_id()
  176. def _handle_command(self, iq):
  177. """Raise command events based on the command action."""
  178. self.xmpp.event('command_%s' % iq['command']['action'], iq)
  179. def _handle_command_start(self, iq):
  180. """
  181. Process an initial request to execute a command.
  182. Arguments:
  183. iq -- The command execution request.
  184. """
  185. sessionid = self.new_session()
  186. node = iq['command']['node']
  187. key = (iq['to'].full, node)
  188. name, handler = self.commands.get(key, ('Not found', None))
  189. if not handler:
  190. log.debug('Command not found: %s, %s', key, self.commands)
  191. payload = []
  192. for stanza in iq['command']['substanzas']:
  193. payload.append(stanza)
  194. if len(payload) == 1:
  195. payload = payload[0]
  196. interfaces = set([item.plugin_attrib for item in payload])
  197. payload_classes = set([item.__class__ for item in payload])
  198. initial_session = {'id': sessionid,
  199. 'from': iq['from'],
  200. 'to': iq['to'],
  201. 'node': node,
  202. 'payload': payload,
  203. 'interfaces': interfaces,
  204. 'payload_classes': payload_classes,
  205. 'notes': None,
  206. 'has_next': False,
  207. 'allow_complete': False,
  208. 'allow_prev': False,
  209. 'past': [],
  210. 'next': None,
  211. 'prev': None,
  212. 'cancel': None}
  213. session = handler(iq, initial_session)
  214. self._process_command_response(iq, session)
  215. def _handle_command_next(self, iq):
  216. """
  217. Process a request for the next step in the workflow
  218. for a command with multiple steps.
  219. Arguments:
  220. iq -- The command continuation request.
  221. """
  222. sessionid = iq['command']['sessionid']
  223. session = self.sessions.get(sessionid)
  224. if session:
  225. handler = session['next']
  226. interfaces = session['interfaces']
  227. results = []
  228. for stanza in iq['command']['substanzas']:
  229. if stanza.plugin_attrib in interfaces:
  230. results.append(stanza)
  231. if len(results) == 1:
  232. results = results[0]
  233. session = handler(results, session)
  234. self._process_command_response(iq, session)
  235. else:
  236. raise XMPPError('item-not-found')
  237. def _handle_command_prev(self, iq):
  238. """
  239. Process a request for the prev step in the workflow
  240. for a command with multiple steps.
  241. Arguments:
  242. iq -- The command continuation request.
  243. """
  244. sessionid = iq['command']['sessionid']
  245. session = self.sessions.get(sessionid)
  246. if session:
  247. handler = session['prev']
  248. interfaces = session['interfaces']
  249. results = []
  250. for stanza in iq['command']['substanzas']:
  251. if stanza.plugin_attrib in interfaces:
  252. results.append(stanza)
  253. if len(results) == 1:
  254. results = results[0]
  255. session = handler(results, session)
  256. self._process_command_response(iq, session)
  257. else:
  258. raise XMPPError('item-not-found')
  259. def _process_command_response(self, iq, session):
  260. """
  261. Generate a command reply stanza based on the
  262. provided session data.
  263. Arguments:
  264. iq -- The command request stanza.
  265. session -- A dictionary of relevant session data.
  266. """
  267. sessionid = session['id']
  268. payload = session['payload']
  269. if payload is None:
  270. payload = []
  271. if not isinstance(payload, list):
  272. payload = [payload]
  273. interfaces = session.get('interfaces', set())
  274. payload_classes = session.get('payload_classes', set())
  275. interfaces.update(set([item.plugin_attrib for item in payload]))
  276. payload_classes.update(set([item.__class__ for item in payload]))
  277. session['interfaces'] = interfaces
  278. session['payload_classes'] = payload_classes
  279. self.sessions[sessionid] = session
  280. for item in payload:
  281. register_stanza_plugin(Command, item.__class__, iterable=True)
  282. iq.reply()
  283. iq['command']['node'] = session['node']
  284. iq['command']['sessionid'] = session['id']
  285. if session['next'] is None:
  286. iq['command']['actions'] = []
  287. iq['command']['status'] = 'completed'
  288. elif session['has_next']:
  289. actions = ['next']
  290. if session['allow_complete']:
  291. actions.append('complete')
  292. if session['allow_prev']:
  293. actions.append('prev')
  294. iq['command']['actions'] = actions
  295. iq['command']['status'] = 'executing'
  296. else:
  297. iq['command']['actions'] = ['complete']
  298. iq['command']['status'] = 'executing'
  299. iq['command']['notes'] = session['notes']
  300. for item in payload:
  301. iq['command'].append(item)
  302. iq.send()
  303. def _handle_command_cancel(self, iq):
  304. """
  305. Process a request to cancel a command's execution.
  306. Arguments:
  307. iq -- The command cancellation request.
  308. """
  309. node = iq['command']['node']
  310. sessionid = iq['command']['sessionid']
  311. session = self.sessions.get(sessionid)
  312. if session:
  313. handler = session['cancel']
  314. if handler:
  315. handler(iq, session)
  316. del self.sessions[sessionid]
  317. iq.reply()
  318. iq['command']['node'] = node
  319. iq['command']['sessionid'] = sessionid
  320. iq['command']['status'] = 'canceled'
  321. iq['command']['notes'] = session['notes']
  322. iq.send()
  323. else:
  324. raise XMPPError('item-not-found')
  325. def _handle_command_complete(self, iq):
  326. """
  327. Process a request to finish the execution of command
  328. and terminate the workflow.
  329. All data related to the command session will be removed.
  330. Arguments:
  331. iq -- The command completion request.
  332. """
  333. node = iq['command']['node']
  334. sessionid = iq['command']['sessionid']
  335. session = self.sessions.get(sessionid)
  336. if session:
  337. handler = session['next']
  338. interfaces = session['interfaces']
  339. results = []
  340. for stanza in iq['command']['substanzas']:
  341. if stanza.plugin_attrib in interfaces:
  342. results.append(stanza)
  343. if len(results) == 1:
  344. results = results[0]
  345. if handler:
  346. handler(results, session)
  347. del self.sessions[sessionid]
  348. payload = session['payload']
  349. if payload is None:
  350. payload = []
  351. if not isinstance(payload, list):
  352. payload = [payload]
  353. for item in payload:
  354. register_stanza_plugin(Command, item.__class__, iterable=True)
  355. iq.reply()
  356. iq['command']['node'] = node
  357. iq['command']['sessionid'] = sessionid
  358. iq['command']['actions'] = []
  359. iq['command']['status'] = 'completed'
  360. iq['command']['notes'] = session['notes']
  361. for item in payload:
  362. iq['command'].append(item)
  363. iq.send()
  364. else:
  365. raise XMPPError('item-not-found')
  366. # =================================================================
  367. # Client side (command user) API
  368. def get_commands(self, jid, **kwargs):
  369. """
  370. Return a list of commands provided by a given JID.
  371. Arguments:
  372. jid -- The JID to query for commands.
  373. local -- If true, then the query is for a JID/node
  374. combination handled by this Sleek instance and
  375. no stanzas need to be sent.
  376. Otherwise, a disco stanza must be sent to the
  377. remove JID to retrieve the items.
  378. ifrom -- Specifiy the sender's JID.
  379. block -- If true, block and wait for the stanzas' reply.
  380. timeout -- The time in seconds to block while waiting for
  381. a reply. If None, then wait indefinitely.
  382. callback -- Optional callback to execute when a reply is
  383. received instead of blocking and waiting for
  384. the reply.
  385. iterator -- If True, return a result set iterator using
  386. the XEP-0059 plugin, if the plugin is loaded.
  387. Otherwise the parameter is ignored.
  388. """
  389. return self.xmpp['xep_0030'].get_items(jid=jid,
  390. node=Command.namespace,
  391. **kwargs)
  392. def send_command(self, jid, node, ifrom=None, action='execute',
  393. payload=None, sessionid=None, flow=False, **kwargs):
  394. """
  395. Create and send a command stanza, without using the provided
  396. workflow management APIs.
  397. Arguments:
  398. jid -- The JID to send the command request or result.
  399. node -- The node for the command.
  400. ifrom -- Specify the sender's JID.
  401. action -- May be one of: execute, cancel, complete,
  402. or cancel.
  403. payload -- Either a list of payload items, or a single
  404. payload item such as a data form.
  405. sessionid -- The current session's ID value.
  406. flow -- If True, process the Iq result using the
  407. command workflow methods contained in the
  408. session instead of returning the response
  409. stanza itself. Defaults to False.
  410. block -- Specify if the send call will block until a
  411. response is received, or a timeout occurs.
  412. Defaults to True.
  413. timeout -- The length of time (in seconds) to wait for a
  414. response before exiting the send call
  415. if blocking is used. Defaults to
  416. sleekxmpp.xmlstream.RESPONSE_TIMEOUT
  417. callback -- Optional reference to a stream handler
  418. function. Will be executed when a reply
  419. stanza is received if flow=False.
  420. """
  421. iq = self.xmpp.Iq()
  422. iq['type'] = 'set'
  423. iq['to'] = jid
  424. iq['from'] = ifrom
  425. iq['command']['node'] = node
  426. iq['command']['action'] = action
  427. if sessionid is not None:
  428. iq['command']['sessionid'] = sessionid
  429. if payload is not None:
  430. if not isinstance(payload, list):
  431. payload = [payload]
  432. for item in payload:
  433. iq['command'].append(item)
  434. if not flow:
  435. return iq.send(**kwargs)
  436. else:
  437. if kwargs.get('block', True):
  438. try:
  439. result = iq.send(**kwargs)
  440. except IqError as err:
  441. result = err.iq
  442. self._handle_command_result(result)
  443. else:
  444. iq.send(block=False, callback=self._handle_command_result)
  445. def start_command(self, jid, node, session, ifrom=None, block=False):
  446. """
  447. Initiate executing a command provided by a remote agent.
  448. The default workflow provided is non-blocking, but a blocking
  449. version may be used with block=True.
  450. The provided session dictionary should contain:
  451. next -- A handler for processing the command result.
  452. error -- A handler for processing any error stanzas
  453. generated by the request.
  454. Arguments:
  455. jid -- The JID to send the command request.
  456. node -- The node for the desired command.
  457. session -- A dictionary of relevant session data.
  458. ifrom -- Optionally specify the sender's JID.
  459. block -- If True, block execution until a result
  460. is received. Defaults to False.
  461. """
  462. session['jid'] = jid
  463. session['node'] = node
  464. session['timestamp'] = time.time()
  465. session['block'] = block
  466. if 'payload' not in session:
  467. session['payload'] = None
  468. iq = self.xmpp.Iq()
  469. iq['type'] = 'set'
  470. iq['to'] = jid
  471. iq['from'] = ifrom
  472. session['from'] = ifrom
  473. iq['command']['node'] = node
  474. iq['command']['action'] = 'execute'
  475. if session['payload'] is not None:
  476. payload = session['payload']
  477. if not isinstance(payload, list):
  478. payload = list(payload)
  479. for stanza in payload:
  480. iq['command'].append(stanza)
  481. sessionid = 'client:pending_' + iq['id']
  482. session['id'] = sessionid
  483. self.sessions[sessionid] = session
  484. if session['block']:
  485. try:
  486. result = iq.send(block=True)
  487. except IqError as err:
  488. result = err.iq
  489. self._handle_command_result(result)
  490. else:
  491. iq.send(block=False, callback=self._handle_command_result)
  492. def continue_command(self, session, direction='next'):
  493. """
  494. Execute the next action of the command.
  495. Arguments:
  496. session -- All stored data relevant to the current
  497. command session.
  498. """
  499. sessionid = 'client:' + session['id']
  500. self.sessions[sessionid] = session
  501. self.send_command(session['jid'],
  502. session['node'],
  503. ifrom=session.get('from', None),
  504. action=direction,
  505. payload=session.get('payload', None),
  506. sessionid=session['id'],
  507. flow=True,
  508. block=session['block'])
  509. def cancel_command(self, session):
  510. """
  511. Cancel the execution of a command.
  512. Arguments:
  513. session -- All stored data relevant to the current
  514. command session.
  515. """
  516. sessionid = 'client:' + session['id']
  517. self.sessions[sessionid] = session
  518. self.send_command(session['jid'],
  519. session['node'],
  520. ifrom=session.get('from', None),
  521. action='cancel',
  522. payload=session.get('payload', None),
  523. sessionid=session['id'],
  524. flow=True,
  525. block=session['block'])
  526. def complete_command(self, session):
  527. """
  528. Finish the execution of a command workflow.
  529. Arguments:
  530. session -- All stored data relevant to the current
  531. command session.
  532. """
  533. sessionid = 'client:' + session['id']
  534. self.sessions[sessionid] = session
  535. self.send_command(session['jid'],
  536. session['node'],
  537. ifrom=session.get('from', None),
  538. action='complete',
  539. payload=session.get('payload', None),
  540. sessionid=session['id'],
  541. flow=True,
  542. block=session['block'])
  543. def terminate_command(self, session):
  544. """
  545. Delete a command's session after a command has completed
  546. or an error has occured.
  547. Arguments:
  548. session -- All stored data relevant to the current
  549. command session.
  550. """
  551. sessionid = 'client:' + session['id']
  552. try:
  553. del self.sessions[sessionid]
  554. except Exception as e:
  555. log.error("Error deleting adhoc command session: %s" % e.message)
  556. def _handle_command_result(self, iq):
  557. """
  558. Process the results of a command request.
  559. Will execute the 'next' handler stored in the session
  560. data, or the 'error' handler depending on the Iq's type.
  561. Arguments:
  562. iq -- The command response.
  563. """
  564. sessionid = 'client:' + iq['command']['sessionid']
  565. pending = False
  566. if sessionid not in self.sessions:
  567. pending = True
  568. pendingid = 'client:pending_' + iq['id']
  569. if pendingid not in self.sessions:
  570. return
  571. sessionid = pendingid
  572. session = self.sessions[sessionid]
  573. sessionid = 'client:' + iq['command']['sessionid']
  574. session['id'] = iq['command']['sessionid']
  575. self.sessions[sessionid] = session
  576. if pending:
  577. del self.sessions[pendingid]
  578. handler_type = 'next'
  579. if iq['type'] == 'error':
  580. handler_type = 'error'
  581. handler = session.get(handler_type, None)
  582. if handler:
  583. handler(iq, session)
  584. elif iq['type'] == 'error':
  585. self.terminate_command(session)
  586. if iq['command']['status'] == 'completed':
  587. self.terminate_command(session)