More on the path to generalizing quick add
[doneit] / src / coroutines.py
1 #!/usr/bin/env python
2
3 """
4 Uses for generators
5 * Pull pipelining (iterators)
6 * Push pipelining (coroutines)
7 * State machines (coroutines)
8 * "Cooperative multitasking" (coroutines)
9 * Algorithm -> Object transform for cohesiveness (for example context managers) (coroutines)
10
11 Design considerations
12 * When should a stage pass on exceptions or have it thrown within it?
13 * When should a stage pass on GeneratorExits?
14 * Is there a way to either turn a push generator into an iterator or to use
15         comprehension syntax for push generators? (I doubt it)
16 * When should the stage try to send data in both directions?
17 * Since pull generators (generators), push generators (coroutines), subroutines, and coroutines are all coroutines, maybe we should rename the push generators to not confuse them, like signals/slots? and then refer to two-way generators as coroutines
18 ** If so, make s* and co* implementation of functions
19 """
20
21 import threading
22 import Queue
23 import pickle
24 import functools
25 import itertools
26 import xml.sax
27 import xml.parsers.expat
28 import uuid
29
30
def autostart(func):
        """
        Decorator that primes a coroutine so callers can send() to it right away.

        The wrapped callable creates the generator, advances it once (running it
        up to its first yield) and hands back the generator object.

        >>> @autostart
        ... def grep_sink(pattern):
        ...     print "Looking for %s" % pattern
        ...     while True:
        ...             line = yield
        ...             if pattern in line:
        ...                     print line,
        >>> g = grep_sink("python")
        Looking for python
        >>> g.send("Yeah but no but yeah but no")
        >>> g.send("A series of tubes")
        >>> g.send("python generators rock!")
        python generators rock!
        >>> g.close()
        """

        def start(*args, **kwargs):
                coroutine = func(*args, **kwargs)
                # Run up to the first yield so the first send() is legal
                coroutine.next()
                return coroutine

        # Copy name/docstring from func so doctests on decorated functions are found
        return functools.wraps(func)(start)
56
57
@autostart
def printer_sink(format = "%s"):
        """
        Sink that prints every received item, rendered through the given
        %-style format string.

        >>> pr = printer_sink("%r")
        >>> pr.send("Hello")
        'Hello'
        >>> pr.send("5")
        '5'
        >>> pr.send(5)
        5
        >>> p = printer_sink()
        >>> p.send("Hello")
        Hello
        >>> p.send("World")
        World
        >>> # p.throw(RuntimeError, "Goodbye")
        >>> # p.send("Meh")
        >>> # p.close()
        """
        while True:
                received = yield
                # Wrap in a 1-tuple so tuple items are formatted as one value
                rendered = format % (received, )
                print(rendered)
80
81
@autostart
def null_sink():
        """
        Sink that accepts items forever and discards them.

        Good for uses like with cochain to pick up any slack
        """
        while True:
                yield
89
90
def itr_source(itr, target):
        """
        Source that pushes every element of an iterable into target, in order.

        >>> itr_source(xrange(2), printer_sink())
        0
        1
        """
        for element in itr:
                target.send(element)
99
100
101 @autostart
102 def cofilter(predicate, target):
103         """
104         >>> p = printer_sink()
105         >>> cf = cofilter(None, p)
106         >>> cf.send("")
107         >>> cf.send("Hello")
108         Hello
109         >>> cf.send([])
110         >>> cf.send([1, 2])
111         [1, 2]
112         >>> cf.send(False)
113         >>> cf.send(True)
114         True
115         >>> cf.send(0)
116         >>> cf.send(1)
117         1
118         >>> # cf.throw(RuntimeError, "Goodbye")
119         >>> # cf.send(False)
120         >>> # cf.send(True)
121         >>> # cf.close()
122         """
123         if predicate is None:
124                 predicate = bool
125
126         while True:
127                 try:
128                         item = yield
129                         if predicate(item):
130                                 target.send(item)
131                 except StandardError, e:
132                         target.throw(e.__class__, e.message)
133
134
@autostart
def comap(function, target):
        """
        Stage that applies function to each received item and forwards the result.

        >>> p = printer_sink()
        >>> cm = comap(lambda x: x+1, p)
        >>> cm.send(0)
        1
        >>> cm.send(1.0)
        2.0
        >>> cm.send(-2)
        -1
        >>> # cm.throw(RuntimeError, "Goodbye")
        >>> # cm.send(0)
        >>> # cm.send(1.0)
        >>> # cm.close()
        """
        while True:
                result = function((yield))
                target.send(result)
155
156
def func_sink(function):
        """
        Build a sink that calls function on every received item and throws the
        return value away (a comap feeding a null_sink).
        """
        discard = null_sink()
        return comap(function, discard)
159
160
@autostart
def append_sink(l):
        """
        Sink that appends every received item to the caller-supplied list.

        >>> l = []
        >>> apps = append_sink(l)
        >>> apps.send(1)
        >>> apps.send(2)
        >>> apps.send(3)
        >>> print l
        [1, 2, 3]
        """
        while True:
                received = yield
                l.append(received)
175
176
@autostart
def last_n_sink(l, n = 1):
        """
        Sink that keeps only the most recent n items in the caller-supplied list.

        The list is emptied when the sink is created.

        >>> l = []
        >>> lns = last_n_sink(l)
        >>> lns.send(1)
        >>> lns.send(2)
        >>> lns.send(3)
        >>> print l
        [3]
        """
        del l[:]
        while True:
                newest = yield
                # How many old entries must go so that the new one fits within n
                overflow = len(l) - n + 1
                if overflow > 0:
                        del l[:overflow]
                l.append(newest)
195
196
@autostart
def coreduce(target, function, initializer = None):
        """
        Stage that folds received items with function and sends the running
        result to target after every item.

        When initializer is None the first item seeds the accumulator (and is
        itself forwarded); otherwise folding starts from initializer.

        >>> reduceResult = []
        >>> lns = last_n_sink(reduceResult)
        >>> cr = coreduce(lns, lambda x, y: x + y, 0)
        >>> cr.send(1)
        >>> cr.send(2)
        >>> cr.send(3)
        >>> print reduceResult
        [6]
        >>> cr = coreduce(lns, lambda x, y: x + y)
        >>> cr.send(1)
        >>> cr.send(2)
        >>> cr.send(3)
        >>> print reduceResult
        [6]
        """
        seedFromFirst = initializer is None
        accumulated = initializer
        while True:
                item = yield
                if seedFromFirst:
                        accumulated = item
                else:
                        accumulated = function(accumulated, item)
                target.send(accumulated)
                # Only the very first item may act as the seed
                seedFromFirst = False
225
226
227 @autostart
228 def cotee(targets):
229         """
230         Takes a sequence of coroutines and sends the received items to all of them
231
232         >>> ct = cotee((printer_sink("1 %s"), printer_sink("2 %s")))
233         >>> ct.send("Hello")
234         1 Hello
235         2 Hello
236         >>> ct.send("World")
237         1 World
238         2 World
239         >>> # ct.throw(RuntimeError, "Goodbye")
240         >>> # ct.send("Meh")
241         >>> # ct.close()
242         """
243         while True:
244                 try:
245                         item = yield
246                         for target in targets:
247                                 target.send(item)
248                 except StandardError, e:
249                         for target in targets:
250                                 target.throw(e.__class__, e.message)
251
252
253 class CoTee(object):
254         """
255         >>> ct = CoTee()
256         >>> ct.register_sink(printer_sink("1 %s"))
257         >>> ct.register_sink(printer_sink("2 %s"))
258         >>> ct.stage.send("Hello")
259         1 Hello
260         2 Hello
261         >>> ct.stage.send("World")
262         1 World
263         2 World
264         >>> ct.register_sink(printer_sink("3 %s"))
265         >>> ct.stage.send("Foo")
266         1 Foo
267         2 Foo
268         3 Foo
269         >>> # ct.stage.throw(RuntimeError, "Goodbye")
270         >>> # ct.stage.send("Meh")
271         >>> # ct.stage.close()
272         """
273
274         def __init__(self):
275                 self.stage = self._stage()
276                 self._targets = []
277
278         def register_sink(self, sink):
279                 self._targets.append(sink)
280
281         def unregister_sink(self, sink):
282                 self._targets.remove(sink)
283
284         def restart(self):
285                 self.stage = self._stage()
286
287         @autostart
288         def _stage(self):
289                 while True:
290                         try:
291                                 item = yield
292                         except StandardError, e:
293                                 for target in self._targets:
294                                         target.throw(e.__class__, e.message)
295                         else:
296                                 for target in self._targets:
297                                         target.send(item)
298
299
300 class CoSwitch(object):
301         """
302         >>> apr = printer_sink("a %r")
303         >>> bpr = printer_sink("b %r")
304         >>> cs = CoSwitch(["a", "b"])
305         >>> cs.register_sink("a", apr)
306         >>> cs.register_sink("b", bpr)
307         >>> cs.stage.send(("a", 1, 2, 3))
308         a ('a', 1, 2, 3)
309         >>> cs.stage.send(("b", 1, 2, 3))
310         b ('b', 1, 2, 3)
311         >>> cs.stage.send(("b", 1, 2, 3))
312         b ('b', 1, 2, 3)
313         """
314
315         def __init__(self, signalKeys, key = None):
316                 self.stage = self._stage()
317                 self._key = key if key else lambda eventData: eventData[0]
318                 self._targets = {}
319
320                 for signalKey in signalKeys:
321                         self._targets[signalKey] = {}
322
323         def register_sink(self, signalKey, sink):
324                 id = uuid.uuid4()
325                 self._targets[signalKey][id] = sink
326                 return id
327
328         def unregister_sink(self, signalKey, id):
329                 del self._targets[signalKey][id]
330
331         def restart(self):
332                 self.stage = self._stage()
333
334         @autostart
335         def _stage(self):
336                 while True:
337                         try:
338                                 item = yield
339                         except StandardError, e:
340                                 for target in self._targets:
341                                         target.throw(e.__class__, e.message)
342                         else:
343                                 key = self._key(item)
344                                 for target in self._targets[key].itervalues():
345                                         target.send(item)
346
347
def _flush_queue(queue):
        """
        Generator yielding the items currently in queue until it reports empty.
        """
        while True:
                if queue.empty():
                        return
                yield queue.get()
351
352
@autostart
def cocount(target, start = 0):
        """
        Stage that ignores the content of each received item and sends target a
        running count instead, beginning at start.

        >>> cc = cocount(printer_sink("%s"))
        >>> cc.send("a")
        0
        >>> cc.send(None)
        1
        >>> cc.send([])
        2
        >>> cc.send(0)
        3
        """
        counter = itertools.count(start)
        while True:
                yield
                target.send(counter.next())
369
370
@autostart
def coenumerate(target, start = 0):
        """
        Stage that pairs each received item with a running index (from start)
        and sends the (index, item) tuple to target.

        >>> ce = coenumerate(printer_sink("%r"))
        >>> ce.send("a")
        (0, 'a')
        >>> ce.send(None)
        (1, None)
        >>> ce.send([])
        (2, [])
        >>> ce.send(0)
        (3, 0)
        """
        counter = itertools.count(start)
        while True:
                received = yield
                target.send((counter.next(), received))
388
389
@autostart
def corepeat(target, elem):
        """
        Stage that answers every received item by sending elem to target.

        >>> cr = corepeat(printer_sink("%s"), "Hello World")
        >>> cr.send("a")
        Hello World
        >>> cr.send(None)
        Hello World
        >>> cr.send([])
        Hello World
        >>> cr.send(0)
        Hello World
        """
        while True:
                # The incoming value is only a trigger; its content is unused
                yield
                target.send(elem)
406
407
@autostart
def cointercept(target, elems):
        """
        Stage that replaces each received item with the next element of elems.

        Once elems is used up the coroutine finishes, so the following send()
        raises StopIteration.

        >>> cr = cointercept(printer_sink("%s"), [1, 2, 3, 4])
        >>> cr.send("a")
        1
        >>> cr.send(None)
        2
        >>> cr.send([])
        3
        >>> cr.send(0)
        4
        >>> cr.send("Bye")
        Traceback (most recent call last):
          File "/usr/lib/python2.5/doctest.py", line 1228, in __run
            compileflags, 1) in test.globs
          File "<doctest __main__.cointercept[5]>", line 1, in <module>
            cr.send("Bye")
        StopIteration
        """
        # Wait for the first trigger before handing out any replacement
        yield
        for replacement in elems:
                target.send(replacement)
                yield
432
433
@autostart
def codropwhile(target, pred):
        """
        Stage that discards items while pred holds, then passes everything on.

        The first item for which pred is false ends the dropping phase but is
        itself discarded as well; forwarding begins with the item after it.

        >>> cdw = codropwhile(printer_sink("%s"), lambda x: x)
        >>> cdw.send([0, 1, 2])
        >>> cdw.send(1)
        >>> cdw.send(True)
        >>> cdw.send(False)
        >>> cdw.send([0, 1, 2])
        [0, 1, 2]
        >>> cdw.send(1)
        1
        >>> cdw.send(True)
        True
        """
        # Dropping phase: swallow items up to and including the first failure
        while pred((yield)):
                pass

        # Pass-through phase
        while True:
                passed = yield
                target.send(passed)
457
458
@autostart
def cotakewhile(target, pred):
        """
        Stage that forwards items while pred holds and silently discards
        everything from the first failing item onwards.

        >>> ctw = cotakewhile(printer_sink("%s"), lambda x: x)
        >>> ctw.send([0, 1, 2])
        [0, 1, 2]
        >>> ctw.send(1)
        1
        >>> ctw.send(True)
        True
        >>> ctw.send(False)
        >>> ctw.send([0, 1, 2])
        >>> ctw.send(1)
        >>> ctw.send(True)
        """
        candidate = yield
        while pred(candidate):
                target.send(candidate)
                candidate = yield

        # Keep accepting input so upstream stages are not stopped
        while True:
                yield
482
483
@autostart
def coslice(target, lower, upper):
        """
        Stage that forwards only the items at positions lower <= i < upper
        (counting received items from zero) and discards all others.

        >>> cs = coslice(printer_sink("%r"), 3, 5)
        >>> cs.send("0")
        >>> cs.send("1")
        >>> cs.send("2")
        >>> cs.send("3")
        '3'
        >>> cs.send("4")
        '4'
        >>> cs.send("5")
        >>> cs.send("6")
        """
        # Skip the leading items
        for _ in xrange(lower):
                yield
        # Forward the window
        for _ in xrange(upper - lower):
                selected = yield
                target.send(selected)
        # Swallow whatever follows
        while True:
                yield
505
506
@autostart
def cochain(targets):
        """
        Stage that feeds the first target until it finishes (raises
        StopIteration), then moves on to the next one, and so on.

        The item that a finished target refused is re-sent to the next target.
        When the last target finishes, this coroutine finishes too.

        >>> cr = cointercept(printer_sink("good %s"), [1, 2, 3, 4])
        >>> cc = cochain([cr, printer_sink("end %s")])
        >>> cc.send("a")
        good 1
        >>> cc.send(None)
        good 2
        >>> cc.send([])
        good 3
        >>> cc.send(0)
        good 4
        >>> cc.send("Bye")
        end Bye
        """
        carried = []
        for current in targets:
                try:
                        # First replay what the previous target could not take
                        while carried:
                                item = carried.pop()
                                current.send(item)
                        while True:
                                item = yield
                                current.send(item)
                except StopIteration:
                        # current is exhausted; hand the refused item onwards
                        carried.append(item)
534
535
536 @autostart
537 def queue_sink(queue):
538         """
539         >>> q = Queue.Queue()
540         >>> qs = queue_sink(q)
541         >>> qs.send("Hello")
542         >>> qs.send("World")
543         >>> qs.throw(RuntimeError, "Goodbye")
544         >>> qs.send("Meh")
545         >>> qs.close()
546         >>> print [i for i in _flush_queue(q)]
547         [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]
548         """
549         while True:
550                 try:
551                         item = yield
552                         queue.put((None, item))
553                 except StandardError, e:
554                         queue.put((e.__class__, e.message))
555                 except GeneratorExit:
556                         queue.put((GeneratorExit, None))
557                         raise
558
559
def decode_item(item, target):
        """
        Replay one recorded (tag, payload) tuple against target.

        A tag of None sends the payload, GeneratorExit closes target, and any
        other tag is thrown into target with the payload as its value.
        Returns True only when target was closed.
        """
        tag = item[0]
        if tag is GeneratorExit:
                target.close()
                return True

        if tag is None:
                target.send(item[1])
        else:
                target.throw(tag, item[1])
        return False
570
571
def queue_source(queue, target):
        """
        Source that replays (tag, payload) tuples taken from queue into target
        until a GeneratorExit record closes it.  Blocks while the queue is empty.

        >>> q = Queue.Queue()
        >>> for i in [
        ...     (None, 'Hello'),
        ...     (None, 'World'),
        ...     (GeneratorExit, None),
        ...     ]:
        ...     q.put(i)
        >>> qs = queue_source(q, printer_sink())
        Hello
        World
        """
        while not decode_item(queue.get(), target):
                pass
589
590
def threaded_stage(target, thread_factory = threading.Thread):
        """
        Run target in a separate thread, fed through a queue.

        A thread built by thread_factory is started immediately and runs
        queue_source against a private queue.  The return value is a
        zero-argument factory (a functools.partial); calling it creates a
        queue_sink that writes to that same queue.
        """
        messages = Queue.Queue()

        def pump():
                queue_source(messages, target)

        worker = thread_factory(target=pump)
        worker.start()

        # Sink running in current thread
        return functools.partial(queue_sink, messages)
599
600
601 @autostart
602 def pickle_sink(f):
603         while True:
604                 try:
605                         item = yield
606                         pickle.dump((None, item), f)
607                 except StandardError, e:
608                         pickle.dump((e.__class__, e.message), f)
609                 except GeneratorExit:
610                         pickle.dump((GeneratorExit, ), f)
611                         raise
612                 except StopIteration:
613                         f.close()
614                         return
615
616
def pickle_source(f, target):
        """
        Source that unpickles tagged tuples from the file-like f and replays
        them into target until a GeneratorExit record is seen; running out of
        data (EOFError) closes target as well.
        """
        # NOTE: pickle.load can execute arbitrary code; only read trusted streams
        try:
                while not decode_item(pickle.load(f), target):
                        pass
        except EOFError:
                target.close()
623         except EOFError:
624                 target.close()
625
626
class EventHandler(xml.sax.ContentHandler, object):
        """
        SAX content handler that turns parser callbacks into (kind, payload)
        tuples pushed into a target coroutine:

        * (START, (name, attribute dict)) for an opening tag
        * (TEXT, text) for character data
        * (END, name) for a closing tag
        """

        # object is listed last: this order is a valid MRO whether
        # ContentHandler is an old-style or a new-style class.

        START = "start"
        TEXT = "text"
        END = "end"

        def __init__(self, target):
                object.__init__(self)
                xml.sax.ContentHandler.__init__(self)
                self._target = target

        def startElement(self, name, attrs):
                # Copy through the public Attributes interface rather than
                # reaching into the private attrs._attrs mapping.
                attributes = dict(attrs.items())
                self._target.send((self.START, (name, attributes)))

        def characters(self, text):
                self._target.send((self.TEXT, text))

        def endElement(self, name):
                self._target.send((self.END, name))
644         def endElement(self, name):
645                 self._target.send((self.END, name))
646
647
648 def expat_parse(f, target):
649         parser = xml.parsers.expat.ParserCreate()
650         parser.buffer_size = 65536
651         parser.buffer_text = True
652         parser.returns_unicode = False
653         parser.StartElementHandler = lambda name, attrs: target.send(('start', (name, attrs)))
654         parser.EndElementHandler = lambda name: target.send(('end', name))
655         parser.CharacterDataHandler = lambda data: target.send(('text', data))
656         parser.ParseFile(f)
657
658
def gtk_source(widget, signalName, target):
        """
        Source that forwards a widget signal into target.

        Each emission is sent as a list holding signalName followed by the
        arguments the handler was called with.  Returns whatever
        widget.connect returns.

        >>> import gtk
        >>> widget = gtk.Button("Source")
        >>> pr = printer_sink("%r")
        >>> gtk_source(widget, "clicked", pr)
        """

        def on_signal(*args):
                target.send([signalName] + list(args))

        return widget.connect(signalName, on_signal)

673
if __name__ == "__main__":
        # Self-test: run the examples embedded in this module's docstrings
        import doctest
        doctest.testmod()