X-Git-Url: https://vcs.maemo.org/git/?a=blobdiff_plain;f=src%2Futil%2Fconcurrent.py;h=f5f6e1dc0610cf0bbadfa1eba2ad0b4d5f28459f;hb=64aad9d6539eb92b595b9e86485103999021cf05;hp=a6499feb86118e2a3c0fd95bf96faa599a290421;hpb=11d6763a72bd261bba8461fa1171504ae908bea4;p=ejpi diff --git a/src/util/concurrent.py b/src/util/concurrent.py index a6499fe..f5f6e1d 100644 --- a/src/util/concurrent.py +++ b/src/util/concurrent.py @@ -15,12 +15,33 @@ import misc _moduleLogger = logging.getLogger(__name__) -class AsyncLinearExecution(object): +class AsyncTaskQueue(object): + + def __init__(self, taskPool): + self._asyncs = [] + self._taskPool = taskPool + + def add_async(self, func): + self.flush() + a = AsyncGeneratorTask(self._taskPool, func) + self._asyncs.append(a) + return a + + def flush(self): + self._asyncs = [a for a in self._asyncs if not a.isDone] + + +class AsyncGeneratorTask(object): def __init__(self, pool, func): self._pool = pool self._func = func self._run = None + self._isDone = False + + @property + def isDone(self): + return self._isDone def start(self, *args, **kwds): assert self._run is None, "Task already started" @@ -40,7 +61,7 @@ class AsyncLinearExecution(object): try: trampoline, args, kwds = self._run.send(result) except StopIteration, e: - pass + self._isDone = True else: self._pool.add_task( trampoline, @@ -56,7 +77,7 @@ class AsyncLinearExecution(object): try: trampoline, args, kwds = self._run.throw(error) except StopIteration, e: - pass + self._isDone = True else: self._pool.add_task( trampoline,