d4f9c913197dd052e5e8c317235a675b182a4cf6
[gc-dialer] / hand_tests / threading.py
1 #!/usr/bin/env python
2
3 from __future__ import with_statement
4 from __future__ import division
5
6 import time
7
8 import PyQt4.QtCore as QtCore
9
10
class QThread44(QtCore.QThread):
        """
        QThread whose run() does nothing except spin the thread's event loop.

        This imitates how QThread behaves in Qt 4.4+, so the same code can be
        used when running against an older version.
        See http://labs.trolltech.com/blogs/2010/06/17/youre-doing-it-wrong
        (The original author still hit the issue on Lucid with Qt 4.7.)
        """

        def __init__(self, parent=None):
                super(QThread44, self).__init__(parent)

        def run(self):
                # All real work is done by objects moved onto this thread; the
                # thread itself only needs to run an event loop for them.
                self.exec_()
23
24
class Producer(QtCore.QObject):
        """Worker that emits the integers 0..9 on ``data`` and then emits ``done``."""

        # Emitted once per produced integer.
        data = QtCore.pyqtSignal(int)
        # Emitted a single time, after the last integer has been sent.
        done = QtCore.pyqtSignal()

        def __init__(self):
                super(Producer, self).__init__()

        @QtCore.pyqtSlot()
        def process(self):
                """Emit ten values, sleeping 0.1 s after each, then signal completion."""
                print("Starting producer")
                for value in range(10):
                        self.data.emit(value)
                        # Pause between values to simulate slow production.
                        time.sleep(0.1)
                self.done.emit()
40
41
class Consumer(QtCore.QObject):
        """Receiver whose slots just print what they are told about."""

        def __init__(self):
                super(Consumer, self).__init__()

        @QtCore.pyqtSlot()
        def process(self):
                """Announce that the consumer has been started."""
                print("Starting consumer")

        @QtCore.pyqtSlot()
        def print_done(self):
                """Report that no more data will arrive."""
                print("Done")

        @QtCore.pyqtSlot(int)
        def print_data(self, i):
                """Print a single received integer."""
                print(i)
58
59
if __name__ == "__main__":
        # Hand test: run a Producer and a Consumer on separate event-loop
        # threads, pipe the producer's signals into the consumer, and shut
        # everything down once the producer reports that it is done.
        app = QtCore.QCoreApplication([])

        # Each worker is moved onto its own thread and kicked off by that
        # thread's started() signal.
        producerThread = QThread44()
        producer = Producer()
        producer.moveToThread(producerThread)
        producerThread.started.connect(producer.process)

        consumerThread = QThread44()
        consumer = Consumer()
        consumer.moveToThread(consumerThread)
        consumerThread.started.connect(consumer.process)

        # Producer output is delivered to the consumer's printing slots.
        producer.data.connect(consumer.print_data)
        producer.done.connect(consumer.print_done)

        @QtCore.pyqtSlot()
        def producer_done():
                """Ask both worker event loops to stop once production is over."""
                print("Shutting down")
                producerThread.quit()
                consumerThread.quit()
                print("Done")
        producer.done.connect(producer_done)

        # Number of worker threads that have reported finished(); kept in a
        # list so the nested function can mutate it (no ``nonlocal`` here).
        count = [0]

        @QtCore.pyqtSlot()
        def thread_done():
                """Leave the application event loop after both threads finished."""
                print("Thread done")
                count[0] += 1
                if count[0] == 2:
                        print("Quitting")
                        app.exit(0)
                print("Done")
        producerThread.finished.connect(thread_done)
        consumerThread.finished.connect(thread_done)

        producerThread.start()
        consumerThread.start()

        # Run the event loop BEFORE printing anything: the old
        # ``print "Status", app.exec_()`` wrote "Status" first and so mixed
        # the label into the workers' output.
        status = app.exec_()

        # finished() is emitted from inside the ending thread, so a thread may
        # still be winding down when the event loop exits. Join both before the
        # interpreter tears the QThread objects down.
        producerThread.wait()
        consumerThread.wait()

        print("Status %s" % status)