1 #!/usr/bin/env python2.5
3 # Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
27 sys.stdout.write(*args)
28 sys.stdout.write("\n")
30 # The default priority. Like nice(), a smaller numeric priority
31 # corresponds to a higher priority class.
34 class JobRunner(threading.Thread):
def __init__(self, job_manager):
    """
    Set up a job runner thread for job_manager.

    job_manager is kept on the instance as self.job_manager.  The
    thread is only initialized here; nothing in this method starts
    it.
    """
    # Remember which manager this runner works for.
    self.job_manager = job_manager
    # Let the Thread base class perform its own initialization.
    threading.Thread.__init__(self)
41 self.job_manager.lock.acquire ()
43 while self.job_manager.pause == 0 and not self.job_manager.do_quit:
45 _, key, job = heapq.heappop (self.job_manager.queue)
50 self.job_manager.in_progress.append (key)
51 self.job_manager.lock.release ()
57 except KeyboardInterrupt:
58 # This is handled below and doesn't require a
62 print ("Executing job %s (%s) from thread %s: %s"
63 % (str (key), str (job),
64 threading.currentThread(),
65 traceback.format_exc ()))
67 self.job_manager.lock.acquire ()
70 assert key in self.job_manager.in_progress
73 self.job_manager.in_progress.remove (key)
77 debug("Finished executing job %s (%s)" % (key, job,))
79 self.job_manager._stats_hooks_run ({'job':job, 'key':key})
80 except KeyboardInterrupt:
81 debug("%s: KeyboardInterrupt" % threading.currentThread())
82 thread.interrupt_main()
83 debug("%s: Forwarded KeyboardInterrupt to main thread"
84 % threading.currentThread())
87 self.job_manager.lock.release ()
89 assert self in self.job_manager.threads
90 self.job_manager.threads.remove (self)
92 debug ("Job runner %s (%d left) exiting."
93 % (threading.currentThread(),
94 len (self.job_manager.threads)))
97 def JobManager(start=False):
99 Return the job manager instance. The job manager will not start
100 executing jobs until this is called with start set to True. Note:
101 you can still queue jobs.
106 if start and not _jm.started:
109 _jm._stats_hooks_run ()
114 class _JobManager(object):
115 def __init__(self, started=False, num_threads=4):
117 Initialize the job manager.
119 If started is false, jobs may be queued, but jobs will not be
120 started until start() is called.
# A reentrant lock so that a job runner can call stats() without
124 self.lock = threading.RLock()
126 # If we can start executing jobs.
127 self.started = started
129 # The maximum number of threads to use for executing jobs.
130 self.num_threads = num_threads
132 # List of jobs (priority, key, job) that are queued for
135 # List of keys of the jobs that are being executed.
136 self.in_progress = []
140 # If 0, jobs may execute, otherwise, job execution is paused.
143 # The total number of jobs that this manager ever executed.
146 # A list of status hooks to execute when the stats change.
147 self._stats_hooks = []
148 self._current_stats = self.stats ()
153 def wrapper(*args, **kwargs):
157 return f(*args, **kwargs)
165 Start executing jobs.
170 self._stats_hooks_run ()
176 Ensure that there are enough job runners for the number of
180 debug("%s.quit called, not creating new threads."
181 % self.__class__.__name__)
185 # Job execution is paused. Don't start any new threads.
186 debug("%s.tickle(): Not doing anything: paused"
187 % (self.__class__.__name__))
190 debug("%s.tickle: Have %d threads (can start %d); %d jobs queued"
191 % (self.__class__.__name__,
192 len (self.threads), self.num_threads, len (self.queue)))
193 if len (self.threads) < self.num_threads:
194 for _ in range (min (len (self.queue),
195 self.num_threads - len (self.threads))):
196 thread = JobRunner (self)
197 # Setting threads as daemons means faster shutdown
# when the main thread exits, but it results in
# exceptions and occasional segfaults.
200 # thread.setDaemon(True)
201 self.threads.append (thread)
203 debug("Now have %d threads" % len (self.threads))
206 def execute(self, job, key=None, priority=default_priority):
208 Enqueue a job for execution. job is a function to execute.
209 If key is not None, the job is only enqueued if there is no
210 job that is inprogress or enqueued with the same key.
211 priority is the job's priority. Like nice(), a smaller
212 numeric priority corresponds to a higher priority class. Jobs
213 are executed highest priority first, in the order that they
217 debug("%s.quit called, not enqueuing new jobs."
218 % self.__class__.__name__)
221 if key in self.in_progress:
223 for item in self.queue:
225 if item[0][0] < priority:
227 item[0][0] = priority
228 self.queue = heapq.heapify (self.queue)
231 # To ensure that jobs with the same priority are executed
232 # in the order they are added, we set the priority to
# [priority, next (monotonic counter)].
235 heapq.heappush (self.queue, [[priority, self.jobs], key, job])
238 self._stats_hooks_run ()
241 debug("%s not initialized. delaying execution of %s (%s)"
242 % (self.__class__.__name__, key, str (job),))
Increment the pause count. When the pause count is greater
248 than 0, job execution is suspended.
253 self._stats_hooks_run ()
258 Decrement the pause count. If the pause count is greater than
259 0 and this decrement brings it to 0, enqueued jobs are
262 assert self.pause > 0
264 if not self.paused():
265 self._stats_hooks_run ()
271 Returns whether job execution is paused.
273 return self.pause > 0
278 Cancel any pending jobs.
281 self._stats_hooks_run ()
290 Return a dictionary consisting of:
292 - 'paused': whether execution is paused
293 - 'jobs': the total number of jobs this manager has
294 executed, is executing or are queued
- 'jobs-completed': the number of jobs that have completed
296 - 'jobs-in-progress': the number of jobs in progress
297 - 'jobs-queued': the number of jobs currently queued
299 return {'paused': self.paused(),
302 self.jobs - len (self.in_progress) - len (self.queue),
303 'jobs-in-progress': len (self.in_progress),
304 'jobs-queued': len (self.queue)
307 def stats_hook_register(self, func, *args, **kwargs):
309 Registers a function to be called when the job status changes.
310 Passed the following parameters:
312 - the JobManager instance.
313 - the previous stats (as returned by stats)
315 - the job that was completed (or None)
317 Note: the hook may not be run in the main thread!
321 mainthread = kwargs['run_in_main_thread']
322 del kwargs['run_in_main_thread']
325 self._stats_hooks.append ([func, mainthread, args, kwargs])
327 def _stats_hooks_run(self, completed_job=None):
331 # if not self._stats_hooks:
336 old_stats = self._current_stats
337 self._current_stats = self.stats ()
338 current_stats = self._current_stats
342 debug("%s -> %s" % (str (old_stats), str (current_stats)))
344 for (f, run_in_main_thread, args, kwargs) in self._stats_hooks:
345 if run_in_main_thread:
346 debug("JobManager._stats_hooks_run: Running %s in main thread"
349 f, self, old_stats, current_stats, completed_job,
350 async=True, *args, **kwargs)
352 debug("JobManager._stats_hooks_run: Running %s in any thread"
354 f(self, old_stats, current_stats, completed_job,