3 from __future__ import with_statement
16 def synchronized(lock):
18 Synchronization decorator.
21 >>> misc.validate_decorator(synchronized(object()))
27 def newFunction(*args, **kw):
37 @contextlib.contextmanager
38 def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
40 Locking with a queue, good for when you want to lock an item passed around
44 >>> lock = Queue.Queue()
46 >>> with qlock(lock) as i:
50 item = queue.get(gblock, gtimeout)
54 queue.put(item, pblock, ptimeout)
57 @contextlib.contextmanager
58 def flock(path, timeout=-1):
65 while timeSpent <= timeout or timeout == WAIT_FOREVER:
67 fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
71 if e.errno != errno.EEXIST:
76 assert acquired, "Failed to grab file-lock %s within timeout %d" % (path, timeout)
86 This decorator calls the method in a new thread, so execution returns straight away
89 >>> misc.validate_decorator(threaded)
93 def wrapper(*args, **kwargs):
94 t = threading.Thread(target=f, args=args, kwargs=kwargs)
102 Fork a function into a seperate process and block on it, for forcing reclaiming of resources for highly intensive functions
103 @return The original value through pickling. If it is unable to be pickled, then the pickling exception is passed through
104 @throws Through pickling, exceptions are passed back and re-raised
105 @note source: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511474
108 >>> misc.validate_decorator(fork)
112 def wrapper(*args, **kwds):
113 pread, pwrite = os.pipe()
117 with os.fdopen(pread, 'rb') as f:
118 status, result = cPickle.load(f)
127 result = f(*args, **kwds)
129 except Exception, exc:
132 with os.fdopen(pwrite, 'wb') as f:
134 cPickle.dump((status, result), f, cPickle.HIGHEST_PROTOCOL)
135 except cPickle.PicklingError, exc:
136 cPickle.dump((2, exc), f, cPickle.HIGHEST_PROTOCOL)
142 @contextlib.contextmanager
143 def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
145 Locking with a queue, good for when you want to lock an item passed around
149 >>> lock = Queue.Queue()
151 >>> with qlock(lock) as i:
155 item = queue.get(gblock, gtimeout)
157 queue.put(item, pblock, ptimeout)
160 class EventSource(object):
162 Asynchronous implementation of the observer pattern
164 >>> sourceRoot = EventSource()
165 >>> sourceChild1 = EventSource()
166 >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
167 >>> sourceChild2 = EventSource()
168 >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
169 >>> sourceRoot.add_children(sourceChild1, sourceChild2)
174 @warning Not thread safe
177 self.__callbackQueues = {}
180 def add_children(self, *childrenSources):
182 @warning Not thread safe
185 self.__children.extend(childrenSources)
187 def remove_children(self, *childrenSources):
189 @warning Not thread safe
192 for child in childrenSources:
193 self.__children.remove(child)
195 def register_provided_events(self, *events):
197 @warning Not thread safe
200 self.__callbackQueues.update(dict((event, []) for event in events))
202 def notify_observers(self, event, message):
204 @warning As threadsafe as the queue used. qlock is recommended for the message if it needs locking
207 for queue in self.__callbackQueues[event]:
210 def _register_queue(self, event, queue):
212 @warning Not thread safe
215 if event in self.__callbackQueues:
216 self.__callbackQueues[event].append(queue)
219 for child in self.__children:
220 source = child._register_queue(event, queue)
221 if source is not None:
226 def _unregister_queue(self, event, queue):
228 @warning Not thread safe
231 if event in self.__callbackQueues:
232 self.__callbackQueues[event].remove(queue)
235 for child in self.__children:
236 source = child._unregister_queue(event, queue)
237 if source is not None:
243 class StrongEventSourceProxy(object):
245 def __init__(self, source):
247 @warning Not thread safe
252 def register(self, event, queue):
254 @warning Not thread safe
257 actualSource = self.source._register_queue(event, queue)
258 ActualType = type(self)
259 return ActualType(actualSource)
261 def unregister(self, event, queue):
263 @warning Not thread safe
266 actualSource = self.source._unregister_queue(event, queue)
267 ActualType = type(self)
268 return ActualType(actualSource)
271 class WeakEventSourceProxy(object):
273 def __init__(self, source):
275 @warning Not thread safe
278 self.source = weakref.ref(source)
280 def register(self, event, queue):
282 @warning Not thread safe
285 actualSource = self.source()._register_queue(event, queue)
286 ActualType = type(self)
287 return ActualType(actualSource)
289 def unregister(self, event, queue):
291 @warning Not thread safe
294 actualSource = self.source()._unregister_queue(event, queue)
295 ActualType = type(self)
296 return ActualType(actualSource)
299 class EventObserver(object):
303 >>> class Observer(EventObserver):
304 ... def connect_to_source(self, eventSourceRoot):
305 ... self.queue = Queue.Queue()
306 ... self.source = eventSourceRoot.register("1-event-0", self.queue)
308 >>> sourceRoot = EventSource()
309 >>> sourceChild1 = EventSource()
310 >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
311 >>> sourceChild2 = EventSource()
312 >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
313 >>> sourceRoot.add_children(sourceChild1, sourceChild2)
316 >>> o1.connect_to_source(StrongEventSourceProxy(sourceRoot))
318 >>> o2.connect_to_source(WeakEventSourceProxy(sourceRoot))
320 >>> sourceChild1.notify_observers("1-event-0", "Hello World")
321 >>> o1.queue.get(False)
323 >>> o2.queue.get(False)
327 def connect_to_source(self, eventSourceRoot):
328 raise NotImplementedError