3 from __future__ import with_statement
14 def synchronized(lock):
16 Synchronization decorator.
19 >>> misc.validate_decorator(synchronized(object()))
25 def newFunction(*args, **kw):
37 This decorator calls the method in a new thread, so execution returns straight away
40 >>> misc.validate_decorator(threaded)
44 def wrapper(*args, **kwargs):
45 t = threading.Thread(target=f, args=args, kwargs=kwargs)
53 Fork a function into a seperate process and block on it, for forcing reclaiming of resources for highly intensive functions
54 @return The original value through pickling. If it is unable to be pickled, then the pickling exception is passed through
55 @throws Through pickling, exceptions are passed back and re-raised
56 @note source: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511474
59 >>> misc.validate_decorator(fork)
63 def wrapper(*args, **kwds):
64 pread, pwrite = os.pipe()
68 with os.fdopen(pread, 'rb') as f:
69 status, result = cPickle.load(f)
78 result = f(*args, **kwds)
80 except Exception, exc:
83 with os.fdopen(pwrite, 'wb') as f:
85 cPickle.dump((status, result), f, cPickle.HIGHEST_PROTOCOL)
86 except cPickle.PicklingError, exc:
87 cPickle.dump((2, exc), f, cPickle.HIGHEST_PROTOCOL)
93 @contextlib.contextmanager
94 def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
96 Locking with a queue, good for when you want to lock an item passed around
99 >>> lock = Queue.Queue()
101 >>> with qlock(lock) as i:
105 item = queue.get(gblock, gtimeout)
107 queue.put(item, pblock, ptimeout)
110 class EventSource(object):
112 Asynchronous implementation of the observer pattern
114 >>> sourceRoot = EventSource()
115 >>> sourceChild1 = EventSource()
116 >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
117 >>> sourceChild2 = EventSource()
118 >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
119 >>> sourceRoot.add_children(sourceChild1, sourceChild2)
124 @warning Not thread safe
127 self.__callbackQueues = {}
130 def add_children(self, *childrenSources):
132 @warning Not thread safe
135 self.__children.extend(childrenSources)
137 def remove_children(self, *childrenSources):
139 @warning Not thread safe
142 for child in childrenSources:
143 self.__children.remove(child)
145 def register_provided_events(self, *events):
147 @warning Not thread safe
150 self.__callbackQueues.update(dict((event, []) for event in events))
152 def notify_observers(self, event, message):
154 @warning As threadsafe as the queue used. qlock is recommended for the message if it needs locking
157 for queue in self.__callbackQueues[event]:
160 def _register_queue(self, event, queue):
162 @warning Not thread safe
165 if event in self.__callbackQueues:
166 self.__callbackQueues[event].append(queue)
169 for child in self.__children:
170 source = child._register_queue(event, queue)
171 if source is not None:
176 def _unregister_queue(self, event, queue):
178 @warning Not thread safe
181 if event in self.__callbackQueues:
182 self.__callbackQueues[event].remove(queue)
185 for child in self.__children:
186 source = child._unregister_queue(event, queue)
187 if source is not None:
193 class StrongEventSourceProxy(object):
195 def __init__(self, source):
197 @warning Not thread safe
202 def register(self, event, queue):
204 @warning Not thread safe
207 actualSource = self.source._register_queue(event, queue)
208 ActualType = type(self)
209 return ActualType(actualSource)
211 def unregister(self, event, queue):
213 @warning Not thread safe
216 actualSource = self.source._unregister_queue(event, queue)
217 ActualType = type(self)
218 return ActualType(actualSource)
221 class WeakEventSourceProxy(object):
223 def __init__(self, source):
225 @warning Not thread safe
228 self.source = weakref.ref(source)
230 def register(self, event, queue):
232 @warning Not thread safe
235 actualSource = self.source()._register_queue(event, queue)
236 ActualType = type(self)
237 return ActualType(actualSource)
239 def unregister(self, event, queue):
241 @warning Not thread safe
244 actualSource = self.source()._unregister_queue(event, queue)
245 ActualType = type(self)
246 return ActualType(actualSource)
249 class EventObserver(object):
253 >>> class Observer(EventObserver):
254 ... def connect_to_source(self, eventSourceRoot):
255 ... self.queue = Queue.Queue()
256 ... self.source = eventSourceRoot.register("1-event-0", self.queue)
258 >>> sourceRoot = EventSource()
259 >>> sourceChild1 = EventSource()
260 >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
261 >>> sourceChild2 = EventSource()
262 >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
263 >>> sourceRoot.add_children(sourceChild1, sourceChild2)
266 >>> o1.connect_to_source(StrongEventSourceProxy(sourceRoot))
268 >>> o2.connect_to_source(WeakEventSourceProxy(sourceRoot))
270 >>> sourceChild1.notify_observers("1-event-0", "Hello World")
271 >>> o1.queue.get(False)
273 >>> o2.queue.get(False)
277 def connect_to_source(self, eventSourceRoot):
278 raise NotImplementedError