From: Ed Page Date: Wed, 17 Mar 2010 01:42:28 +0000 (-0500) Subject: Adding an async pool to try and fix things up X-Git-Url: https://vcs.maemo.org/git/?a=commitdiff_plain;h=f9d1c94dd7f8fbb750f14c8183dd85e04a1cd6cf;hp=f7ab8d4307b8418dec52a4dbaa82d72c3cd81a36;p=theonering Adding an async pool to try and fix things up --- diff --git a/src/util/go_utils.py b/src/util/go_utils.py index 33ddfb5..ef6cb72 100644 --- a/src/util/go_utils.py +++ b/src/util/go_utils.py @@ -4,14 +4,17 @@ from __future__ import with_statement import time import functools +import threading +import Queue import logging import gobject +import algorithms import misc -_moduleLogger = logging.getLogger("go_utils") +_moduleLogger = logging.getLogger(__name__) def make_idler(func): @@ -85,8 +88,9 @@ class Async(object): self.cancel() try: self.__func() - finally: - return False + except Exception: + pass + return False class Timeout(object): @@ -122,8 +126,66 @@ class Timeout(object): self.cancel() try: self.__func() - finally: - return False + except Exception: + pass + return False + + +__QUEUE_EMPTY = object() + + +class AsyncPool(object): + + def __init__(self): + self.__workQueue = Queue.Queue() + self.__thread = threading.Thread( + name = type(self).__name__, + target = self.__consume_queue, + ) + self.__isRunning = True + + def start(self): + self.__thread.start() + + def stop(self): + self.__isRunning = False + for _ in algorithms.itr_available(self.__workQueue): + pass # eat up queue to cut down dumb work + self.__workQueue.put(__QUEUE_EMPTY) + + def add_task(self, func, args, kwds, on_success, on_error): + task = func, args, kwds, on_success, on_error + self.__workQueue.put(task) + + @misc.log_exception(_moduleLogger) + def __trampoline_callback(self, callback, result): + if self.__isRunning: + try: + callback(result) + except Exception: + pass + else: + _moduleLogger.info("Blocked call to %r" % (callback, )) + return False + + @misc.log_exception(_moduleLogger) + def __consume_queue(self): + while True: + task = self.__workQueue.get() + if task is __QUEUE_EMPTY: + break + func, args, kwds, on_success, on_error = task + + try: + result = func(*args, **kwds) + isError = False + except Exception, e: + result = e + isError = True + self.__workQueue.task_done() + + callback = on_success if not isError else on_error + gobject.idle_add(self.__trampoline_callback, callback, result) def throttled(minDelay, queue):