3 from __future__ import with_statement
14 def synchronized(lock):
16 Synchronization decorator.
19 >>> misc.validate_decorator(synchronized(object()))
25 def newFunction(*args, **kw):
37 This decorator calls the method in a new thread, so execution returns straight away
40 >>> misc.validate_decorator(threaded)
44 def wrapper(*args, **kwargs):
45 t = threading.Thread(target=f, args=args, kwargs=kwargs)
53 Fork a function into a separate process and block on it, for forcing reclaiming of resources for highly intensive functions
54 @return The original value through pickling. If it is unable to be pickled, then the pickling exception is passed through
55 @throws Through pickling, exceptions are passed back and re-raised
56 @note source: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511474
59 >>> misc.validate_decorator(fork)
63 def wrapper(*args, **kwds):
64 pread, pwrite = os.pipe()
68 with os.fdopen(pread, 'rb') as f:
69 status, result = cPickle.load(f)
78 result = f(*args, **kwds)
80 except Exception, exc:
83 with os.fdopen(pwrite, 'wb') as f:
85 cPickle.dump((status, result), f, cPickle.HIGHEST_PROTOCOL)
86 except cPickle.PicklingError, exc:
87 cPickle.dump((2, exc), f, cPickle.HIGHEST_PROTOCOL)
93 @contextlib.contextmanager
94 def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
96 Locking with a queue, good for when you want to lock an item passed around
100 >>> lock = Queue.Queue()
102 >>> with qlock(lock) as i:
106 item = queue.get(gblock, gtimeout)
108 queue.put(item, pblock, ptimeout)
111 class EventSource(object):
113 Asynchronous implementation of the observer pattern
115 >>> sourceRoot = EventSource()
116 >>> sourceChild1 = EventSource()
117 >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
118 >>> sourceChild2 = EventSource()
119 >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
120 >>> sourceRoot.add_children(sourceChild1, sourceChild2)
125 @warning Not thread safe
128 self.__callbackQueues = {}
131 def add_children(self, *childrenSources):
133 @warning Not thread safe
136 self.__children.extend(childrenSources)
138 def remove_children(self, *childrenSources):
140 @warning Not thread safe
143 for child in childrenSources:
144 self.__children.remove(child)
146 def register_provided_events(self, *events):
148 @warning Not thread safe
151 self.__callbackQueues.update(dict((event, []) for event in events))
153 def notify_observers(self, event, message):
155 @warning As threadsafe as the queue used. qlock is recommended for the message if it needs locking
158 for queue in self.__callbackQueues[event]:
161 def _register_queue(self, event, queue):
163 @warning Not thread safe
166 if event in self.__callbackQueues:
167 self.__callbackQueues[event].append(queue)
170 for child in self.__children:
171 source = child._register_queue(event, queue)
172 if source is not None:
177 def _unregister_queue(self, event, queue):
179 @warning Not thread safe
182 if event in self.__callbackQueues:
183 self.__callbackQueues[event].remove(queue)
186 for child in self.__children:
187 source = child._unregister_queue(event, queue)
188 if source is not None:
194 class StrongEventSourceProxy(object):
196 def __init__(self, source):
198 @warning Not thread safe
203 def register(self, event, queue):
205 @warning Not thread safe
208 actualSource = self.source._register_queue(event, queue)
209 ActualType = type(self)
210 return ActualType(actualSource)
212 def unregister(self, event, queue):
214 @warning Not thread safe
217 actualSource = self.source._unregister_queue(event, queue)
218 ActualType = type(self)
219 return ActualType(actualSource)
222 class WeakEventSourceProxy(object):
224 def __init__(self, source):
226 @warning Not thread safe
229 self.source = weakref.ref(source)
231 def register(self, event, queue):
233 @warning Not thread safe
236 actualSource = self.source()._register_queue(event, queue)
237 ActualType = type(self)
238 return ActualType(actualSource)
240 def unregister(self, event, queue):
242 @warning Not thread safe
245 actualSource = self.source()._unregister_queue(event, queue)
246 ActualType = type(self)
247 return ActualType(actualSource)
250 class EventObserver(object):
254 >>> class Observer(EventObserver):
255 ... def connect_to_source(self, eventSourceRoot):
256 ... self.queue = Queue.Queue()
257 ... self.source = eventSourceRoot.register("1-event-0", self.queue)
259 >>> sourceRoot = EventSource()
260 >>> sourceChild1 = EventSource()
261 >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
262 >>> sourceChild2 = EventSource()
263 >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
264 >>> sourceRoot.add_children(sourceChild1, sourceChild2)
267 >>> o1.connect_to_source(StrongEventSourceProxy(sourceRoot))
269 >>> o2.connect_to_source(WeakEventSourceProxy(sourceRoot))
271 >>> sourceChild1.notify_observers("1-event-0", "Hello World")
272 >>> o1.queue.get(False)
274 >>> o2.queue.get(False)
278 def connect_to_source(self, eventSourceRoot):
279 raise NotImplementedError