1 #!/usr/bin/env python2.5
3 # Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
25 logger = logging.getLogger(__name__)
29 logger.debug(' '.join(args))
31 # The default priority. Like nice(), a smaller numeric priority
32 # corresponds to a higher priority class.
35 class JobRunner(threading.Thread):
36 def __init__(self, job_manager):
37 threading.Thread.__init__(self)
38 self.job_manager = job_manager
42 self.job_manager.lock.acquire ()
44 while (self.job_manager.pause == 0
45 and not self.job_manager.do_quit
46 and (len (self.job_manager.threads)
47 <= self.job_manager.num_threads)):
49 _, key, job = heapq.heappop (self.job_manager.queue)
54 self.job_manager.in_progress.append (key)
55 self.job_manager.lock.release ()
61 except KeyboardInterrupt:
62 # This is handled below and doesn't require a
66 print ("Executing job %s (%s) from thread %s: %s"
67 % (str (key), str (job),
68 threading.currentThread(),
69 traceback.format_exc ()))
71 self.job_manager.lock.acquire ()
74 assert key in self.job_manager.in_progress
77 self.job_manager.in_progress.remove (key)
81 debug("Finished executing job %s (%s)" % (key, job,))
83 self.job_manager._stats_hooks_run ({'job':job, 'key':key})
84 except KeyboardInterrupt:
85 debug("%s: KeyboardInterrupt" % threading.currentThread())
86 thread.interrupt_main()
87 debug("%s: Forwarded KeyboardInterrupt to main thread"
88 % threading.currentThread())
91 self.job_manager.lock.release ()
93 assert self in self.job_manager.threads
94 self.job_manager.threads.remove (self)
96 debug ("Job runner %s (%d left) exiting."
97 % (threading.currentThread(),
98 len (self.job_manager.threads)))
101 def JobManager(start=False):
103 Return the job manager instance. The job manager will not start
104 executing jobs until this is called with start set to True. Note:
105 you can still queue jobs.
110 if start and not _jm.started:
113 _jm._stats_hooks_run ()
118 class _JobManager(object):
119 def __init__(self, started=False, num_threads=4):
121 Initialize the job manager.
123 If started is false, jobs may be queued, but jobs will not be
124 started until start() is called.
126 # A reentrant lock so that a job runner can call stat without
128 self.lock = threading.RLock()
130 # If we can start executing jobs.
131 self.started = started
133 # The maximum number of threads to use for executing jobs.
134 self._num_threads = num_threads
136 # List of jobs (priority, key, job) that are queued for
139 # List of keys of the jobs that are being executed.
140 self.in_progress = []
144 # If 0, jobs may execute, otherwise, job execution is paused.
147 # The total number of jobs that this manager ever executed.
150 # A list of status hooks to execute when the stats change.
151 self._stats_hooks = []
152 self._current_stats = self.stats ()
157 def wrapper(*args, **kwargs):
161 return f(*args, **kwargs)
166 def get_num_threads(self):
167 return self._num_threads
168 def set_num_threads(self, value):
169 self._num_threads = value
171 num_threads = property(get_num_threads, set_num_threads)
176 Start executing jobs.
181 self._stats_hooks_run ()
187 Ensure that there are enough job runners for the number of
191 debug("%s.quit called, not creating new threads."
192 % self.__class__.__name__)
196 # Job execution is paused. Don't start any new threads.
197 debug("%s.tickle(): Not doing anything: paused"
198 % (self.__class__.__name__))
201 debug("%s.tickle: Have %d threads (can start %d); %d jobs queued"
202 % (self.__class__.__name__,
203 len (self.threads), self.num_threads, len (self.queue)))
204 if len (self.threads) < self.num_threads:
205 for _ in range (min (len (self.queue),
206 self.num_threads - len (self.threads))):
207 thread = JobRunner (self)
208 # Setting threads as daemons means faster shutdown
209 # when the main thread exits, but it results in
210 # exceptions and occasional segfaults.
211 # thread.setDaemon(True)
212 self.threads.append (thread)
214 debug("Now have %d threads" % len (self.threads))
217 def execute(self, job, key=None, priority=default_priority):
219 Enqueue a job for execution. job is a function to execute.
220 If key is not None, the job is only enqueued if there is no
221 job that is inprogress or enqueued with the same key.
222 priority is the job's priority. Like nice(), a smaller
223 numeric priority corresponds to a higher priority class. Jobs
224 are executed highest priority first, in the order that they
228 debug("%s.quit called, not enqueuing new jobs."
229 % self.__class__.__name__)
232 if key in self.in_progress:
234 for item in self.queue:
236 if item[0][0] < priority:
238 item[0][0] = priority
239 self.queue = heapq.heapify (self.queue)
242 # To ensure that jobs with the same priority are executed
243 # in the order they are added, we set the priority to
244 # [priority, next (monotomic counter)].
246 heapq.heappush (self.queue, [[priority, self.jobs], key, job])
249 self._stats_hooks_run ()
252 debug("%s not initialized. delaying execution of %s (%s)"
253 % (self.__class__.__name__, key, str (job),))
258 Increment the pause count. When the pause count is greater
259 than 0, job execution is suspended.
264 self._stats_hooks_run ()
269 Decrement the pause count. If the pause count is greater than
270 0 and this decrement brings it to 0, enqueued jobs are
273 assert self.pause > 0
275 if not self.paused():
276 self._stats_hooks_run ()
282 Returns whether job execution is paused.
284 return self.pause > 0
289 Cancel any pending jobs.
292 self._stats_hooks_run ()
301 Return a dictionary consisting of:
303 - 'paused': whether execution is paused
304 - 'jobs': the total number of jobs this manager has
305 executed, is executing or are queued
306 - 'jobs-completed': the number of jobs that have completed
307 - 'jobs-in-progress': the number of jobs in progress
308 - 'jobs-queued': the number of jobs currently queued
310 return {'paused': self.paused(),
313 self.jobs - len (self.in_progress) - len (self.queue),
314 'jobs-in-progress': len (self.in_progress),
315 'jobs-queued': len (self.queue)
318 def stats_hook_register(self, func, *args, **kwargs):
320 Registers a function to be called when the job status changes.
321 Passed the following parameters:
323 - the JobManager instance.
324 - the previous stats (as returned by stats)
326 - the job that was completed (or None)
328 Note: the hook may not be run in the main thread!
332 mainthread = kwargs['run_in_main_thread']
333 del kwargs['run_in_main_thread']
336 self._stats_hooks.append ([func, mainthread, args, kwargs])
338 def _stats_hooks_run(self, completed_job=None):
342 # if not self._stats_hooks:
347 old_stats = self._current_stats
348 self._current_stats = self.stats ()
349 current_stats = self._current_stats
353 debug("%s -> %s" % (str (old_stats), str (current_stats)))
355 for (f, run_in_main_thread, args, kwargs) in self._stats_hooks:
356 if run_in_main_thread:
357 debug("JobManager._stats_hooks_run: Running %s in main thread"
360 f, self, old_stats, current_stats, completed_job,
361 async=True, *args, **kwargs)
363 debug("JobManager._stats_hooks_run: Running %s in any thread"
365 f(self, old_stats, current_stats, completed_job,