5 * Pull pipelining (iterators)
6 * Push pipelining (coroutines)
7 * State machines (coroutines)
8 * "Cooperative multitasking" (coroutines)
9 * Algorithm -> Object transform for cohesiveness (for example context managers) (coroutines)
12 * When should a stage pass on exceptions, and when should it have them thrown within it?
13 * When should a stage pass on GeneratorExits?
14 * Is there a way to either turn a push generator into an iterator or to use
15 comprehension syntax for push generators? (I doubt it)
16 * When should the stage try to send data in both directions?
17 * Since pull generators (generators), push generators (coroutines), subroutines, and coroutines are all coroutines, maybe we should rename the push generators to avoid confusion (something like signals/slots?) and then reserve the name "coroutines" for two-way generators
18 ** If so, make s* and co* implementations of the functions
27 import xml.parsers.expat
33 ... def grep_sink(pattern):
34 ... print "Looking for %s" % pattern
37 ... if pattern in line:
39 >>> g = grep_sink("python")
41 >>> g.send("Yeah but no but yeah but no")
42 >>> g.send("A series of tubes")
43 >>> g.send("python generators rock!")
44 python generators rock!
48 @functools.wraps(func)
49 def start(*args, **kwargs):
50 cr = func(*args, **kwargs)
58 def printer_sink(format = "%s"):
60 >>> pr = printer_sink("%r")
67 >>> p = printer_sink()
72 >>> # p.throw(RuntimeError, "Goodbye")
78 print format % (item, )
84 Good for use with stages like cochain, to pick up any slack
90 def itr_source(itr, target):
92 >>> itr_source(xrange(2), printer_sink())
101 def cofilter(predicate, target):
103 >>> p = printer_sink()
104 >>> cf = cofilter(None, p)
117 >>> # cf.throw(RuntimeError, "Goodbye")
122 if predicate is None:
130 except StandardError, e:
131 target.throw(e.__class__, e.message)
135 def comap(function, target):
137 >>> p = printer_sink()
138 >>> cm = comap(lambda x: x+1, p)
145 >>> # cm.throw(RuntimeError, "Goodbye")
152 mappedItem = function(item)
153 target.send(mappedItem)
def func_sink(function):
    """
    Build a stage that applies ``function`` to each item it is sent.

    The stage is a ``comap`` whose downstream target is a freshly created
    ``null_sink()``; both helpers are defined elsewhere in this module.

    function -- callable handed to ``comap`` as its mapping function
    """
    downstream = null_sink()
    return comap(function, downstream)
164 >>> apps = append_sink(l)
177 def last_n_sink(l, n = 1):
180 >>> lns = last_n_sink(l)
190 extraCount = len(l) - n + 1
197 def coreduce(target, function, initializer = None):
199 >>> reduceResult = []
200 >>> lns = last_n_sink(reduceResult)
201 >>> cr = coreduce(lns, lambda x, y: x + y, 0)
205 >>> print reduceResult
207 >>> cr = coreduce(lns, lambda x, y: x + y)
211 >>> print reduceResult
215 cumulativeRef = initializer
218 if isFirst and initializer is None:
221 cumulativeRef = function(cumulativeRef, item)
222 target.send(cumulativeRef)
229 Takes a sequence of coroutines and sends the received items to all of them
231 >>> ct = cotee((printer_sink("1 %s"), printer_sink("2 %s")))
238 >>> # ct.throw(RuntimeError, "Goodbye")
245 for target in targets:
247 except StandardError, e:
248 for target in targets:
249 target.throw(e.__class__, e.message)
255 >>> ct.register_sink(printer_sink("1 %s"))
256 >>> ct.register_sink(printer_sink("2 %s"))
257 >>> ct.stage.send("Hello")
260 >>> ct.stage.send("World")
263 >>> ct.register_sink(printer_sink("3 %s"))
264 >>> ct.stage.send("Foo")
268 >>> # ct.stage.throw(RuntimeError, "Goodbye")
269 >>> # ct.stage.send("Meh")
270 >>> # ct.stage.close()
274 self.stage = self._stage()
def register_sink(self, sink):
    """
    Append ``sink`` to this object's ``_targets`` collection.

    sink -- the target to add; it is placed after the ones already present
    """
    registered = self._targets
    registered.append(sink)
def unregister_sink(self, sink):
    """
    Remove the first occurrence of ``sink`` from ``_targets``.

    sink -- the target to drop; the underlying ``remove`` call decides what
            happens when it is not present (ValueError for a list)
    """
    registered = self._targets
    registered.remove(sink)
284 self.stage = self._stage()
291 for target in self._targets:
293 except StandardError, e:
294 for target in self._targets:
295 target.throw(e.__class__, e.message)
298 def _flush_queue(queue):
299 while not queue.empty():
304 def cocount(target, start = 0):
306 >>> cc = cocount(printer_sink("%s"))
316 for i in itertools.count(start):
322 def coenumerate(target, start = 0):
324 >>> ce = coenumerate(printer_sink("%r"))
334 for i in itertools.count(start):
336 decoratedItem = i, item
337 target.send(decoratedItem)
341 def corepeat(target, elem):
343 >>> cr = corepeat(printer_sink("%s"), "Hello World")
359 def cointercept(target, elems):
361 >>> cr = cointercept(printer_sink("%s"), [1, 2, 3, 4])
371 Traceback (most recent call last):
372 File "/usr/lib/python2.5/doctest.py", line 1228, in __run
373 compileflags, 1) in test.globs
374 File "<doctest __main__.cointercept[5]>", line 1, in <module>
385 def codropwhile(target, pred):
387 >>> cdw = codropwhile(printer_sink("%s"), lambda x: x)
388 >>> cdw.send([0, 1, 2])
392 >>> cdw.send([0, 1, 2])
410 def cotakewhile(target, pred):
412 >>> ctw = cotakewhile(printer_sink("%s"), lambda x: x)
413 >>> ctw.send([0, 1, 2])
420 >>> ctw.send([0, 1, 2])
435 def coslice(target, lower, upper):
437 >>> cs = coslice(printer_sink("%r"), 3, 5)
448 for i in xrange(lower):
450 for i in xrange(upper - lower):
458 def cochain(targets):
460 >>> cr = cointercept(printer_sink("good %s"), [1, 2, 3, 4])
461 >>> cc = cochain([cr, printer_sink("end %s")])
474 for target in targets:
482 except StopIteration:
487 def queue_sink(queue):
489 >>> q = Queue.Queue()
490 >>> qs = queue_sink(q)
493 >>> qs.throw(RuntimeError, "Goodbye")
496 >>> print [i for i in _flush_queue(q)]
497 [(None, 'Hello'), (None, 'World'), (<type 'exceptions.RuntimeError'>, 'Goodbye'), (None, 'Meh'), (<type 'exceptions.GeneratorExit'>, None)]
502 queue.put((None, item))
503 except StandardError, e:
504 queue.put((e.__class__, e.message))
505 except GeneratorExit:
506 queue.put((GeneratorExit, None))
510 def decode_item(item, target):
514 elif item[0] is GeneratorExit:
518 target.throw(item[0], item[1])
522 def queue_source(queue, target):
524 >>> q = Queue.Queue()
528 ... (GeneratorExit, None),
531 >>> qs = queue_source(q, printer_sink())
538 isDone = decode_item(item, target)
def threaded_stage(target, thread_factory = threading.Thread):
    """
    Split a pipeline across two threads, linked by a queue.

    A new ``Queue.Queue`` is created, and ``queue_source(messages, target)``
    is started immediately in a thread built by ``thread_factory``.

    target -- downstream stage handed to ``queue_source``
    thread_factory -- callable accepting a ``target=`` keyword and returning
                      an object with ``start()``; defaults to
                      ``threading.Thread``

    Returns a zero-argument callable (a ``functools.partial``) that calls
    ``queue_sink`` on the shared queue. It is not an already-created sink,
    so the caller must invoke it.
    """
    messages = Queue.Queue()

    # Source running in the thread produced by thread_factory
    worker = thread_factory(
        target=functools.partial(queue_source, messages, target)
    )
    worker.start()

    # Sink running in current thread
    make_sink = functools.partial(queue_sink, messages)
    return make_sink
556 pickle.dump((None, item), f)
557 except StandardError, e:
558 pickle.dump((e.__class__, e.message), f)
559 except GeneratorExit:
560 pickle.dump((GeneratorExit, ), f)
562 except StopIteration:
567 def pickle_source(f, target):
571 item = pickle.load(f)
572 isDone = decode_item(item, target)
577 class EventHandler(object, xml.sax.ContentHandler):
def __init__(self, target):
    """
    Initialise both base classes and remember the downstream target.

    target -- object whose ``send()`` method the handler callbacks use to
              deliver events
    """
    # Each base initialiser is called explicitly, one per base class.
    object.__init__(self)
    xml.sax.ContentHandler.__init__(self)
    self._target = target
def startElement(self, name, attrs):
    """
    Forward an element-start event to the target.

    Sends ``(self.START, (name, attributes))`` where ``attributes`` is a
    plain dict of the element's attributes.

    name -- element name passed in by the parser
    attrs -- SAX attributes object for the element

    The dict is built through the public ``items()`` method of the
    attributes object instead of its private ``_attrs`` member, so any
    object implementing that interface is accepted.
    """
    attributes = dict(attrs.items())
    self._target.send((self.START, (name, attributes)))
def characters(self, text):
    """
    Forward a chunk of character data to the target as ``(self.TEXT, text)``.

    text -- the character data passed in by the parser
    """
    deliver = self._target.send
    deliver((self.TEXT, text))
def endElement(self, name):
    """
    Forward an element-end event to the target as ``(self.END, name)``.

    name -- element name passed in by the parser
    """
    deliver = self._target.send
    deliver((self.END, name))
598 def expat_parse(f, target):
599 parser = xml.parsers.expat.ParserCreate()
600 parser.buffer_size = 65536
601 parser.buffer_text = True
602 parser.returns_unicode = False
603 parser.StartElementHandler = lambda name, attrs: target.send(('start', (name, attrs)))
604 parser.EndElementHandler = lambda name: target.send(('end', name))
605 parser.CharacterDataHandler = lambda data: target.send(('text', data))
609 if __name__ == "__main__":