Updating utility modules
[ejpi] / src / libraries / recipes / concurrent.py
1 #!/usr/bin/env python
2
3 from __future__ import with_statement
4
5 import os
6 import sys
7 import cPickle
8 import weakref
9 import threading
10 import errno
11 import time
12 import functools
13 import contextlib
14
15
def synchronized(lock):
        """
        Build a decorator that funnels every call of a function through ``lock``.

        The lock is acquired before the wrapped function runs and released
        afterwards, whether the function returns normally or raises.

        @param lock Any object exposing acquire() and release()
        @return A decorator; the function it produces keeps the wrapped function's metadata

        >>> import misc
        >>> misc.validate_decorator(synchronized(object()))
        """

        def decorate(func):

                @functools.wraps(func)
                def guarded(*positional, **named):
                        lock.acquire()
                        try:
                                outcome = func(*positional, **named)
                        finally:
                                # Runs on both the return and the exception path
                                lock.release()
                        return outcome
                return guarded
        return decorate
35
36
@contextlib.contextmanager
def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
        """
        Check an item out of a queue for the duration of a ``with`` block.

        The item is taken with queue.get() on entry and handed back with
        queue.put() on exit, including when the block raises.

        @param queue Object providing get(block, timeout) and put(item, block, timeout)
        @param gblock, gtimeout Forwarded to queue.get()
        @param pblock, ptimeout Forwarded to queue.put()
        @return (as the ``with`` target) the item taken from the queue

        >>> import Queue
        >>> item = 5
        >>> lock = Queue.Queue()
        >>> lock.put(item)
        >>> with qlock(lock) as i:
        ...     print i
        5
        """
        borrowed = queue.get(gblock, gtimeout)
        try:
                yield borrowed
        finally:
                queue.put(borrowed, pblock, ptimeout)
55
56
@contextlib.contextmanager
def flock(path, timeout=-1):
        """
        Hold an exclusive lock represented by the existence of the file ``path``.

        The lock file is created with O_CREAT | O_EXCL, so creation fails while
        the file already exists; in that case the attempt is repeated every 0.1
        seconds.  On leaving the ``with`` block the descriptor is closed and the
        lock file is removed.

        @param path Path of the lock file to create
        @param timeout Seconds to keep retrying; -1 (the default) retries forever
        @return (as the ``with`` target) the OS-level file descriptor of the lock file
        @throws AssertionError If the lock file could not be created within ``timeout``
        @throws OSError If creating the lock file fails for a reason other than it already existing
        """
        WAIT_FOREVER = -1
        DELAY = 0.1
        timeSpent = 0

        acquired = False

        while timeSpent <= timeout or timeout == WAIT_FOREVER:
                try:
                        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_RDWR)
                        acquired = True
                        break
                except OSError:
                        # sys.exc_info() rather than "except OSError, e" / "as e" so
                        # this parses on old and new interpreters alike
                        if sys.exc_info()[1].errno != errno.EEXIST:
                                raise
                time.sleep(DELAY)
                timeSpent += DELAY

        if not acquired:
                # Raised explicitly instead of through an assert statement: asserts
                # are stripped under "python -O", which would let execution fall
                # through to the yield with fd unbound
                raise AssertionError(
                        "Failed to grab file-lock %s within timeout %s" % (path, timeout)
                )

        try:
                yield fd
        finally:
                try:
                        # Release the descriptor so it does not leak with every use
                        os.close(fd)
                except OSError:
                        # The caller may already have closed the descriptor itself
                        if sys.exc_info()[1].errno != errno.EBADF:
                                raise
                finally:
                        os.unlink(path)
82
83
def threaded(f):
        """
        Decorator that runs each call of the function on a freshly started daemon thread.

        The wrapper returns None as soon as the thread has been started; the
        wrapped function's own return value is not collected.

        @param f Function to run in the background
        @return Wrapper with the same metadata as ``f``

        >>> import misc
        >>> misc.validate_decorator(threaded)
        """

        @functools.wraps(f)
        def launcher(*args, **kwargs):
                worker = threading.Thread(target=f, args=args, kwargs=kwargs)
                worker.setDaemon(True)
                worker.start()
        return launcher
98
99
def fork(f):
        """
        Fork a function into a separate process and block on it, for forcing reclaiming of resources for highly intensive functions

        The child runs ``f`` and sends a pickled (status, result) pair back through
        a pipe; the parent waits for it, then returns or re-raises the result.

        @param f Function to run in the child process
        @return The original value through pickling.  If it is unable to be pickled, then the pickling exception is passed through
        @throws Through pickling, exceptions are passed back and re-raised
        @throws EOFError (from unpickling) if the child exits without delivering a result
        @note source: http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/511474

        >>> import misc
        >>> misc.validate_decorator(fork)
        """

        @functools.wraps(f)
        def wrapper(*args, **kwds):
                pread, pwrite = os.pipe()
                try:
                        pid = os.fork()
                except OSError:
                        # No child was created, so nobody else will close the pipe
                        os.close(pread)
                        os.close(pwrite)
                        raise
                if pid > 0:
                        os.close(pwrite)
                        try:
                                # The pipe objects are deliberately not called "f": binding
                                # that name anywhere in wrapper would make it a local and
                                # hide the decorated function from the child branch
                                with os.fdopen(pread, 'rb') as pipeIn:
                                        status, result = cPickle.load(pipeIn)
                        finally:
                                # Always reap the child, even if it died before answering
                                os.waitpid(pid, 0)
                        if status == 0:
                                return result
                        else:
                                raise result
                else:
                        exitCode = 1
                        try:
                                os.close(pread)
                                try:
                                        result = f(*args, **kwds)
                                        status = 0
                                except Exception:
                                        result = sys.exc_info()[1]
                                        status = 1
                                # Serialize to a string first so a failed attempt cannot
                                # leave a partial pickle in the pipe ahead of the fallback
                                try:
                                        payload = cPickle.dumps((status, result), cPickle.HIGHEST_PROTOCOL)
                                except Exception:
                                        payload = cPickle.dumps((2, sys.exc_info()[1]), cPickle.HIGHEST_PROTOCOL)
                                with os.fdopen(pwrite, 'wb') as pipeOut:
                                        pipeOut.write(payload)
                                exitCode = 0
                        finally:
                                # The child must never return into the caller's stack.
                                # os._exit() ends it without raising SystemExit through
                                # the forked copy of the caller; flush by hand because
                                # os._exit() skips the normal interpreter shutdown.
                                try:
                                        sys.stdout.flush()
                                        sys.stderr.flush()
                                finally:
                                        os._exit(exitCode)
        return wrapper
140
141
@contextlib.contextmanager
def qlock(queue, gblock = True, gtimeout = None, pblock = True, ptimeout = None):
        """
        Locking with a queue, good for when you want to lock an item passed around

        The item is taken from the queue on entry and always put back on exit,
        including when the ``with`` body raises.

        @param queue Object providing get(block, timeout) and put(item, block, timeout)
        @param gblock, gtimeout Forwarded to queue.get()
        @param pblock, ptimeout Forwarded to queue.put()
        @return (as the ``with`` target) the item taken from the queue
        @note This definition rebinds the name of an earlier qlock in this module, so it is the one in effect after import

        >>> import Queue
        >>> item = 5
        >>> lock = Queue.Queue()
        >>> lock.put(item)
        >>> with qlock(lock) as i:
        ...     print i
        5
        """
        item = queue.get(gblock, gtimeout)
        try:
                yield item
        finally:
                # Without the finally an exception in the with-body would skip the
                # put() and the item would be lost to everyone waiting on the queue
                queue.put(item, pblock, ptimeout)
158
159
class EventSource(object):
        """
        Asynchronous implementation of the observer pattern

        A source keeps, per event it provides, a list of queues; notifying an
        event puts the message on each of those queues.  Sources can be nested:
        (un)registering a queue for an event this source does not provide is
        delegated to its children, depth first, in the order they were added.

        >>> sourceRoot = EventSource()
        >>> sourceChild1 = EventSource()
        >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
        >>> sourceChild2 = EventSource()
        >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
        >>> sourceRoot.add_children(sourceChild1, sourceChild2)
        """

        def __init__(self):
                """
                @warning Not thread safe
                """

                # event name -> list of queues that receive the event's messages
                self.__callbackQueues = {}
                # child sources searched when this source does not provide an event
                self.__children = []

        def add_children(self, *childrenSources):
                """
                Append the given sources to the list of children

                @warning Not thread safe
                """

                self.__children.extend(childrenSources)

        def remove_children(self, *childrenSources):
                """
                Remove the given sources from the list of children

                @throws ValueError If one of the sources is not currently a child
                @warning Not thread safe
                """

                for child in childrenSources:
                        self.__children.remove(child)

        def register_provided_events(self, *events):
                """
                Declare the events this source provides

                Declaring an event that is already provided leaves its registered
                queues untouched.

                @warning Not thread safe
                """

                for event in events:
                        # setdefault rather than overwriting with a fresh list, so
                        # re-declaring an event does not silently drop its observers
                        self.__callbackQueues.setdefault(event, [])

        def notify_observers(self, event, message):
                """
                Put ``message`` on every queue registered for ``event`` on this source

                @throws KeyError If this source does not provide ``event``
                @warning As threadsafe as the queue used.  qlock is recommended for the message if it needs locking
                """

                for queue in self.__callbackQueues[event]:
                        queue.put(message)

        def _register_queue(self, event, queue):
                """
                Register ``queue`` with the first source providing ``event``

                @return The source the queue was registered with, or None if neither this source nor any descendant provides the event
                @warning Not thread safe
                """

                if event in self.__callbackQueues:
                        self.__callbackQueues[event].append(queue)
                        return self

                for child in self.__children:
                        source = child._register_queue(event, queue)
                        if source is not None:
                                return source
                return None

        def _unregister_queue(self, event, queue):
                """
                Remove ``queue`` from the first source providing ``event``

                @return The source the queue was removed from, or None if neither this source nor any descendant provides the event
                @throws ValueError If the providing source does not have ``queue`` registered for ``event``
                @warning Not thread safe
                """

                if event in self.__callbackQueues:
                        self.__callbackQueues[event].remove(queue)
                        return self

                for child in self.__children:
                        source = child._unregister_queue(event, queue)
                        if source is not None:
                                return source
                return None
241
242
class StrongEventSourceProxy(object):
        """
        Registration handle that keeps a strong reference to the wrapped source

        register() and unregister() forward to the source's _register_queue() /
        _unregister_queue() and wrap whatever those return in a new proxy of the
        same class as this one.
        """

        def __init__(self, source):
                """
                @param source Object providing _register_queue() and _unregister_queue()
                @warning Not thread safe
                """

                self.source = source

        def register(self, event, queue):
                """
                @return A new proxy around the value returned by the source's _register_queue()
                @warning Not thread safe
                """

                owner = self.source._register_queue(event, queue)
                return type(self)(owner)

        def unregister(self, event, queue):
                """
                @return A new proxy around the value returned by the source's _unregister_queue()
                @warning Not thread safe
                """

                owner = self.source._unregister_queue(event, queue)
                return type(self)(owner)
269
270
class WeakEventSourceProxy(object):
        """
        Registration handle that only keeps a weak reference to the wrapped source

        register() and unregister() dereference the weak reference, forward to
        the source's _register_queue() / _unregister_queue() and wrap whatever
        those return in a new proxy of the same class as this one.
        """

        def __init__(self, source):
                """
                @param source Weak-referenceable object providing _register_queue() and _unregister_queue()
                @warning Not thread safe
                """

                self.source = weakref.ref(source)

        def register(self, event, queue):
                """
                @return A new proxy around the value returned by the source's _register_queue()
                @warning Not thread safe
                """

                target = self.source()
                owner = target._register_queue(event, queue)
                return type(self)(owner)

        def unregister(self, event, queue):
                """
                @return A new proxy around the value returned by the source's _unregister_queue()
                @warning Not thread safe
                """

                target = self.source()
                owner = target._unregister_queue(event, queue)
                return type(self)(owner)
297
298
class EventObserver(object):
        """
        Base class for event observers; subclasses override connect_to_source()

        >>> import Queue
        >>> class Observer(EventObserver):
        ...     def connect_to_source(self, eventSourceRoot):
        ...             self.queue = Queue.Queue()
        ...             self.source = eventSourceRoot.register("1-event-0", self.queue)
        >>>
        >>> sourceRoot = EventSource()
        >>> sourceChild1 = EventSource()
        >>> sourceChild1.register_provided_events("1-event-0", "1-event-1")
        >>> sourceChild2 = EventSource()
        >>> sourceChild2.register_provided_events("1-event-0", "1-event-1")
        >>> sourceRoot.add_children(sourceChild1, sourceChild2)
        >>>
        >>> o1 = Observer()
        >>> o1.connect_to_source(StrongEventSourceProxy(sourceRoot))
        >>> o2 = Observer()
        >>> o2.connect_to_source(WeakEventSourceProxy(sourceRoot))
        >>>
        >>> sourceChild1.notify_observers("1-event-0", "Hello World")
        >>> o1.queue.get(False)
        'Hello World'
        >>> o2.queue.get(False)
        'Hello World'
        """

        def connect_to_source(self, eventSourceRoot):
                """
                Hook for subclasses; this base implementation always raises

                @param eventSourceRoot Source proxy to register with (see the class example)
                @throws NotImplementedError Always, unless overridden
                """
                raise NotImplementedError()
327         def connect_to_source(self, eventSourceRoot):
328                 raise NotImplementedError