Make the gtk frontend, the default frontend.
[feedingit] / src / jobmanager.py
1 #!/usr/bin/env python2.5
2
3 # Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import threading
19 import thread
20 import traceback
21 import heapq
22 import sys
23 import mainthread
24
25 def debug(*args):
26     if False:
27         sys.stdout.write(*args)
28         sys.stdout.write("\n")
29
30 # The default priority.  Like nice(), a smaller numeric priority
31 # corresponds to a higher priority class.
32 default_priority = 0
33
34 class JobRunner(threading.Thread):
35     def __init__(self, job_manager):
36         threading.Thread.__init__(self)
37         self.job_manager = job_manager
38
39     def run (self):
40         have_lock = True
41         self.job_manager.lock.acquire ()
42         try:
43             while (self.job_manager.pause == 0
44                    and not self.job_manager.do_quit
45                    and (len (self.job_manager.threads)
46                         <= self.job_manager.num_threads)):
47                 try:
48                     _, key, job = heapq.heappop (self.job_manager.queue)
49                 except IndexError:
50                     return
51     
52                 try:
53                     self.job_manager.in_progress.append (key)
54                     self.job_manager.lock.release ()
55                     have_lock = False
56         
57                     # Execute the job.
58                     try:
59                         job ()
60                     except KeyboardInterrupt:
61                         # This is handled below and doesn't require a
62                         # traceback.
63                         raise
64                     except:
65                         print ("Executing job %s (%s) from thread %s: %s"
66                                % (str (key), str (job),
67                                   threading.currentThread(),
68                                   traceback.format_exc ()))
69         
70                     self.job_manager.lock.acquire ()
71                     have_lock = True
72     
73                     assert key in self.job_manager.in_progress
74                 finally:
75                     try:
76                         self.job_manager.in_progress.remove (key)
77                     except ValueError:
78                         pass
79     
80                 debug("Finished executing job %s (%s)" % (key, job,))
81     
82                 self.job_manager._stats_hooks_run ({'job':job, 'key':key})
83         except KeyboardInterrupt:
84             debug("%s: KeyboardInterrupt" % threading.currentThread())
85             thread.interrupt_main()
86             debug("%s: Forwarded KeyboardInterrupt to main thread"
87                   % threading.currentThread())
88         finally:
89             if have_lock:
90                 self.job_manager.lock.release ()
91
92             assert self in self.job_manager.threads
93             self.job_manager.threads.remove (self)
94
95             debug ("Job runner %s (%d left) exiting."
96                    % (threading.currentThread(),
97                       len (self.job_manager.threads)))
98
99 _jm = None
100 def JobManager(start=False):
101     """
102     Return the job manager instance.  The job manager will not start
103     executing jobs until this is called with start set to True.  Note:
104     you can still queue jobs.
105     """
106     global _jm
107     if _jm is None:
108         _jm = _JobManager ()
109     if start and not _jm.started:
110         _jm.started = True
111         if _jm.jobs > 0:
112             _jm._stats_hooks_run ()
113         _jm.tickle ()
114
115     return _jm
116
117 class _JobManager(object):
118     def __init__(self, started=False, num_threads=4):
119         """
120         Initialize the job manager.
121
122         If started is false, jobs may be queued, but jobs will not be
123         started until start() is called.
124         """
125         # A reentrant lock so that a job runner can call stat without
126         # dropping the lock.
127         self.lock = threading.RLock()
128
129         # If we can start executing jobs.
130         self.started = started
131
132         # The maximum number of threads to use for executing jobs.
133         self._num_threads = num_threads
134
135         # List of jobs (priority, key, job) that are queued for
136         # execution.
137         self.queue = []
138         # List of keys of the jobs that are being executed.
139         self.in_progress = []
140         # List of threads.
141         self.threads = []
142
143         # If 0, jobs may execute, otherwise, job execution is paused.
144         self.pause = 0
145
146         # The total number of jobs that this manager ever executed.
147         self.jobs = 0
148
149         # A list of status hooks to execute when the stats change.
150         self._stats_hooks = []
151         self._current_stats = self.stats ()
152
153         self.do_quit = False
154
155     def _lock(f):
156         def wrapper(*args, **kwargs):
157             self = args[0]
158             self.lock.acquire ()
159             try:
160                 return f(*args, **kwargs)
161             finally:
162                 self.lock.release()
163         return wrapper
164
165     def get_num_threads(self):
166         return self._num_threads
167     def set_num_threads(self, value):
168         self._num_threads = value
169         self.tickle ()
170     num_threads = property(get_num_threads, set_num_threads)
171
172     @_lock
173     def start(self):
174         """
175         Start executing jobs.
176         """
177         if self.started:
178             return
179         if self.jobs > 0:
180             self._stats_hooks_run ()
181         self.tickle ()
182
183     @_lock
184     def tickle(self):
185         """
186         Ensure that there are enough job runners for the number of
187         pending jobs.
188         """
189         if self.do_quit:
190             debug("%s.quit called, not creating new threads."
191                   % self.__class__.__name__)
192             return
193
194         if self.pause > 0:
195             # Job execution is paused.  Don't start any new threads.
196             debug("%s.tickle(): Not doing anything: paused"
197                   % (self.__class__.__name__))
198             return
199
200         debug("%s.tickle: Have %d threads (can start %d); %d jobs queued"
201               % (self.__class__.__name__,
202                  len (self.threads), self.num_threads, len (self.queue)))
203         if len (self.threads) < self.num_threads:
204             for _ in range (min (len (self.queue),
205                                  self.num_threads - len (self.threads))):
206                 thread = JobRunner (self)
207                 # Setting threads as daemons means faster shutdown
208                 # when the main thread exists, but it results in
209                 # exceptions and occassional setfaults.
210                 # thread.setDaemon(True)
211                 self.threads.append (thread)
212                 thread.start ()
213                 debug("Now have %d threads" % len (self.threads))
214
215     @_lock
216     def execute(self, job, key=None, priority=default_priority):
217         """
218         Enqueue a job for execution.  job is a function to execute.
219         If key is not None, the job is only enqueued if there is no
220         job that is inprogress or enqueued with the same key.
221         priority is the job's priority.  Like nice(), a smaller
222         numeric priority corresponds to a higher priority class.  Jobs
223         are executed highest priority first, in the order that they
224         were added.
225         """
226         if self.do_quit:
227             debug("%s.quit called, not enqueuing new jobs."
228                   % self.__class__.__name__)
229
230         if key is not None:
231             if key in self.in_progress:
232                 return
233             for item in self.queue:
234                 if item[1] == key:
235                     if item[0][0] < priority:
236                         # Priority raised.
237                         item[0][0] = priority
238                         self.queue = heapq.heapify (self.queue)
239                     return
240
241         # To ensure that jobs with the same priority are executed
242         # in the order they are added, we set the priority to
243         # [priority, next (monotomic counter)].
244         self.jobs += 1
245         heapq.heappush (self.queue, [[priority, self.jobs], key, job])
246
247         if self.started:
248             self._stats_hooks_run ()
249             self.tickle ()
250         else:
251             debug("%s not initialized. delaying execution of %s (%s)"
252                   % (self.__class__.__name__, key, str (job),))
253
254     @_lock
255     def pause(self):
256         """
257         Increasement the pause count.  When the pause count is greater
258         than 0, job execution is suspended.
259         """
260         self.pause += 1
261
262         if self.pause == 1:
263             self._stats_hooks_run ()
264
265     @_lock
266     def resume(self):
267         """
268         Decrement the pause count.  If the pause count is greater than
269         0 and this decrement brings it to 0, enqueued jobs are
270         resumed.
271         """
272         assert self.pause > 0
273         self.pause -= 1
274         if not self.paused():
275             self._stats_hooks_run ()
276             self.tickle ()
277
278     @_lock
279     def paused(self):
280         """
281         Returns whether job execution is paused.
282         """
283         return self.pause > 0
284
285     @_lock
286     def cancel(self):
287         """
288         Cancel any pending jobs.
289         """
290         self.queue = []
291         self._stats_hooks_run ()
292
293     def quit(self):
294         self.cancel ()
295         self.do_quit = True
296
297     @_lock
298     def stats(self):
299         """
300         Return a dictionary consisting of:
301
302           - 'paused': whether execution is paused
303           - 'jobs': the total number of jobs this manager has
304               executed, is executing or are queued
305           - 'jobs-completed': the numer of jobs that have completed
306           - 'jobs-in-progress': the number of jobs in progress
307           - 'jobs-queued': the number of jobs currently queued
308         """
309         return {'paused': self.paused(),
310                 'jobs': self.jobs,
311                 'jobs-completed':
312                   self.jobs - len (self.in_progress) - len (self.queue),
313                 'jobs-in-progress': len (self.in_progress),
314                 'jobs-queued': len (self.queue)
315                 }
316
317     def stats_hook_register(self, func, *args, **kwargs):
318         """
319         Registers a function to be called when the job status changes.
320         Passed the following parameters:
321
322           - the JobManager instance.
323           - the previous stats (as returned by stats)
324           - the current stats
325           - the job that was completed (or None)
326
327         Note: the hook may not be run in the main thread!
328         """
329         mainthread=False
330         try:
331             mainthread = kwargs['run_in_main_thread']
332             del kwargs['run_in_main_thread']
333         except KeyError:
334             pass
335         self._stats_hooks.append ([func, mainthread, args, kwargs])
336
337     def _stats_hooks_run(self, completed_job=None):
338         """
339         Run the stats hooks.
340         """
341         # if not self._stats_hooks:
342         #     return
343
344         self.lock.acquire ()
345         try:
346             old_stats = self._current_stats
347             self._current_stats = self.stats ()
348             current_stats = self._current_stats
349         finally:
350             self.lock.release ()
351
352         debug("%s -> %s" % (str (old_stats), str (current_stats)))
353
354         for (f, run_in_main_thread, args, kwargs) in self._stats_hooks:
355             if run_in_main_thread:
356                 debug("JobManager._stats_hooks_run: Running %s in main thread"
357                       % f)
358                 mainthread.execute(
359                     f, self, old_stats, current_stats, completed_job,
360                     async=True, *args, **kwargs)
361             else:
362                 debug("JobManager._stats_hooks_run: Running %s in any thread"
363                       % f)
364                 f(self, old_stats, current_stats, completed_job,
365                   *args, **kwargs)