1 #!/usr/bin/env python2.5
3 # Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
27 sys.stdout.write(*args)
28 sys.stdout.write("\n")
30 # The default priority. Like nice(), a smaller numeric priority
31 # corresponds to a higher priority class.
34 class JobRunner(threading.Thread):
35 def __init__(self, job_manager):
36 threading.Thread.__init__(self)
37 self.job_manager = job_manager
41 self.job_manager.lock.acquire ()
43 while (self.job_manager.pause == 0
44 and not self.job_manager.do_quit
45 and (len (self.job_manager.threads)
46 <= self.job_manager.num_threads)):
48 _, key, job = heapq.heappop (self.job_manager.queue)
53 self.job_manager.in_progress.append (key)
54 self.job_manager.lock.release ()
60 except KeyboardInterrupt:
61 # This is handled below and doesn't require a
65 print ("Executing job %s (%s) from thread %s: %s"
66 % (str (key), str (job),
67 threading.currentThread(),
68 traceback.format_exc ()))
70 self.job_manager.lock.acquire ()
73 assert key in self.job_manager.in_progress
76 self.job_manager.in_progress.remove (key)
80 debug("Finished executing job %s (%s)" % (key, job,))
82 self.job_manager._stats_hooks_run ({'job':job, 'key':key})
83 except KeyboardInterrupt:
84 debug("%s: KeyboardInterrupt" % threading.currentThread())
85 thread.interrupt_main()
86 debug("%s: Forwarded KeyboardInterrupt to main thread"
87 % threading.currentThread())
90 self.job_manager.lock.release ()
92 assert self in self.job_manager.threads
93 self.job_manager.threads.remove (self)
95 debug ("Job runner %s (%d left) exiting."
96 % (threading.currentThread(),
97 len (self.job_manager.threads)))
100 def JobManager(start=False):
102 Return the job manager instance. The job manager will not start
103 executing jobs until this is called with start set to True. Note:
104 you can still queue jobs.
109 if start and not _jm.started:
112 _jm._stats_hooks_run ()
117 class _JobManager(object):
118 def __init__(self, started=False, num_threads=4):
120 Initialize the job manager.
122 If started is false, jobs may be queued, but jobs will not be
123 started until start() is called.
125 # A reentrant lock so that a job runner can call stat without
127 self.lock = threading.RLock()
129 # If we can start executing jobs.
130 self.started = started
132 # The maximum number of threads to use for executing jobs.
133 self._num_threads = num_threads
135 # List of jobs (priority, key, job) that are queued for
138 # List of keys of the jobs that are being executed.
139 self.in_progress = []
143 # If 0, jobs may execute, otherwise, job execution is paused.
146 # The total number of jobs that this manager ever executed.
149 # A list of status hooks to execute when the stats change.
150 self._stats_hooks = []
151 self._current_stats = self.stats ()
156 def wrapper(*args, **kwargs):
160 return f(*args, **kwargs)
165 def get_num_threads(self):
166 return self._num_threads
167 def set_num_threads(self, value):
168 self._num_threads = value
170 num_threads = property(get_num_threads, set_num_threads)
175 Start executing jobs.
180 self._stats_hooks_run ()
186 Ensure that there are enough job runners for the number of
190 debug("%s.quit called, not creating new threads."
191 % self.__class__.__name__)
195 # Job execution is paused. Don't start any new threads.
196 debug("%s.tickle(): Not doing anything: paused"
197 % (self.__class__.__name__))
200 debug("%s.tickle: Have %d threads (can start %d); %d jobs queued"
201 % (self.__class__.__name__,
202 len (self.threads), self.num_threads, len (self.queue)))
203 if len (self.threads) < self.num_threads:
204 for _ in range (min (len (self.queue),
205 self.num_threads - len (self.threads))):
206 thread = JobRunner (self)
207 # Setting threads as daemons means faster shutdown
208 # when the main thread exists, but it results in
209 # exceptions and occassional setfaults.
210 # thread.setDaemon(True)
211 self.threads.append (thread)
213 debug("Now have %d threads" % len (self.threads))
216 def execute(self, job, key=None, priority=default_priority):
218 Enqueue a job for execution. job is a function to execute.
219 If key is not None, the job is only enqueued if there is no
220 job that is inprogress or enqueued with the same key.
221 priority is the job's priority. Like nice(), a smaller
222 numeric priority corresponds to a higher priority class. Jobs
223 are executed highest priority first, in the order that they
227 debug("%s.quit called, not enqueuing new jobs."
228 % self.__class__.__name__)
231 if key in self.in_progress:
233 for item in self.queue:
235 if item[0][0] < priority:
237 item[0][0] = priority
238 self.queue = heapq.heapify (self.queue)
241 # To ensure that jobs with the same priority are executed
242 # in the order they are added, we set the priority to
243 # [priority, next (monotomic counter)].
245 heapq.heappush (self.queue, [[priority, self.jobs], key, job])
248 self._stats_hooks_run ()
251 debug("%s not initialized. delaying execution of %s (%s)"
252 % (self.__class__.__name__, key, str (job),))
257 Increasement the pause count. When the pause count is greater
258 than 0, job execution is suspended.
263 self._stats_hooks_run ()
268 Decrement the pause count. If the pause count is greater than
269 0 and this decrement brings it to 0, enqueued jobs are
272 assert self.pause > 0
274 if not self.paused():
275 self._stats_hooks_run ()
281 Returns whether job execution is paused.
283 return self.pause > 0
288 Cancel any pending jobs.
291 self._stats_hooks_run ()
300 Return a dictionary consisting of:
302 - 'paused': whether execution is paused
303 - 'jobs': the total number of jobs this manager has
304 executed, is executing or are queued
305 - 'jobs-completed': the numer of jobs that have completed
306 - 'jobs-in-progress': the number of jobs in progress
307 - 'jobs-queued': the number of jobs currently queued
309 return {'paused': self.paused(),
312 self.jobs - len (self.in_progress) - len (self.queue),
313 'jobs-in-progress': len (self.in_progress),
314 'jobs-queued': len (self.queue)
317 def stats_hook_register(self, func, *args, **kwargs):
319 Registers a function to be called when the job status changes.
320 Passed the following parameters:
322 - the JobManager instance.
323 - the previous stats (as returned by stats)
325 - the job that was completed (or None)
327 Note: the hook may not be run in the main thread!
331 mainthread = kwargs['run_in_main_thread']
332 del kwargs['run_in_main_thread']
335 self._stats_hooks.append ([func, mainthread, args, kwargs])
337 def _stats_hooks_run(self, completed_job=None):
341 # if not self._stats_hooks:
346 old_stats = self._current_stats
347 self._current_stats = self.stats ()
348 current_stats = self._current_stats
352 debug("%s -> %s" % (str (old_stats), str (current_stats)))
354 for (f, run_in_main_thread, args, kwargs) in self._stats_hooks:
355 if run_in_main_thread:
356 debug("JobManager._stats_hooks_run: Running %s in main thread"
359 f, self, old_stats, current_stats, completed_job,
360 async=True, *args, **kwargs)
362 debug("JobManager._stats_hooks_run: Running %s in any thread"
364 f(self, old_stats, current_stats, completed_job,