X-Git-Url: https://vcs.maemo.org/git/?a=blobdiff_plain;f=src%2Futil%2Fconcurrent.py;fp=src%2Futil%2Fconcurrent.py;h=0000000000000000000000000000000000000000;hb=310d67249f9804fa36d35931080bf320f8a2d8a1;hp=f5f6e1dc0610cf0bbadfa1eba2ad0b4d5f28459f;hpb=64aad9d6539eb92b595b9e86485103999021cf05;p=ejpi diff --git a/src/util/concurrent.py b/src/util/concurrent.py deleted file mode 100644 index f5f6e1d..0000000 --- a/src/util/concurrent.py +++ /dev/null @@ -1,168 +0,0 @@ -#!/usr/bin/env python - -from __future__ import with_statement - -import os -import errno -import time -import functools -import contextlib -import logging - -import misc - - -_moduleLogger = logging.getLogger(__name__) - - -class AsyncTaskQueue(object): - - def __init__(self, taskPool): - self._asyncs = [] - self._taskPool = taskPool - - def add_async(self, func): - self.flush() - a = AsyncGeneratorTask(self._taskPool, func) - self._asyncs.append(a) - return a - - def flush(self): - self._asyncs = [a for a in self._asyncs if not a.isDone] - - -class AsyncGeneratorTask(object): - - def __init__(self, pool, func): - self._pool = pool - self._func = func - self._run = None - self._isDone = False - - @property - def isDone(self): - return self._isDone - - def start(self, *args, **kwds): - assert self._run is None, "Task already started" - self._run = self._func(*args, **kwds) - trampoline, args, kwds = self._run.send(None) # priming the function - self._pool.add_task( - trampoline, - args, - kwds, - self.on_success, - self.on_error, - ) - - @misc.log_exception(_moduleLogger) - def on_success(self, result): - _moduleLogger.debug("Processing success for: %r", self._func) - try: - trampoline, args, kwds = self._run.send(result) - except StopIteration, e: - self._isDone = True - else: - self._pool.add_task( - trampoline, - args, - kwds, - self.on_success, - self.on_error, - ) - - @misc.log_exception(_moduleLogger) - def on_error(self, error): - _moduleLogger.debug("Processing error for: %r", self._func) - try: - trampoline, args, kwds = self._run.throw(error) - except StopIteration, e: - self._isDone = True - else: - self._pool.add_task( - trampoline, - args, - kwds, - self.on_success, - self.on_error, - ) - - def __repr__(self): - return "" % (self._func.__name__, id(self)) - - def __hash__(self): - return hash(self._func) - - def __eq__(self, other): - return self._func == other._func - - def __ne__(self, other): - return self._func != other._func - - -def synchronized(lock): - """ - Synchronization decorator. - - >>> import misc - >>> misc.validate_decorator(synchronized(object())) - """ - - def wrap(f): - - @functools.wraps(f) - def newFunction(*args, **kw): - lock.acquire() - try: - return f(*args, **kw) - finally: - lock.release() - return newFunction - return wrap - - -@contextlib.contextmanager -def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None): - """ - Locking with a queue, good for when you want to lock an item passed around - - >>> import Queue - >>> item = 5 - >>> lock = Queue.Queue() - >>> lock.put(item) - >>> with qlock(lock) as i: - ... print i - 5 - """ - item = queue.get(gblock, gtimeout) - try: - yield item - finally: - queue.put(item, pblock, ptimeout) - - -@contextlib.contextmanager -def flock(path, timeout=-1): - WAIT_FOREVER = -1 - DELAY = 0.1 - timeSpent = 0 - - acquired = False - - while timeSpent <= timeout or timeout == WAIT_FOREVER: - try: - fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR) - acquired = True - break - except OSError, e: - if e.errno != errno.EEXIST: - raise - time.sleep(DELAY) - timeSpent += DELAY - - assert acquired, "Failed to grab file-lock %s within timeout %d" % (path, timeout) - - try: - yield fd - finally: - os.unlink(path)