Initial checkin
[ejpi] / src / libraries / recipes / concurrent.py
1 #!/usr/bin/env python
2
3 from __future__ import with_statement
4
5 import os
6 import sys
7 import cPickle
8 import weakref
9 import threading
10 import functools
11 import contextlib
12
13
def synchronized(lock):
        """
        Build a decorator that runs the decorated function while holding ``lock``.

        The lock is acquired before each call and released again afterwards,
        whether the call returns normally or raises.

        @param lock Object providing acquire() and release()
        @return A decorator turning a function into its lock-guarded equivalent

        >>> import misc
        >>> misc.validate_decorator(synchronized(object()))
        """

        def decorate(func):

                def guarded(*args, **kw):
                        lock.acquire()
                        try:
                                outcome = func(*args, **kw)
                        finally:
                                # Hand the lock back on every exit path, exceptions included
                                lock.release()
                        return outcome

                # Copy the name, docstring, etc. of the wrapped function onto the guard
                return functools.wraps(func)(guarded)
        return decorate
33
34
def threaded(f):
        """
        Decorator that runs each call of ``f`` in a freshly started daemon thread.

        The decorated function returns None as soon as the thread has been
        started; whatever ``f`` returns is not handed back to the caller.

        >>> import misc
        >>> misc.validate_decorator(threaded)
        """

        def launch(*args, **kwargs):
                worker = threading.Thread(target=f, args=args, kwargs=kwargs)
                # Daemon threads do not keep the interpreter alive at shutdown
                worker.setDaemon(True)
                worker.start()

        # Copy the name, docstring, etc. of f onto the launcher
        return functools.wraps(f)(launch)
49
50
def fork(f):
        """
        Fork a function into a separate process and block on it, for forcing reclaiming of resources for highly intensive functions
        @return The original value through pickling.  If it is unable to be pickled, then the pickling exception is passed through
        @throws Through pickling, exceptions are passed back and re-raised
        @throws EOFError (from unpickling) if the child terminated without reporting an outcome
        @note source: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511474
        @note The child leaves through os._exit, so it never unwinds into the call stack (finally blocks, atexit handlers) it inherited from the parent

        >>> import misc
        >>> misc.validate_decorator(fork)
        """

        @functools.wraps(f)
        def wrapper(*args, **kwds):
                pread, pwrite = os.pipe()
                pid = os.fork()
                if pid > 0:
                        # Parent: collect the (status, result) pair reported by the child
                        os.close(pwrite)
                        try:
                                # The file object must not be called "f": binding that name
                                # here would make the decorated function a local of wrapper
                                with os.fdopen(pread, 'rb') as pipe:
                                        status, result = cPickle.load(pipe)
                        finally:
                                # Reap the child even if reading the pipe failed
                                os.waitpid(pid, 0)
                        if status == 0:
                                return result
                        # status 1: f raised; status 2: the outcome could not be pickled
                        raise result

                # Child: run f, report through the pipe, then terminate for good
                try:
                        os.close(pread)
                        try:
                                outcome = (0, f(*args, **kwds))
                        except Exception:
                                # sys.exc_info() rather than a binding except clause, so the
                                # clause parses on every Python version
                                outcome = (1, sys.exc_info()[1])
                        # Serialize before touching the pipe so that a failing pickle
                        # cannot leave a partial payload in front of the fallback one
                        try:
                                payload = cPickle.dumps(outcome, cPickle.HIGHEST_PROTOCOL)
                        except Exception:
                                payload = cPickle.dumps((2, sys.exc_info()[1]), cPickle.HIGHEST_PROTOCOL)
                        with os.fdopen(pwrite, 'wb') as pipe:
                                pipe.write(payload)
                        # os._exit skips the interpreter's normal shutdown, so push out
                        # anything the child printed; failures here are deliberately ignored
                        for stream in (sys.stdout, sys.stderr):
                                try:
                                        stream.flush()
                                except Exception:
                                        pass
                finally:
                        os._exit(0)
        return wrapper
91
92
@contextlib.contextmanager
def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
        """
        Locking with a queue, good for when you want to lock an item passed around

        The item is taken off the queue on entry and handed to the with-body.  It
        is put back when the body finishes, including when the body raises, so a
        failure inside the block cannot leave the queue empty for everyone else.

        @param queue Queue holding the item; its get(block, timeout) and put(item, block, timeout) are used
        @param gblock Passed as "block" to queue.get
        @param gtimeout Passed as "timeout" to queue.get
        @param pblock Passed as "block" to queue.put
        @param ptimeout Passed as "timeout" to queue.put
        @throws Whatever queue.get or queue.put raise (e.g. Queue.Empty when a standard queue has nothing to give in time)

        >>> import Queue
        >>> item = 5
        >>> lock = Queue.Queue()
        >>> lock.put(item)
        >>> with qlock(lock) as i:
        ...     print i
        5
        """
        item = queue.get(gblock, gtimeout)
        try:
                yield item
        finally:
                # Return the item no matter how the with-body was left
                queue.put(item, pblock, ptimeout)
108
109
class EventSource(object):
        """
        Asynchronous implementation of the observer pattern

        A source keeps, for every event it provides, the list of queues that
        messages for this event are put on.  Sources can be nested: registering a
        queue walks the tree depth-first and attaches the queue to the first
        source that provides the event.

        >>> sourceRoot = EventSource()
        >>> sourceChild1 = EventSource()
        >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
        >>> sourceChild2 = EventSource()
        >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
        >>> sourceRoot.add_children(sourceChild1, sourceChild2)
        """

        def __init__(self):
                """
                @warning Not thread safe
                """

                # event -> list of queues to put messages on
                self.__callbackQueues = {}
                # nested sources, searched in insertion order
                self.__children = []

        def add_children(self, *childrenSources):
                """
                Append the given sources to the list searched during (un)registration

                @warning Not thread safe
                """

                self.__children.extend(childrenSources)

        def remove_children(self, *childrenSources):
                """
                Remove the given sources from the list of children

                @throws ValueError if one of the sources is not currently a child
                @warning Not thread safe
                """

                for child in childrenSources:
                        self.__children.remove(child)

        def register_provided_events(self, *events):
                """
                Declare the events this source provides

                Declaring an event that is already provided is a no-op: the queues
                registered for it stay subscribed.

                @warning Not thread safe
                """

                for event in events:
                        # setdefault rather than overwriting: replacing the list would
                        # silently drop every queue already registered for the event
                        self.__callbackQueues.setdefault(event, [])

        def notify_observers(self, event, message):
                """
                Put message on every queue registered for event on this source

                @throws KeyError if this source does not provide event
                @warning As threadsafe as the queue used.  qlock is recommended for the message if it needs locking
                """

                for queue in self.__callbackQueues[event]:
                        queue.put(message)

        def _register_queue(self, event, queue):
                """
                Attach queue to the first source (this one, then the children depth-first) providing event

                @return The source the queue was attached to, or None if no source provides event
                @warning Not thread safe
                """

                if event in self.__callbackQueues:
                        self.__callbackQueues[event].append(queue)
                        return self

                for child in self.__children:
                        source = child._register_queue(event, queue)
                        if source is not None:
                                return source
                return None

        def _unregister_queue(self, event, queue):
                """
                Detach queue from the first source (this one, then the children depth-first) providing event

                @return The source the queue was detached from, or None if no source provides event
                @throws ValueError if that source provides event but queue is not registered for it
                @warning Not thread safe
                """

                if event in self.__callbackQueues:
                        self.__callbackQueues[event].remove(queue)
                        return self

                for child in self.__children:
                        source = child._unregister_queue(event, queue)
                        if source is not None:
                                return source
                return None
191
192
class StrongEventSourceProxy(object):
        """
        Handle on an event source that holds an ordinary (strong) reference to it

        register and unregister forward to the wrapped source and answer with a
        new proxy of the same class around whatever the source returned.
        """

        def __init__(self, source):
                """
                @param source Object providing _register_queue and _unregister_queue
                @warning Not thread safe
                """

                self.source = source

        def register(self, event, queue):
                """
                Register queue for event through the wrapped source

                @return A new proxy of this proxy's class wrapping the result of the source's _register_queue
                @warning Not thread safe
                """

                return type(self)(self.source._register_queue(event, queue))

        def unregister(self, event, queue):
                """
                Unregister queue for event through the wrapped source

                @return A new proxy of this proxy's class wrapping the result of the source's _unregister_queue
                @warning Not thread safe
                """

                return type(self)(self.source._unregister_queue(event, queue))
219
220
class WeakEventSourceProxy(object):
        """
        Handle on an event source that only holds a weak reference to it

        register and unregister dereference the weak reference, forward to the
        source and answer with a new proxy of the same class around whatever the
        source returned.  Nothing here guards against the reference being dead
        (the dereference then yields None) or against the source returning a
        value that cannot be weakly referenced, such as None.
        """

        def __init__(self, source):
                """
                @param source Weakly referenceable object providing _register_queue and _unregister_queue
                @warning Not thread safe
                """

                self.source = weakref.ref(source)

        def register(self, event, queue):
                """
                Register queue for event through the referenced source

                @return A new proxy of this proxy's class wrapping the result of the source's _register_queue
                @warning Not thread safe
                """

                target = self.source()
                resolved = target._register_queue(event, queue)
                return type(self)(resolved)

        def unregister(self, event, queue):
                """
                Unregister queue for event through the referenced source

                @return A new proxy of this proxy's class wrapping the result of the source's _unregister_queue
                @warning Not thread safe
                """

                target = self.source()
                resolved = target._unregister_queue(event, queue)
                return type(self)(resolved)
247
248
class EventObserver(object):
        """
        Base class for objects that receive events from an event source

        Subclasses override connect_to_source; the example below registers a
        queue with the source proxy it is given and reads messages from it.

        >>> import Queue
        >>> class Observer(EventObserver):
        ...     def connect_to_source(self, eventSourceRoot):
        ...             self.queue = Queue.Queue()
        ...             self.source = eventSourceRoot.register("1-event-0", self.queue)
        >>>
        >>> sourceRoot = EventSource()
        >>> sourceChild1 = EventSource()
        >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
        >>> sourceChild2 = EventSource()
        >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
        >>> sourceRoot.add_children(sourceChild1, sourceChild2)
        >>>
        >>> o1 = Observer()
        >>> o1.connect_to_source(StrongEventSourceProxy(sourceRoot))
        >>> o2 = Observer()
        >>> o2.connect_to_source(WeakEventSourceProxy(sourceRoot))
        >>>
        >>> sourceChild1.notify_observers("1-event-0", "Hello World")
        >>> o1.queue.get(False)
        'Hello World'
        >>> o2.queue.get(False)
        'Hello World'
        """

        def connect_to_source(self, eventSourceRoot):
                """
                Hook for subclasses to attach themselves to the given source

                @param eventSourceRoot Source proxy to register with (see the class example)
                @throws NotImplementedError always, in this base implementation
                """

                raise NotImplementedError()