Bringing doctests upto snuff with Python26
[ejpi] / src / libraries / recipes / concurrent.py
1 #!/usr/bin/env python
2
3 from __future__ import with_statement
4
5 import os
6 import sys
7 import cPickle
8 import weakref
9 import threading
10 import functools
11 import contextlib
12
13
14 def synchronized(lock):
15         """
16         Synchronization decorator.
17
18         >>> import misc
19         >>> misc.validate_decorator(synchronized(object()))
20         """
21
22         def wrap(f):
23
24                 @functools.wraps(f)
25                 def newFunction(*args, **kw):
26                         lock.acquire()
27                         try:
28                                 return f(*args, **kw)
29                         finally:
30                                 lock.release()
31                 return newFunction
32         return wrap
33
34
35 def threaded(f):
36         """
37         This decorator calls the method in a new thread, so execution returns straight away
38
39         >>> import misc
40         >>> misc.validate_decorator(threaded)
41         """
42
43         @functools.wraps(f)
44         def wrapper(*args, **kwargs):
45                 t = threading.Thread(target=f, args=args, kwargs=kwargs)
46                 t.setDaemon(True)
47                 t.start()
48         return wrapper
49
50
51 def fork(f):
52         """
53         Fork a function into a seperate process and block on it, for forcing reclaiming of resources for highly intensive functions
54         @return The original value through pickling.  If it is unable to be pickled, then the pickling exception is passed through
55         @throws Through pickling, exceptions are passed back and re-raised
56         @note source: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511474
57
58         >>> import misc
59         >>> misc.validate_decorator(fork)
60         """
61
62         @functools.wraps(f)
63         def wrapper(*args, **kwds):
64                 pread, pwrite = os.pipe()
65                 pid = os.fork()
66                 if pid > 0:
67                         os.close(pwrite)
68                         with os.fdopen(pread, 'rb') as f:
69                                 status, result = cPickle.load(f)
70                         os.waitpid(pid, 0)
71                         if status == 0:
72                                 return result
73                         else:
74                                 raise result
75                 else:
76                         os.close(pread)
77                         try:
78                                 result = f(*args, **kwds)
79                                 status = 0
80                         except Exception, exc:
81                                 result = exc
82                                 status = 1
83                         with os.fdopen(pwrite, 'wb') as f:
84                                 try:
85                                         cPickle.dump((status, result), f, cPickle.HIGHEST_PROTOCOL)
86                                 except cPickle.PicklingError, exc:
87                                         cPickle.dump((2, exc), f, cPickle.HIGHEST_PROTOCOL)
88                         f.close()
89                         sys.exit(0)
90         return wrapper
91
92
93 @contextlib.contextmanager
94 def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
95         """
96         Locking with a queue, good for when you want to lock an item passed around
97
98         >>> import Queue
99         >>> item = 5
100         >>> lock = Queue.Queue()
101         >>> lock.put(item)
102         >>> with qlock(lock) as i:
103         ...     print i
104         5
105         """
106         item = queue.get(gblock, gtimeout)
107         yield item
108         queue.put(item, pblock, ptimeout)
109
110
111 class EventSource(object):
112         """
113         Asynchronous implementation of the observer pattern
114
115         >>> sourceRoot = EventSource()
116         >>> sourceChild1 = EventSource()
117         >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
118         >>> sourceChild2 = EventSource()
119         >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
120         >>> sourceRoot.add_children(sourceChild1, sourceChild2)
121         """
122
123         def __init__(self):
124                 """
125                 @warning Not thread safe
126                 """
127
128                 self.__callbackQueues = {}
129                 self.__children = []
130
131         def add_children(self, *childrenSources):
132                 """
133                 @warning Not thread safe
134                 """
135
136                 self.__children.extend(childrenSources)
137
138         def remove_children(self, *childrenSources):
139                 """
140                 @warning Not thread safe
141                 """
142
143                 for child in childrenSources:
144                         self.__children.remove(child)
145
146         def register_provided_events(self, *events):
147                 """
148                 @warning Not thread safe
149                 """
150
151                 self.__callbackQueues.update(dict((event, []) for event in events))
152
153         def notify_observers(self, event, message):
154                 """
155                 @warning As threadsafe as the queue used.  qlock is recommended for the message if it needs locking
156                 """
157
158                 for queue in self.__callbackQueues[event]:
159                         queue.put(message)
160
161         def _register_queue(self, event, queue):
162                 """
163                 @warning Not thread safe
164                 """
165
166                 if event in self.__callbackQueues:
167                         self.__callbackQueues[event].append(queue)
168                         return self
169                 else:
170                         for child in self.__children:
171                                 source = child._register_queue(event, queue)
172                                 if source is not None:
173                                         return source
174                         else:
175                                 return None
176
177         def _unregister_queue(self, event, queue):
178                 """
179                 @warning Not thread safe
180                 """
181
182                 if event in self.__callbackQueues:
183                         self.__callbackQueues[event].remove(queue)
184                         return self
185                 else:
186                         for child in self.__children:
187                                 source = child._unregister_queue(event, queue)
188                                 if source is not None:
189                                         return source
190                         else:
191                                 return None
192
193
194 class StrongEventSourceProxy(object):
195
196         def __init__(self, source):
197                 """
198                 @warning Not thread safe
199                 """
200
201                 self.source = source
202
203         def register(self, event, queue):
204                 """
205                 @warning Not thread safe
206                 """
207
208                 actualSource = self.source._register_queue(event, queue)
209                 ActualType = type(self)
210                 return ActualType(actualSource)
211
212         def unregister(self, event, queue):
213                 """
214                 @warning Not thread safe
215                 """
216
217                 actualSource = self.source._unregister_queue(event, queue)
218                 ActualType = type(self)
219                 return ActualType(actualSource)
220
221
222 class WeakEventSourceProxy(object):
223
224         def __init__(self, source):
225                 """
226                 @warning Not thread safe
227                 """
228
229                 self.source = weakref.ref(source)
230
231         def register(self, event, queue):
232                 """
233                 @warning Not thread safe
234                 """
235
236                 actualSource = self.source()._register_queue(event, queue)
237                 ActualType = type(self)
238                 return ActualType(actualSource)
239
240         def unregister(self, event, queue):
241                 """
242                 @warning Not thread safe
243                 """
244
245                 actualSource = self.source()._unregister_queue(event, queue)
246                 ActualType = type(self)
247                 return ActualType(actualSource)
248
249
250 class EventObserver(object):
251         """
252
253         >>> import Queue
254         >>> class Observer(EventObserver):
255         ...     def connect_to_source(self, eventSourceRoot):
256         ...             self.queue = Queue.Queue()
257         ...             self.source = eventSourceRoot.register("1-event-0", self.queue)
258         >>>
259         >>> sourceRoot = EventSource()
260         >>> sourceChild1 = EventSource()
261         >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
262         >>> sourceChild2 = EventSource()
263         >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
264         >>> sourceRoot.add_children(sourceChild1, sourceChild2)
265         >>>
266         >>> o1 = Observer()
267         >>> o1.connect_to_source(StrongEventSourceProxy(sourceRoot))
268         >>> o2 = Observer()
269         >>> o2.connect_to_source(WeakEventSourceProxy(sourceRoot))
270         >>>
271         >>> sourceChild1.notify_observers("1-event-0", "Hello World")
272         >>> o1.queue.get(False)
273         'Hello World'
274         >>> o2.queue.get(False)
275         'Hello World'
276         """
277
278         def connect_to_source(self, eventSourceRoot):
279                 raise NotImplementedError