From 11d0a4dc4522a19275081aa36abdb967e6cbfed9 Mon Sep 17 00:00:00 2001 From: Ed Page Date: Tue, 15 Dec 2009 22:03:34 -0600 Subject: [PATCH] Starting the work on the polling state machine --- hand_tests/sm.py | 68 ++++++++++++++++++++ src/gvoice/state_machine.py | 144 +++++++++++++++++++++++++++++++++++++++++++ src/util/algorithms.py | 7 +++ 3 files changed, 219 insertions(+) create mode 100755 hand_tests/sm.py create mode 100644 src/gvoice/state_machine.py diff --git a/hand_tests/sm.py b/hand_tests/sm.py new file mode 100755 index 0000000..f3c1e3b --- /dev/null +++ b/hand_tests/sm.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python + +import threading +import datetime +import time + +import gtk + +import sys +sys.path.insert(0,"../src") +import gvoice.state_machine as state_machine + + +class _I(object): + + def __init__(self, startTime): + self._startTime = startTime + + def update(self, force = False): + print "%s\t%r: force=%r" % (datetime.datetime.now() - self._startTime, self, force) + + +def loop(state): + + def actual(): + while state[0]: + gtk.main_iteration(block=False) + time.sleep(0.1) + + return actual + + +def main(): + startTime = datetime.datetime.now() + + state = [True] + mainLoop = threading.Thread(target=loop(state)) + mainLoop.setDaemon(False) + mainLoop.start() + try: + state_machine.StateMachine._IS_DAEMON = False + + initial = _I(startTime) + print "Initial:", initial + regular = _I(startTime) + print "Regular:", regular + + sm = state_machine.StateMachine([initial], [regular]) + print "Starting", datetime.datetime.now() - startTime + sm.start() + time.sleep(60.0) # seconds + print "Reseting timers", datetime.datetime.now() - startTime + sm.reset_timers() + time.sleep(60.0) # seconds + print "Switching to IDLE", datetime.datetime.now() - startTime + sm.set_state(state_machine.StateMachine.STATE_IDLE) + time.sleep(10.0) # seconds + print "Stopping", datetime.datetime.now() - startTime + sm.stop() + finally: + state[0] = False + + +if __name__ == "__main__": + print state_machine.StateMachine._INITIAL_ACTIVE_PERIOD + print state_machine.StateMachine._FINAL_ACTIVE_PERIOD + print state_machine.StateMachine._IDLE_PERIOD + main() diff --git a/src/gvoice/state_machine.py b/src/gvoice/state_machine.py new file mode 100644 index 0000000..09ed28b --- /dev/null +++ b/src/gvoice/state_machine.py @@ -0,0 +1,144 @@ +#!/usr/bin/env python + +import Queue +import threading +import logging + +import gobject + +import util.algorithms as algorithms +import util.coroutines as coroutines + +_moduleLogger = logging.getLogger("gvoice.state_machine") + + +def _to_milliseconds(**kwd): + if "milliseconds" in kwd: + return kwd["milliseconds"] + elif "seconds" in kwd: + return kwd["seconds"] * 1000 + elif "minutes" in kwd: + return kwd["minutes"] * 1000 * 60 + raise KeyError("Unknown arg: %r" % kwd) + + +class StateMachine(object): + + STATE_ACTIVE = "active" + STATE_IDLE = "idle" + STATE_DND = "dnd" + + _ACTION_UPDATE = "update" + _ACTION_RESET = "reset" + _ACTION_STOP = "stop" + + _INITIAL_ACTIVE_PERIOD = int(_to_milliseconds(seconds=5)) + _FINAL_ACTIVE_PERIOD = int(_to_milliseconds(minutes=2)) + _IDLE_PERIOD = int(_to_milliseconds(minutes=10)) + _INFINITE_PERIOD = -1 + + _IS_DAEMON = True + + def __init__(self, initItems, updateItems): + self._initItems = initItems + self._updateItems = updateItems + + self._actions = Queue.Queue() + self._state = self.STATE_ACTIVE + self._timeoutId = None + self._thread = None + self._currentPeriod = self._INITIAL_ACTIVE_PERIOD + self._set_initial_period() + + def start(self): + assert self._thread is None + self._thread = threading.Thread(target=self._run) + self._thread.setDaemon(self._IS_DAEMON) + self._thread.start() + + def stop(self): + if self._thread is not None: + self._actions.put(self._ACTION_STOP) + self._thread = None + else: + _moduleLogger.info("Stopping an already stopped state machine") + + def set_state(self, state): + self._state = state + self.reset_timers() + + def get_state(self): + return self._state + + def reset_timers(self): + self._actions.put(self._ACTION_RESET) + + @coroutines.func_sink + def request_reset_timers(self, args): + self.reset_timers() + + def _run(self): + for item in self._initItems: + try: + item.update() + except Exception: + _moduleLogger.exception("Initial update failed for %r" % item) + + # empty the task queue + actions = list(algorithms.itr_available(self._actions, initiallyBlock = False)) + self._schedule_update() + if len(self._updateItems) == 0: + self.stop() + + while True: + # block till we get a task, or get all the tasks if we were late + actions = list(algorithms.itr_available(self._actions, initiallyBlock = True)) + + if self._ACTION_STOP in actions: + self._stop_update() + break + elif self._ACTION_RESET in actions: + self._reset_timers() + elif self._ACTION_UPDATE in actions: + for item in self._updateItems: + try: + item.update(force=True) + except Exception: + _moduleLogger.exception("Update failed for %r" % item) + self._schedule_update() + + def _set_initial_period(self): + self._currentPeriod = self._INITIAL_ACTIVE_PERIOD / 2 # We will double it later + + def _reset_timers(self): + self._stop_update() + self._set_initial_period() + self._schedule_update() + + def _schedule_update(self): + nextTimeout = self._calculate_step(self._state, self._currentPeriod) + nextTimeout = int(nextTimeout) + if nextTimeout != self._INFINITE_PERIOD: + self._timeoutId = gobject.timeout_add(nextTimeout, self._on_timeout) + self._currentPeriod = nextTimeout + + def _stop_update(self): + if self._timeoutId is None: + return + gobject.source_remove(self._timeoutId) + self._timeoutId = None + + def _on_timeout(self): + self._actions.put(self._ACTION_UPDATE) + return False # do not continue + + @classmethod + def _calculate_step(cls, state, period): + if state == cls.STATE_ACTIVE: + return min(period * 2, cls._FINAL_ACTIVE_PERIOD) + elif state == cls.STATE_IDLE: + return cls._IDLE_PERIOD + elif state == cls.STATE_DND: + return cls._INFINITE_PERIOD + else: + raise RuntimeError("Unknown state: %r" % (state, )) diff --git a/src/util/algorithms.py b/src/util/algorithms.py index 040967a..7052b98 100644 --- a/src/util/algorithms.py +++ b/src/util/algorithms.py @@ -572,6 +572,13 @@ def pushback_itr(itr): maybePushedBack = yield item +def itr_available(queue, initiallyBlock = False): + if initiallyBlock: + yield queue.get() + while not queue.empty(): + yield queue.get_nowait() + + if __name__ == "__main__": import doctest print doctest.testmod() -- 1.7.9.5