3 from __future__ import with_statement
15 @contextlib.contextmanager
16 def flock(path, timeout=-1):
23 while timeSpent <= timeout or timeout == WAIT_FOREVER:
25 fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
29 if e.errno != errno.EEXIST:
34 assert acquired, "Failed to grab file-lock %s within timeout %d" % (path, timeout)
44 Decorator that makes a generator-function into a function that will continue execution on next call
48 @functools.wraps(func)
49 def decorated_func(*args, **kwds):
51 a.append(func(*args, **kwds))
65 ... def grep_sink(pattern):
66 ... print "Looking for %s" % pattern
69 ... if pattern in line:
71 >>> g = grep_sink("python")
73 >>> g.send("Yeah but no but yeah but no")
74 >>> g.send("A series of tubes")
75 >>> g.send("python generators rock!")
76 python generators rock!
80 @functools.wraps(func)
81 def start(*args, **kwargs):
82 cr = func(*args, **kwargs)
90 def printer_sink(format = "%s"):
92 >>> pr = printer_sink("%r")
99 >>> p = printer_sink()
104 >>> # p.throw(RuntimeError, "Goodbye")
110 print format % (item, )
116 Good for uses like with cochain to pick up any slack
123 def comap(function, target):
125 >>> p = printer_sink()
126 >>> cm = comap(lambda x: x+1, p)
137 mappedItem = function(*item)
138 target.send(mappedItem)
140 logging.exception("Forwarding exception!")
141 target.throw(e.__class__, str(e))
144 def _flush_queue(queue):
145 while not queue.empty():
150 def queue_sink(queue):
152 >>> q = Queue.Queue()
153 >>> qs = queue_sink(q)
156 >>> qs.throw(RuntimeError, "Goodbye")
159 >>> print [i for i in _flush_queue(q)]
160 [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]
165 queue.put((None, item))
167 queue.put((e.__class__, str(e)))
168 except GeneratorExit:
169 queue.put((GeneratorExit, None))
173 def decode_item(item, target):
177 elif item[0] is GeneratorExit:
181 target.throw(item[0], item[1])
185 def nonqueue_source(queue, target):
189 isDone = decode_item(item, target)
190 while not queue.empty():
194 def threaded_stage(target, thread_factory = threading.Thread):
195 messages = Queue.Queue()
197 run_source = functools.partial(nonqueue_source, messages, target)
198 thread = thread_factory(target=run_source)
199 thread.setDaemon(True)
202 # Sink running in current thread
203 return queue_sink(messages)
206 def safecall(f, errorDisplay=None, default=None, exception=Exception):
208 Returns modified f. When the modified f is called and throws an
209 exception, the default value is returned
211 def _safecall(*args, **argv):
213 return f(*args,**argv)
215 if errorDisplay is not None:
216 errorDisplay.push_exception(e)