X-Git-Url: http://vcs.maemo.org/git/?a=blobdiff_plain;f=src%2Futil%2Fconcurrent.py;h=a6499feb86118e2a3c0fd95bf96faa599a290421;hb=52a003279ee401971e6e2e863fe51db208e7b191;hp=503a1b444013b448cd41f2b43dbbc093bc2b3c35;hpb=57b08f900cc042ef06b5f7592a157249c1845f17;p=ejpi diff --git a/src/util/concurrent.py b/src/util/concurrent.py index 503a1b4..a6499fe 100644 --- a/src/util/concurrent.py +++ b/src/util/concurrent.py @@ -7,6 +7,76 @@ import errno import time import functools import contextlib +import logging + +import misc + + +_moduleLogger = logging.getLogger(__name__) + + +class AsyncLinearExecution(object): + + def __init__(self, pool, func): + self._pool = pool + self._func = func + self._run = None + + def start(self, *args, **kwds): + assert self._run is None, "Task already started" + self._run = self._func(*args, **kwds) + trampoline, args, kwds = self._run.send(None) # priming the function + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + @misc.log_exception(_moduleLogger) + def on_success(self, result): + _moduleLogger.debug("Processing success for: %r", self._func) + try: + trampoline, args, kwds = self._run.send(result) + except StopIteration, e: + pass + else: + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + @misc.log_exception(_moduleLogger) + def on_error(self, error): + _moduleLogger.debug("Processing error for: %r", self._func) + try: + trampoline, args, kwds = self._run.throw(error) + except StopIteration, e: + pass + else: + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) + + def __repr__(self): + return "" % (self._func.__name__, id(self)) + + def __hash__(self): + return hash(self._func) + + def __eq__(self, other): + return self._func == other._func + + def __ne__(self, other): + return self._func != other._func def synchronized(lock):