3 from __future__ import with_statement
# Module-level logger named after this module, so apps can filter/route
# these utilities' output via the standard logging hierarchy.
_moduleLogger = logging.getLogger(__name__)
	"""
	Decorator that makes a generator-function into a function that will continue execution on next call
	"""
	# NOTE(review): several lines of this decorator are elided in this
	# excerpt (the enclosing `def`, the cache setup, and the resume/
	# StopIteration handling) — confirm against the full source.
	@functools.wraps(func)
	def decorated_func(*args, **kwds):
		# First invocation: instantiate the generator and cache it (in `a`)
		# so later calls can resume it instead of restarting it.
		a.append(func(*args, **kwds))
	"""
	Make a function mainloop friendly. the function will be called at the
	next mainloop idle state.

	>>> misc.validate_decorator(async)
	"""
	@functools.wraps(func)
	def new_function(*args, **kwargs):
		# NOTE(review): the definition of `async_function` is elided in this
		# excerpt; presumably it invokes func(*args, **kwargs) and returns
		# False so GLib runs it exactly once — confirm against full source.
		# Defer the wrapped call to the next mainloop idle slot.
		gobject.idle_add(async_function)
	# NOTE(review): these are method fragments of an enclosing class whose
	# header and several body lines are elided in this excerpt; comments
	# below hedge anything the visible code does not establish.
	def __init__(self, func, once = True):
		# func: callable to schedule on the mainloop; once: presumably
		# selects the single-shot wrapper below — confirm against full
		# source (the attribute assignments are elided here).

	# -- start() (its `def` line is elided in this excerpt) --
	# Refuse to double-schedule: only one pending idle source at a time.
	assert self.__idleId is None
	# Single-shot mode goes through the _on_once wrapper...
	self.__idleId = gobject.idle_add(self._on_once)
	# ...otherwise the raw callable is registered directly (per GLib
	# semantics its boolean return then controls re-invocation).
	self.__idleId = gobject.idle_add(self.__func)

	# -- is_running(): scheduled iff an idle-source id is currently held --
	return self.__idleId is not None

	# -- cancel(): detach from the mainloop if currently scheduled --
	if self.__idleId is not None:
		gobject.source_remove(self.__idleId)

	# Route any exception escaping the once-wrapper into the module logger.
	@misc.log_exception(_moduleLogger)
class Timeout(object):
	# Wrapper around a GLib timeout source with start/cancel lifecycle.
	# NOTE(review): a handful of lines are elided in this excerpt (marked
	# inline below) — confirm against the full source.

	def __init__(self, func, once = True):
		# (assignments of self.__func / self.__once are elided here)
		# No source is scheduled until start() is called.
		self.__timeoutId = None

	def start(self, **kwds):
		# Refuse to double-start: only one pending source at a time.
		assert self.__timeoutId is None
		# once=True routes through the _on_once wrapper (its body is elided
		# in this excerpt; presumably it cancels after firing).
		callback = self._on_once if self.__once else self.__func
		# Exactly one keyword is accepted: seconds=<non-negative delay>.
		assert len(kwds) == 1
		timeoutInSeconds = kwds["seconds"]
		assert 0 <= timeoutInSeconds
		if timeoutInSeconds == 0:
			# Zero delay: fire at the next idle slot rather than a timer.
			self.__timeoutId = gobject.idle_add(callback)
		# (the `else:` line is elided in this excerpt)
		self.__timeoutId = timeout_add_seconds(timeoutInSeconds, callback)

	def is_running(self):
		return self.__timeoutId is not None

	# -- cancel() (its `def` line is elided in this excerpt) --
	if self.__timeoutId is not None:
		gobject.source_remove(self.__timeoutId)
	self.__timeoutId = None

	def __call__(self, **kwds):
		# Convenience alias: calling the object is the same as start().
		return self.start(**kwds)

	# Route exceptions from the (elided) _on_once wrapper into the logger.
	@misc.log_exception(_moduleLogger)
# Unique sentinel pushed onto AsyncPool's work queue to tell the consumer
# thread to shut down (identity-compared, so it can never collide with a
# real task).
_QUEUE_EMPTY = object()
class AsyncPool(object):
	# Single background worker thread that runs queued tasks and marshals
	# each outcome back to the GLib mainloop thread via idle callbacks.
	# NOTE(review): several lines are elided in this excerpt (marked
	# inline below) — confirm against the full source.

	# -- __init__ (its `def` line is elided in this excerpt) --
	self.__workQueue = Queue.Queue()
	self.__thread = threading.Thread(
		name = type(self).__name__,
		target = self.__consume_queue,
	# (closing paren of the Thread(...) call is elided here)
	self.__isRunning = True

	# -- start(): launch the worker thread --
	self.__thread.start()

	# -- stop(): flag shutdown, drop pending work, wake the worker --
	self.__isRunning = False
	for _ in algorithms.itr_available(self.__workQueue):
		pass # eat up queue to cut down dumb work
	# The sentinel wakes the blocked get() so the worker can exit.
	self.__workQueue.put(_QUEUE_EMPTY)

	def clear_tasks(self):
		# Discard everything queued but not yet started; a task already
		# running is unaffected.
		for _ in algorithms.itr_available(self.__workQueue):
			pass # eat up queue to cut down dumb work

	def add_task(self, func, args, kwds, on_success, on_error):
		# on_success/on_error are later invoked on the mainloop thread with
		# the task's result (or the raised exception).
		task = func, args, kwds, on_success, on_error
		self.__workQueue.put(task)

	@misc.log_exception(_moduleLogger)
	def __trampoline_callback(self, on_success, on_error, isError, result):
		# Runs on the mainloop thread (scheduled via gobject.idle_add below).
		if not self.__isRunning:
			# Pool already stopped: log the real outcome...
			_moduleLogger.error("Masking: %s" % (result, ))
			# ...and substitute a cancellation marker for the callback.
			result = StopIteration("Cancelling all callbacks")
		callback = on_success if not isError else on_error
		# (the try/callback(result) lines around this are elided here)
		_moduleLogger.exception("Callback errored")

	@misc.log_exception(_moduleLogger)
	def __consume_queue(self):
		# Worker-thread main loop (the surrounding `while` is elided here).
		task = self.__workQueue.get()
		if task is _QUEUE_EMPTY:
			# (loop-exit `break` is elided here — sentinel ends the worker)
		func, args, kwds, on_success, on_error = task
		# (the `try:` line is elided here)
		result = func(*args, **kwds)
		# (the `except` branch capturing the exception as result is elided)
		_moduleLogger.exception("Error, passing it back to the main thread")
		self.__workQueue.task_done()
		# Marshal the outcome back to the mainloop thread.
		gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result)
		_moduleLogger.debug("Shutting down worker thread")
class AsyncLinearExecution(object):
	# Drives a generator through an async pool so that sequential-looking
	# generator code can yield out asynchronous steps.
	# NOTE(review): several lines are elided in this excerpt (marked
	# inline below) — confirm against the full source.

	def __init__(self, pool, func):
		# (attribute assignments are elided here; later code reads
		# self._func and self._run, and presumably stores pool as well)

	def start(self, *args, **kwds):
		# One-shot: a given execution object may only be started once.
		assert self._run is None
		self._run = self._func(*args, **kwds)
		# Advance to the first yield; the generator hands back the async
		# call to make plus its arguments.
		trampoline, args, kwds = self._run.send(None) # priming the function
		# (the dispatch of `trampoline` through the pool is elided here)

	@misc.log_exception(_moduleLogger)
	def on_success(self, result):
		#_moduleLogger.debug("Processing success for: %r", self._func)
		# Resume the generator with the async result (the `try:` line and
		# the following dispatch are elided here).
		trampoline, args, kwds = self._run.send(result)
		except StopIteration, e:
		# (normal generator completion — handling elided)

	@misc.log_exception(_moduleLogger)
	def on_error(self, error):
		#_moduleLogger.debug("Processing error for: %r", self._func)
		# Re-raise the async failure inside the generator so user code can
		# catch it at its yield point (the `try:` line is elided here).
		trampoline, args, kwds = self._run.throw(error)
		except StopIteration, e:
class AutoSignal(object):
	"""
	Tracks gobject signal connections and severs every one of them when the
	given top-level widget is destroyed, preventing stale-handler leaks.
	"""

	def __init__(self, toplevel):
		self.__disconnectPool = []
		toplevel.connect("destroy", self.__on_destroy)

	def connect_auto(self, widget, *args):
		# Same signature as gobject's connect; the handler id is remembered
		# so teardown can undo the connection.
		handlerId = widget.connect(*args)
		self.__disconnectPool.append((widget, handlerId))

	@misc.log_exception(_moduleLogger)
	def __on_destroy(self, widget):
		_moduleLogger.info("Destroy: %r (%s to clean up)" % (self, len(self.__disconnectPool)))
		for source, handlerId in self.__disconnectPool:
			source.disconnect(handlerId)
		# Empty the list in place so any outside references see it cleared.
		del self.__disconnectPool[:]
def throttled(minDelay, queue):
	"""
	Throttle the calls to a function by queueing all the calls that happen
	before the minimum delay

	>>> misc.validate_decorator(throttled(0, Queue.Queue()))
	"""

	def actual_decorator(func):

		# Single-element list so the nested closures can rebind the shared
		# timestamp (milliseconds of the last allowed call, or None).
		lastCallTime = [None]

		def process_queue():
			# Drain one deferred call, stamping the time so later queued
			# calls keep honoring minDelay.
			if 0 < len(queue):
				func, args, kwargs = queue.pop(0)
				lastCallTime[0] = time.time() * 1000
				func(*args, **kwargs)

		@functools.wraps(func)
		def new_function(*args, **kwargs):
			now = time.time() * 1000
			if (
				lastCallTime[0] is None or
				# BUG FIX: was `now - lastCallTime`, which subtracts the
				# list wrapper itself instead of the stored timestamp.
				(now - lastCallTime[0] >= minDelay)
			):
				# Enough time has passed (or first call): run immediately.
				lastCallTime[0] = now
				func(*args, **kwargs)
			else:
				# Too soon: defer, scheduling a drain far enough out that
				# every queued call ends up at least minDelay apart.
				queue.append((func, args, kwargs))
				lastCallDelta = now - lastCallTime[0]
				processQueueTimeout = int(minDelay * len(queue) - lastCallDelta)
				gobject.timeout_add(processQueueTimeout, process_queue)

		return new_function

	return actual_decorator
def _old_timeout_add_seconds(timeout, callback):
	# Compatibility shim for pygobject builds that lack
	# timeout_add_seconds: express the delay via the millisecond API.
	milliseconds = timeout * 1000
	return gobject.timeout_add(milliseconds, callback)
def _timeout_add_seconds(timeout, callback):
	# Thin wrapper over the native second-granularity timeout API, kept so
	# both implementations share one call signature.
	sourceId = gobject.timeout_add_seconds(timeout, callback)
	return sourceId
# Feature-detect at import time: pygobject 2.14+ exposes
# timeout_add_seconds natively; otherwise fall back to the
# millisecond-based emulation above.
# NOTE(review): the opening `try:` line is elided in this excerpt.
	# Probe the attribute; raises AttributeError on old pygobject.
	gobject.timeout_add_seconds
	timeout_add_seconds = _timeout_add_seconds
except AttributeError:
	timeout_add_seconds = _old_timeout_add_seconds