statemachine.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. """
  2. SleekXMPP: The Sleek XMPP Library
  3. Copyright (C) 2010 Nathanael C. Fritz
  4. This file is part of SleekXMPP.
  5. See the file LICENSE for copying permission.
  6. """
  7. import threading
  8. import time
  9. import logging
  10. log = logging.getLogger(__name__)
  11. class StateMachine(object):
  12. def __init__(self, states=None):
  13. if not states: states = []
  14. self.lock = threading.Condition()
  15. self.__states = []
  16. self.addStates(states)
  17. self.__default_state = self.__states[0]
  18. self.__current_state = self.__default_state
  19. def addStates(self, states):
  20. self.lock.acquire()
  21. try:
  22. for state in states:
  23. if state in self.__states:
  24. raise IndexError("The state '%s' is already in the StateMachine." % state)
  25. self.__states.append(state)
  26. finally:
  27. self.lock.release()
  28. def transition(self, from_state, to_state, wait=0.0, func=None, args=None, kwargs=None):
  29. '''
  30. Transition from the given `from_state` to the given `to_state`.
  31. This method will return `True` if the state machine is now in `to_state`. It
  32. will return `False` if a timeout occurred the transition did not occur.
  33. If `wait` is 0 (the default,) this method returns immediately if the state machine
  34. is not in `from_state`.
  35. If you want the thread to block and transition once the state machine to enters
  36. `from_state`, set `wait` to a non-negative value. Note there is no 'block
  37. indefinitely' flag since this leads to deadlock. If you want to wait indefinitely,
  38. choose a reasonable value for `wait` (e.g. 20 seconds) and do so in a while loop like so:
  39. ::
  40. while not thread_should_exit and not state_machine.transition('disconnected', 'connecting', wait=20 ):
  41. pass # timeout will occur every 20s unless transition occurs
  42. if thread_should_exit: return
  43. # perform actions here after successful transition
  44. This allows the thread to be responsive by setting `thread_should_exit=True`.
  45. The optional `func` argument allows the user to pass a callable operation which occurs
  46. within the context of the state transition (e.g. while the state machine is locked.)
  47. If `func` returns a True value, the transition will occur. If `func` returns a non-
  48. True value or if an exception is thrown, the transition will not occur. Any thrown
  49. exception is not caught by the state machine and is the caller's responsibility to handle.
  50. If `func` completes normally, this method will return the value returned by `func.` If
  51. values for `args` and `kwargs` are provided, they are expanded and passed like so:
  52. `func( *args, **kwargs )`.
  53. '''
  54. if not args:
  55. args = []
  56. if not kwargs:
  57. kwargs = {}
  58. return self.transition_any((from_state,), to_state, wait=wait,
  59. func=func, args=args, kwargs=kwargs)
  60. def transition_any(self, from_states, to_state, wait=0.0, func=None, args=None, kwargs=None):
  61. '''
  62. Transition from any of the given `from_states` to the given `to_state`.
  63. '''
  64. if not args:
  65. args = []
  66. if not kwargs:
  67. kwargs = {}
  68. if not isinstance(from_states, (tuple, list, set)):
  69. raise ValueError("from_states should be a list, tuple, or set")
  70. for state in from_states:
  71. if not state in self.__states:
  72. raise ValueError("StateMachine does not contain from_state %s." % state)
  73. if not to_state in self.__states:
  74. raise ValueError("StateMachine does not contain to_state %s." % to_state)
  75. if self.__current_state == to_state:
  76. return True
  77. start = time.time()
  78. while not self.lock.acquire(False):
  79. time.sleep(.001)
  80. if (start + wait - time.time()) <= 0.0:
  81. log.debug("==== Could not acquire lock in %s sec: %s -> %s ", wait, self.__current_state, to_state)
  82. return False
  83. while not self.__current_state in from_states:
  84. # detect timeout:
  85. remainder = start + wait - time.time()
  86. if remainder > 0:
  87. self.lock.wait(remainder)
  88. else:
  89. log.debug("State was not ready")
  90. self.lock.release()
  91. return False
  92. try: # lock is acquired; all other threads will return false or wait until notify/timeout
  93. if self.__current_state in from_states: # should always be True due to lock
  94. # Note that func might throw an exception, but that's OK, it aborts the transition
  95. return_val = func(*args,**kwargs) if func is not None else True
  96. # some 'false' value returned from func,
  97. # indicating that transition should not occur:
  98. if not return_val:
  99. return return_val
  100. log.debug(' ==== TRANSITION %s -> %s', self.__current_state, to_state)
  101. self._set_state(to_state)
  102. return return_val # some 'true' value returned by func or True if func was None
  103. else:
  104. log.error("StateMachine bug!! The lock should ensure this doesn't happen!")
  105. return False
  106. finally:
  107. self.lock.notify_all()
  108. self.lock.release()
  109. def transition_ctx(self, from_state, to_state, wait=0.0):
  110. '''
  111. Use the state machine as a context manager. The transition occurs on /exit/ from
  112. the `with` context, so long as no exception is thrown. For example:
  113. ::
  114. with state_machine.transition_ctx('one','two', wait=5) as locked:
  115. if locked:
  116. # the state machine is currently locked in state 'one', and will
  117. # transition to 'two' when the 'with' statement ends, so long as
  118. # no exception is thrown.
  119. print 'Currently locked in state one: %s' % state_machine['one']
  120. else:
  121. # The 'wait' timed out, and no lock has been acquired
  122. print 'Timed out before entering state "one"'
  123. print 'Since no exception was thrown, we are now in state "two": %s' % state_machine['two']
  124. The other main difference between this method and `transition()` is that the
  125. state machine is locked for the duration of the `with` statement. Normally,
  126. after a `transition()` occurs, the state machine is immediately unlocked and
  127. available to another thread to call `transition()` again.
  128. '''
  129. if not from_state in self.__states:
  130. raise ValueError("StateMachine does not contain from_state %s." % from_state)
  131. if not to_state in self.__states:
  132. raise ValueError("StateMachine does not contain to_state %s." % to_state)
  133. return _StateCtx(self, from_state, to_state, wait)
  134. def ensure(self, state, wait=0.0, block_on_transition=False):
  135. '''
  136. Ensure the state machine is currently in `state`, or wait until it enters `state`.
  137. '''
  138. return self.ensure_any((state,), wait=wait, block_on_transition=block_on_transition)
  139. def ensure_any(self, states, wait=0.0, block_on_transition=False):
  140. '''
  141. Ensure we are currently in one of the given `states` or wait until
  142. we enter one of those states.
  143. Note that due to the nature of the function, you cannot guarantee that
  144. the entirety of some operation completes while you remain in a given
  145. state. That would require acquiring and holding a lock, which
  146. would mean no other threads could do the same. (You'd essentially
  147. be serializing all of the threads that are 'ensuring' their tasks
  148. occurred in some state.
  149. '''
  150. if not (isinstance(states,tuple) or isinstance(states,list)):
  151. raise ValueError('states arg should be a tuple or list')
  152. for state in states:
  153. if not state in self.__states:
  154. raise ValueError("StateMachine does not contain state '%s'" % state)
  155. # if we're in the middle of a transition, determine whether we should
  156. # 'fall back' to the 'current' state, or wait for the new state, in order to
  157. # avoid an operation occurring in the wrong state.
  158. # TODO another option would be an ensure_ctx that uses a semaphore to allow
  159. # threads to indicate they want to remain in a particular state.
  160. self.lock.acquire()
  161. start = time.time()
  162. while not self.__current_state in states:
  163. # detect timeout:
  164. remainder = start + wait - time.time()
  165. if remainder > 0:
  166. self.lock.wait(remainder)
  167. else:
  168. self.lock.release()
  169. return False
  170. self.lock.release()
  171. return True
  172. def reset(self):
  173. # TODO need to lock before calling this?
  174. self.transition(self.__current_state, self.__default_state)
  175. def _set_state(self, state): #unsynchronized, only call internally after lock is acquired
  176. self.__current_state = state
  177. return state
  178. def current_state(self):
  179. '''
  180. Return the current state name.
  181. '''
  182. return self.__current_state
  183. def __getitem__(self, state):
  184. '''
  185. Non-blocking, non-synchronized test to determine if we are in the given state.
  186. Use `StateMachine.ensure(state)` to wait until the machine enters a certain state.
  187. '''
  188. return self.__current_state == state
  189. def __str__(self):
  190. return "".join(("StateMachine(", ','.join(self.__states), "): ", self.__current_state))
  191. class _StateCtx:
  192. def __init__(self, state_machine, from_state, to_state, wait):
  193. self.state_machine = state_machine
  194. self.from_state = from_state
  195. self.to_state = to_state
  196. self.wait = wait
  197. self._locked = False
  198. def __enter__(self):
  199. start = time.time()
  200. while not self.state_machine[self.from_state] or not self.state_machine.lock.acquire(False):
  201. # detect timeout:
  202. remainder = start + self.wait - time.time()
  203. if remainder > 0:
  204. self.state_machine.lock.wait(remainder)
  205. else:
  206. log.debug('StateMachine timeout while waiting for state: %s', self.from_state)
  207. return False
  208. self._locked = True # lock has been acquired at this point
  209. self.state_machine.lock.clear()
  210. log.debug('StateMachine entered context in state: %s',
  211. self.state_machine.current_state())
  212. return True
  213. def __exit__(self, exc_type, exc_val, exc_tb):
  214. if exc_val is not None:
  215. log.exception("StateMachine exception in context, remaining in state: %s\n%s:%s",
  216. self.state_machine.current_state(), exc_type.__name__, exc_val)
  217. if self._locked:
  218. if exc_val is None:
  219. log.debug(' ==== TRANSITION %s -> %s',
  220. self.state_machine.current_state(), self.to_state)
  221. self.state_machine._set_state(self.to_state)
  222. self.state_machine.lock.notify_all()
  223. self.state_machine.lock.release()
  224. return False # re-raise any exception
  225. if __name__ == '__main__':
  226. def callback(s, s2):
  227. print((1, s.transition('on', 'off', wait=0.0, func=callback, args=[s,s2])))
  228. print((2, s2.transition('off', 'on', func=callback, args=[s,s2])))
  229. return True
  230. s = StateMachine(('off', 'on'))
  231. s2 = StateMachine(('off', 'on'))
  232. print((3, s.transition('off', 'on', wait=0.0, func=callback, args=[s,s2]),))
  233. print((s.current_state(), s2.current_state()))