+#!/usr/bin/env python2.5
+
+# Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import threading
+import thread
+import traceback
+import heapq
+import sys
+import mainthread
+
def debug(*args):
    """Write the given arguments to stdout when debugging is enabled.

    Debugging is statically disabled (the guard below is False); flip
    it to True to get trace output.  All callers in this file pass a
    single pre-formatted string.
    """
    if False:
        # file.write() accepts exactly one string argument, so join
        # the arguments rather than forwarding them all to one call
        # (the original sys.stdout.write(*args) raised TypeError for
        # more than one argument).
        sys.stdout.write(" ".join(str(arg) for arg in args))
        sys.stdout.write("\n")
+
# The default priority for jobs queued via execute().  Like nice(), a
# smaller numeric priority corresponds to a higher priority class.
default_priority = 0
+
class JobRunner(threading.Thread):
    """
    A worker thread that drains jobs from a job manager's priority
    queue until the queue is empty, execution is paused, or the
    manager is shutting down, then deregisters itself.
    """
    def __init__(self, job_manager):
        threading.Thread.__init__(self)
        # The _JobManager whose queue this thread services.
        self.job_manager = job_manager

    def run (self):
        """
        Pop and execute queued jobs.

        The manager's (reentrant) lock is held except while a job is
        actually running; have_lock tracks whether this thread
        currently holds it so the final release happens exactly once.
        """
        have_lock = True
        self.job_manager.lock.acquire ()
        try:
            while self.job_manager.pause == 0 and not self.job_manager.do_quit:
                try:
                    # Queue entries are [[priority, seq], key, job]:
                    # heappop returns the highest-priority (numerically
                    # smallest) entry.
                    _, key, job = heapq.heappop (self.job_manager.queue)
                except IndexError:
                    # Queue drained: let this runner exit (cleanup is
                    # in the outer finally).
                    return

                try:
                    self.job_manager.in_progress.append (key)
                    # Drop the lock while the job runs so other
                    # runners and the manager can make progress.
                    self.job_manager.lock.release ()
                    have_lock = False

                    # Execute the job.
                    try:
                        job ()
                    except KeyboardInterrupt:
                        # This is handled below and doesn't require a
                        # traceback.
                        raise
                    except:
                        print ("Executing job %s (%s) from thread %s: %s"
                               % (str (key), str (job),
                                  threading.currentThread(),
                                  traceback.format_exc ()))

                    self.job_manager.lock.acquire ()
                    have_lock = True

                    assert key in self.job_manager.in_progress
                finally:
                    # Always drop the key from in_progress, even if the
                    # job raised.  The ValueError guard is defensive;
                    # NOTE(review): on a KeyboardInterrupt this runs
                    # without the lock held -- confirm that is intended.
                    try:
                        self.job_manager.in_progress.remove (key)
                    except ValueError:
                        pass

                debug("Finished executing job %s (%s)" % (key, job,))

                # Notify stats hooks that a job completed.
                self.job_manager._stats_hooks_run ({'job':job, 'key':key})
        except KeyboardInterrupt:
            # Jobs run on a worker thread; forward the interrupt to
            # the main thread, which is expected to handle it.
            debug("%s: KeyboardInterrupt" % threading.currentThread())
            thread.interrupt_main()
            debug("%s: Forwarded KeyboardInterrupt to main thread"
                  % threading.currentThread())
        finally:
            if have_lock:
                self.job_manager.lock.release ()

            # Deregister this runner from the manager's thread list.
            assert self in self.job_manager.threads
            self.job_manager.threads.remove (self)

            debug ("Job runner %s (%d left) exiting."
                   % (threading.currentThread(),
                      len (self.job_manager.threads)))
+
# The process-wide job manager singleton.
_jm = None

def JobManager(start=False):
    """
    Return the job manager instance.  The job manager will not start
    executing jobs until this is called with start set to True.  Note:
    you can still queue jobs.
    """
    global _jm

    if _jm is None:
        _jm = _JobManager ()
    manager = _jm

    if start and not manager.started:
        manager.started = True
        if manager.jobs > 0:
            manager._stats_hooks_run ()
        manager.tickle ()

    return manager
+
+class _JobManager(object):
+ def __init__(self, started=False, num_threads=4):
+ """
+ Initialize the job manager.
+
+ If started is false, jobs may be queued, but jobs will not be
+ started until start() is called.
+ """
+ # A reentrant lock so that a job runner can call stat without
+ # dropping the lock.
+ self.lock = threading.RLock()
+
+ # If we can start executing jobs.
+ self.started = started
+
+ # The maximum number of threads to use for executing jobs.
+ self.num_threads = num_threads
+
+ # List of jobs (priority, key, job) that are queued for
+ # execution.
+ self.queue = []
+ # List of keys of the jobs that are being executed.
+ self.in_progress = []
+ # List of threads.
+ self.threads = []
+
+ # If 0, jobs may execute, otherwise, job execution is paused.
+ self.pause = 0
+
+ # The total number of jobs that this manager ever executed.
+ self.jobs = 0
+
+ # A list of status hooks to execute when the stats change.
+ self._stats_hooks = []
+ self._current_stats = self.stats ()
+
+ self.do_quit = False
+
+ def _lock(f):
+ def wrapper(*args, **kwargs):
+ self = args[0]
+ self.lock.acquire ()
+ try:
+ return f(*args, **kwargs)
+ finally:
+ self.lock.release()
+ return wrapper
+
+ @_lock
+ def start(self):
+ """
+ Start executing jobs.
+ """
+ if self.started:
+ return
+ if self.jobs > 0:
+ self._stats_hooks_run ()
+ self.tickle ()
+
+ @_lock
+ def tickle(self):
+ """
+ Ensure that there are enough job runners for the number of
+ pending jobs.
+ """
+ if self.do_quit:
+ debug("%s.quit called, not creating new threads."
+ % self.__class__.__name__)
+ return
+
+ if self.pause > 0:
+ # Job execution is paused. Don't start any new threads.
+ debug("%s.tickle(): Not doing anything: paused"
+ % (self.__class__.__name__))
+ return
+
+ debug("%s.tickle: Have %d threads (can start %d); %d jobs queued"
+ % (self.__class__.__name__,
+ len (self.threads), self.num_threads, len (self.queue)))
+ if len (self.threads) < self.num_threads:
+ for _ in range (min (len (self.queue),
+ self.num_threads - len (self.threads))):
+ thread = JobRunner (self)
+ # Setting threads as daemons means faster shutdown
+ # when the main thread exists, but it results in
+ # exceptions and occassional setfaults.
+ # thread.setDaemon(True)
+ self.threads.append (thread)
+ thread.start ()
+ debug("Now have %d threads" % len (self.threads))
+
+ @_lock
+ def execute(self, job, key=None, priority=default_priority):
+ """
+ Enqueue a job for execution. job is a function to execute.
+ If key is not None, the job is only enqueued if there is no
+ job that is inprogress or enqueued with the same key.
+ priority is the job's priority. Like nice(), a smaller
+ numeric priority corresponds to a higher priority class. Jobs
+ are executed highest priority first, in the order that they
+ were added.
+ """
+ if self.do_quit:
+ debug("%s.quit called, not enqueuing new jobs."
+ % self.__class__.__name__)
+
+ if key is not None:
+ if key in self.in_progress:
+ return
+ for item in self.queue:
+ if item[1] == key:
+ if item[0][0] < priority:
+ # Priority raised.
+ item[0][0] = priority
+ self.queue = heapq.heapify (self.queue)
+ return
+
+ # To ensure that jobs with the same priority are executed
+ # in the order they are added, we set the priority to
+ # [priority, next (monotomic counter)].
+ self.jobs += 1
+ heapq.heappush (self.queue, [[priority, self.jobs], key, job])
+
+ if self.started:
+ self._stats_hooks_run ()
+ self.tickle ()
+ else:
+ debug("%s not initialized. delaying execution of %s (%s)"
+ % (self.__class__.__name__, key, str (job),))
+
+ @_lock
+ def pause(self):
+ """
+ Increasement the pause count. When the pause count is greater
+ than 0, job execution is suspended.
+ """
+ self.pause += 1
+
+ if self.pause == 1:
+ self._stats_hooks_run ()
+
+ @_lock
+ def resume(self):
+ """
+ Decrement the pause count. If the pause count is greater than
+ 0 and this decrement brings it to 0, enqueued jobs are
+ resumed.
+ """
+ assert self.pause > 0
+ self.pause -= 1
+ if not self.paused():
+ self._stats_hooks_run ()
+ self.tickle ()
+
+ @_lock
+ def paused(self):
+ """
+ Returns whether job execution is paused.
+ """
+ return self.pause > 0
+
+ @_lock
+ def cancel(self):
+ """
+ Cancel any pending jobs.
+ """
+ self.queue = []
+ self._stats_hooks_run ()
+
+ def quit(self):
+ self.cancel ()
+ self.do_quit = True
+
+ @_lock
+ def stats(self):
+ """
+ Return a dictionary consisting of:
+
+ - 'paused': whether execution is paused
+ - 'jobs': the total number of jobs this manager has
+ executed, is executing or are queued
+ - 'jobs-completed': the numer of jobs that have completed
+ - 'jobs-in-progress': the number of jobs in progress
+ - 'jobs-queued': the number of jobs currently queued
+ """
+ return {'paused': self.paused(),
+ 'jobs': self.jobs,
+ 'jobs-completed':
+ self.jobs - len (self.in_progress) - len (self.queue),
+ 'jobs-in-progress': len (self.in_progress),
+ 'jobs-queued': len (self.queue)
+ }
+
+ def stats_hook_register(self, func, *args, **kwargs):
+ """
+ Registers a function to be called when the job status changes.
+ Passed the following parameters:
+
+ - the JobManager instance.
+ - the previous stats (as returned by stats)
+ - the current stats
+ - the job that was completed (or None)
+
+ Note: the hook may not be run in the main thread!
+ """
+ mainthread=False
+ try:
+ mainthread = kwargs['run_in_main_thread']
+ del kwargs['run_in_main_thread']
+ except KeyError:
+ pass
+ self._stats_hooks.append ([func, mainthread, args, kwargs])
+
+ def _stats_hooks_run(self, completed_job=None):
+ """
+ Run the stats hooks.
+ """
+ # if not self._stats_hooks:
+ # return
+
+ self.lock.acquire ()
+ try:
+ old_stats = self._current_stats
+ self._current_stats = self.stats ()
+ current_stats = self._current_stats
+ finally:
+ self.lock.release ()
+
+ debug("%s -> %s" % (str (old_stats), str (current_stats)))
+
+ for (f, run_in_main_thread, args, kwargs) in self._stats_hooks:
+ if run_in_main_thread:
+ debug("JobManager._stats_hooks_run: Running %s in main thread"
+ % f)
+ mainthread.execute(
+ f, self, old_stats, current_stats, completed_job,
+ async=True, *args, **kwargs)
+ else:
+ debug("JobManager._stats_hooks_run: Running %s in any thread"
+ % f)
+ f(self, old_stats, current_stats, completed_job,
+ *args, **kwargs)