5 * Pull pipelining (iterators)
6 * Push pipelining (coroutines)
7 * State machines (coroutines)
8 * "Cooperative multitasking" (coroutines)
9 * Algorithm -> Object transform for cohesiveness (for example context managers) (coroutines)
12 * When should a stage pass on exceptions, and when should it have them thrown within it?
13 * When should a stage pass on GeneratorExits?
14 * Is there a way to either turn a push generator into an iterator or to use
15 comprehension syntax for push generators? (I doubt it)
16 * When should a stage try to send data in both directions?
17 * Since pull generators (generators), push generators (coroutines), subroutines, and coroutines are all coroutines, maybe we should rename the push generators to avoid confusion (for example, signals/slots) and then refer only to two-way generators as coroutines?
18 ** If so, make s* and co* implementations of the functions
27 import xml.parsers.expat
34 ... def grep_sink(pattern):
35 ... print "Looking for %s" % pattern
38 ... if pattern in line:
40 >>> g = grep_sink("python")
42 >>> g.send("Yeah but no but yeah but no")
43 >>> g.send("A series of tubes")
44 >>> g.send("python generators rock!")
45 python generators rock!
49 @functools.wraps(func)
50 def start(*args, **kwargs):
51 cr = func(*args, **kwargs)
59 def printer_sink(format = "%s"):
61 >>> pr = printer_sink("%r")
68 >>> p = printer_sink()
73 >>> # p.throw(RuntimeError, "Goodbye")
79 print format % (item, )
85 Useful, for example, with cochain to pick up any slack
91 def itr_source(itr, target):
93 >>> itr_source(xrange(2), printer_sink())
102 def cofilter(predicate, target):
104 >>> p = printer_sink()
105 >>> cf = cofilter(None, p)
118 >>> # cf.throw(RuntimeError, "Goodbye")
123 if predicate is None:
131 except StandardError, e:
132 target.throw(e.__class__, e.message)
136 def comap(function, target):
138 >>> p = printer_sink()
139 >>> cm = comap(lambda x: x+1, p)
146 >>> # cm.throw(RuntimeError, "Goodbye")
153 mappedItem = function(item)
154 target.send(mappedItem)
157 def func_sink(function):
158 return comap(function, null_sink())
165 >>> apps = append_sink(l)
178 def last_n_sink(l, n = 1):
181 >>> lns = last_n_sink(l)
191 extraCount = len(l) - n + 1
198 def coreduce(target, function, initializer = None):
200 >>> reduceResult = []
201 >>> lns = last_n_sink(reduceResult)
202 >>> cr = coreduce(lns, lambda x, y: x + y, 0)
206 >>> print reduceResult
208 >>> cr = coreduce(lns, lambda x, y: x + y)
212 >>> print reduceResult
216 cumulativeRef = initializer
219 if isFirst and initializer is None:
222 cumulativeRef = function(cumulativeRef, item)
223 target.send(cumulativeRef)
230 Takes a sequence of coroutines and sends the received items to all of them
232 >>> ct = cotee((printer_sink("1 %s"), printer_sink("2 %s")))
239 >>> # ct.throw(RuntimeError, "Goodbye")
246 for target in targets:
248 except StandardError, e:
249 for target in targets:
250 target.throw(e.__class__, e.message)
256 >>> ct.register_sink(printer_sink("1 %s"))
257 >>> ct.register_sink(printer_sink("2 %s"))
258 >>> ct.stage.send("Hello")
261 >>> ct.stage.send("World")
264 >>> ct.register_sink(printer_sink("3 %s"))
265 >>> ct.stage.send("Foo")
269 >>> # ct.stage.throw(RuntimeError, "Goodbye")
270 >>> # ct.stage.send("Meh")
271 >>> # ct.stage.close()
275 self.stage = self._stage()
278 def register_sink(self, sink):
279 self._targets.append(sink)
281 def unregister_sink(self, sink):
282 self._targets.remove(sink)
285 self.stage = self._stage()
292 except StandardError, e:
293 for target in self._targets:
294 target.throw(e.__class__, e.message)
296 for target in self._targets:
300 class CoSwitch(object):
302 >>> apr = printer_sink("a %r")
303 >>> bpr = printer_sink("b %r")
304 >>> cs = CoSwitch(["a", "b"])
305 >>> cs.register_sink("a", apr)
306 >>> cs.register_sink("b", bpr)
307 >>> cs.stage.send(("a", 1, 2, 3))
309 >>> cs.stage.send(("b", 1, 2, 3))
311 >>> cs.stage.send(("b", 1, 2, 3))
315 def __init__(self, signalKeys, key = None):
316 self.stage = self._stage()
317 self._key = key if key else lambda eventData: eventData[0]
320 for signalKey in signalKeys:
321 self._targets[signalKey] = {}
323 def register_sink(self, signalKey, sink):
325 self._targets[signalKey][id] = sink
328 def unregister_sink(self, signalKey, id):
329 del self._targets[signalKey][id]
332 self.stage = self._stage()
339 except StandardError, e:
340 for target in self._targets:
341 target.throw(e.__class__, e.message)
343 key = self._key(item)
344 for target in self._targets[key].itervalues():
348 def _flush_queue(queue):
349 while not queue.empty():
354 def cocount(target, start = 0):
356 >>> cc = cocount(printer_sink("%s"))
366 for i in itertools.count(start):
372 def coenumerate(target, start = 0):
374 >>> ce = coenumerate(printer_sink("%r"))
384 for i in itertools.count(start):
386 decoratedItem = i, item
387 target.send(decoratedItem)
391 def corepeat(target, elem):
393 >>> cr = corepeat(printer_sink("%s"), "Hello World")
409 def cointercept(target, elems):
411 >>> cr = cointercept(printer_sink("%s"), [1, 2, 3, 4])
421 Traceback (most recent call last):
422 File "/usr/lib/python2.5/doctest.py", line 1228, in __run
423 compileflags, 1) in test.globs
424 File "<doctest __main__.cointercept[5]>", line 1, in <module>
435 def codropwhile(target, pred):
437 >>> cdw = codropwhile(printer_sink("%s"), lambda x: x)
438 >>> cdw.send([0, 1, 2])
442 >>> cdw.send([0, 1, 2])
460 def cotakewhile(target, pred):
462 >>> ctw = cotakewhile(printer_sink("%s"), lambda x: x)
463 >>> ctw.send([0, 1, 2])
470 >>> ctw.send([0, 1, 2])
485 def coslice(target, lower, upper):
487 >>> cs = coslice(printer_sink("%r"), 3, 5)
498 for i in xrange(lower):
500 for i in xrange(upper - lower):
508 def cochain(targets):
510 >>> cr = cointercept(printer_sink("good %s"), [1, 2, 3, 4])
511 >>> cc = cochain([cr, printer_sink("end %s")])
524 for target in targets:
532 except StopIteration:
537 def queue_sink(queue):
539 >>> q = Queue.Queue()
540 >>> qs = queue_sink(q)
543 >>> qs.throw(RuntimeError, "Goodbye")
546 >>> print [i for i in _flush_queue(q)]
547 [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]
552 queue.put((None, item))
553 except StandardError, e:
554 queue.put((e.__class__, e.message))
555 except GeneratorExit:
556 queue.put((GeneratorExit, None))
560 def decode_item(item, target):
564 elif item[0] is GeneratorExit:
568 target.throw(item[0], item[1])
572 def queue_source(queue, target):
574 >>> q = Queue.Queue()
578 ... (GeneratorExit, None),
581 >>> qs = queue_source(q, printer_sink())
588 isDone = decode_item(item, target)
591 def threaded_stage(target, thread_factory = threading.Thread):
592 messages = Queue.Queue()
594 run_source = functools.partial(queue_source, messages, target)
595 thread_factory(target=run_source).start()
597 # Sink running in current thread
598 return functools.partial(queue_sink, messages)
606 pickle.dump((None, item), f)
607 except StandardError, e:
608 pickle.dump((e.__class__, e.message), f)
609 except GeneratorExit:
610 pickle.dump((GeneratorExit, ), f)
612 except StopIteration:
617 def pickle_source(f, target):
621 item = pickle.load(f)
622 isDone = decode_item(item, target)
627 class EventHandler(object, xml.sax.ContentHandler):
633 def __init__(self, target):
634 object.__init__(self)
635 xml.sax.ContentHandler.__init__(self)
636 self._target = target
638 def startElement(self, name, attrs):
639 self._target.send((self.START, (name, attrs._attrs)))
641 def characters(self, text):
642 self._target.send((self.TEXT, text))
644 def endElement(self, name):
645 self._target.send((self.END, name))
648 def expat_parse(f, target):
649 parser = xml.parsers.expat.ParserCreate()
650 parser.buffer_size = 65536
651 parser.buffer_text = True
652 parser.returns_unicode = False
653 parser.StartElementHandler = lambda name, attrs: target.send(('start', (name, attrs)))
654 parser.EndElementHandler = lambda name: target.send(('end', name))
655 parser.CharacterDataHandler = lambda data: target.send(('text', data))
659 def gtk_source(widget, signalName, target):
662 >>> widget = gtk.Button("Source")
663 >>> pr = printer_sink("%r")
664 >>> gtk_source(widget, "clicked", pr)
667 def on_signal(*args):
668 targetArgs = [signalName]
669 targetArgs.extend(args)
670 target.send(targetArgs)
672 return widget.connect(signalName, on_signal)
674 if __name__ == "__main__":