Adding support for coroutines for async ops through trampolines
[theonering] / src / util / go_utils.py
index ef6cb72..38e20c2 100644 (file)
@@ -131,7 +131,7 @@ class Timeout(object):
                return False
 
 
-__QUEUE_EMPTY = object()
+_QUEUE_EMPTY = object()
 
 
 class AsyncPool(object):
@@ -151,28 +151,31 @@ class AsyncPool(object):
                self.__isRunning = False
                for _ in algorithms.itr_available(self.__workQueue):
                        pass # eat up queue to cut down dumb work
-               self.__workQueue.put(__QUEUE_EMPTY)
+               self.__workQueue.put(_QUEUE_EMPTY)
 
        def add_task(self, func, args, kwds, on_success, on_error):
                task = func, args, kwds, on_success, on_error
                self.__workQueue.put(task)
 
        @misc.log_exception(_moduleLogger)
-       def __trampoline_callback(self, callback, result):
-               if self.__isRunning:
-                       try:
-                               callback(result)
-                       except Exception:
-                               pass
-               else:
-                       _moduleLogger.info("Blocked call to %r" % (callback, ))
+       def __trampoline_callback(self, on_success, on_error, isError, result):
+               if not self.__isRunning:
+                       if isError:
+                               _moduleLogger.error("Masking: %s" % (result, ))
+                       isError = True
+                       result = StopIteration("Cancelling all callbacks")
+               callback = on_success if not isError else on_error
+               try:
+                       callback(result)
+               except Exception:
+                       pass
                return False
 
        @misc.log_exception(_moduleLogger)
        def __consume_queue(self):
                while True:
                        task = self.__workQueue.get()
-                       if task is __QUEUE_EMPTY:
+                       if task is _QUEUE_EMPTY:
                                break
                        func, args, kwds, on_success, on_error = task
 
@@ -184,8 +187,41 @@ class AsyncPool(object):
                                isError = True
                        self.__workQueue.task_done()
 
-                       callback = on_success if not isError else on_error
-                       gobject.idle_add(self.__trampoline_callback, callback, result)
+                       gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result)
+
+
+class LinearExecution(object):
+
+       def __init__(self, func):
+               self._func = func
+               self._run = None
+
+       def start(self, *args, **kwds):
+               assert self._run is None
+               kwds["on_success"] = self.on_success
+               kwds["on_error"] = self.on_error
+               self._run = self._func(*args, **kwds)
+               trampoline, args, kwds = self._run.send(None) # priming the function
+               trampoline(*args, **kwds)
+
+       @misc.log_exception(_moduleLogger)
+       def on_success(self, *args, **kwds):
+               assert not kwds
+               try:
+                       trampoline, args, kwds = self._run.send(args)
+               except StopIteration, e:
+                       pass
+               else:
+                       trampoline(*args, **kwds)
+
+       @misc.log_exception(_moduleLogger)
+       def on_error(self, error):
+               try:
+                       trampoline, args, kwds = self._run.throw(error)
+               except StopIteration, e:
+                       pass
+               else:
+                       trampoline(*args, **kwds)
 
 
 def throttled(minDelay, queue):