Move download management from frontends to rss_sqlite.py.
[feedingit] / src / jobmanager.py
1 #!/usr/bin/env python2.5
2
3 # Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
4 #
5 # This program is free software: you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
9 #
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13 # GNU General Public License for more details.
14 #
15 # You should have received a copy of the GNU General Public License
16 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18 import threading
19 import thread
20 import traceback
21 import heapq
22 import sys
23 import mainthread
24
25 def debug(*args):
26     if False:
27         sys.stdout.write(*args)
28         sys.stdout.write("\n")
29
30 # The default priority.  Like nice(), a smaller numeric priority
31 # corresponds to a higher priority class.
32 default_priority = 0
33
34 class JobRunner(threading.Thread):
35     def __init__(self, job_manager):
36         threading.Thread.__init__(self)
37         self.job_manager = job_manager
38
39     def run (self):
40         have_lock = True
41         self.job_manager.lock.acquire ()
42         try:
43             while self.job_manager.pause == 0 and not self.job_manager.do_quit:
44                 try:
45                     _, key, job = heapq.heappop (self.job_manager.queue)
46                 except IndexError:
47                     return
48     
49                 try:
50                     self.job_manager.in_progress.append (key)
51                     self.job_manager.lock.release ()
52                     have_lock = False
53         
54                     # Execute the job.
55                     try:
56                         job ()
57                     except KeyboardInterrupt:
58                         # This is handled below and doesn't require a
59                         # traceback.
60                         raise
61                     except:
62                         print ("Executing job %s (%s) from thread %s: %s"
63                                % (str (key), str (job),
64                                   threading.currentThread(),
65                                   traceback.format_exc ()))
66         
67                     self.job_manager.lock.acquire ()
68                     have_lock = True
69     
70                     assert key in self.job_manager.in_progress
71                 finally:
72                     try:
73                         self.job_manager.in_progress.remove (key)
74                     except ValueError:
75                         pass
76     
77                 debug("Finished executing job %s (%s)" % (key, job,))
78     
79                 self.job_manager._stats_hooks_run ({'job':job, 'key':key})
80         except KeyboardInterrupt:
81             debug("%s: KeyboardInterrupt" % threading.currentThread())
82             thread.interrupt_main()
83             debug("%s: Forwarded KeyboardInterrupt to main thread"
84                   % threading.currentThread())
85         finally:
86             if have_lock:
87                 self.job_manager.lock.release ()
88
89             assert self in self.job_manager.threads
90             self.job_manager.threads.remove (self)
91
92             debug ("Job runner %s (%d left) exiting."
93                    % (threading.currentThread(),
94                       len (self.job_manager.threads)))
95
96 _jm = None
97 def JobManager(start=False):
98     """
99     Return the job manager instance.  The job manager will not start
100     executing jobs until this is called with start set to True.  Note:
101     you can still queue jobs.
102     """
103     global _jm
104     if _jm is None:
105         _jm = _JobManager ()
106     if start and not _jm.started:
107         _jm.started = True
108         if _jm.jobs > 0:
109             _jm._stats_hooks_run ()
110         _jm.tickle ()
111
112     return _jm
113
114 class _JobManager(object):
115     def __init__(self, started=False, num_threads=4):
116         """
117         Initialize the job manager.
118
119         If started is false, jobs may be queued, but jobs will not be
120         started until start() is called.
121         """
122         # A reentrant lock so that a job runner can call stat without
123         # dropping the lock.
124         self.lock = threading.RLock()
125
126         # If we can start executing jobs.
127         self.started = started
128
129         # The maximum number of threads to use for executing jobs.
130         self.num_threads = num_threads
131
132         # List of jobs (priority, key, job) that are queued for
133         # execution.
134         self.queue = []
135         # List of keys of the jobs that are being executed.
136         self.in_progress = []
137         # List of threads.
138         self.threads = []
139
140         # If 0, jobs may execute, otherwise, job execution is paused.
141         self.pause = 0
142
143         # The total number of jobs that this manager ever executed.
144         self.jobs = 0
145
146         # A list of status hooks to execute when the stats change.
147         self._stats_hooks = []
148         self._current_stats = self.stats ()
149
150         self.do_quit = False
151
152     def _lock(f):
153         def wrapper(*args, **kwargs):
154             self = args[0]
155             self.lock.acquire ()
156             try:
157                 return f(*args, **kwargs)
158             finally:
159                 self.lock.release()
160         return wrapper
161
162     @_lock
163     def start(self):
164         """
165         Start executing jobs.
166         """
167         if self.started:
168             return
169         if self.jobs > 0:
170             self._stats_hooks_run ()
171         self.tickle ()
172
173     @_lock
174     def tickle(self):
175         """
176         Ensure that there are enough job runners for the number of
177         pending jobs.
178         """
179         if self.do_quit:
180             debug("%s.quit called, not creating new threads."
181                   % self.__class__.__name__)
182             return
183
184         if self.pause > 0:
185             # Job execution is paused.  Don't start any new threads.
186             debug("%s.tickle(): Not doing anything: paused"
187                   % (self.__class__.__name__))
188             return
189
190         debug("%s.tickle: Have %d threads (can start %d); %d jobs queued"
191               % (self.__class__.__name__,
192                  len (self.threads), self.num_threads, len (self.queue)))
193         if len (self.threads) < self.num_threads:
194             for _ in range (min (len (self.queue),
195                                  self.num_threads - len (self.threads))):
196                 thread = JobRunner (self)
197                 # Setting threads as daemons means faster shutdown
198                 # when the main thread exists, but it results in
199                 # exceptions and occassional setfaults.
200                 # thread.setDaemon(True)
201                 self.threads.append (thread)
202                 thread.start ()
203                 debug("Now have %d threads" % len (self.threads))
204
205     @_lock
206     def execute(self, job, key=None, priority=default_priority):
207         """
208         Enqueue a job for execution.  job is a function to execute.
209         If key is not None, the job is only enqueued if there is no
210         job that is inprogress or enqueued with the same key.
211         priority is the job's priority.  Like nice(), a smaller
212         numeric priority corresponds to a higher priority class.  Jobs
213         are executed highest priority first, in the order that they
214         were added.
215         """
216         if self.do_quit:
217             debug("%s.quit called, not enqueuing new jobs."
218                   % self.__class__.__name__)
219
220         if key is not None:
221             if key in self.in_progress:
222                 return
223             for item in self.queue:
224                 if item[1] == key:
225                     if item[0][0] < priority:
226                         # Priority raised.
227                         item[0][0] = priority
228                         self.queue = heapq.heapify (self.queue)
229                     return
230
231         # To ensure that jobs with the same priority are executed
232         # in the order they are added, we set the priority to
233         # [priority, next (monotomic counter)].
234         self.jobs += 1
235         heapq.heappush (self.queue, [[priority, self.jobs], key, job])
236
237         if self.started:
238             self._stats_hooks_run ()
239             self.tickle ()
240         else:
241             debug("%s not initialized. delaying execution of %s (%s)"
242                   % (self.__class__.__name__, key, str (job),))
243
244     @_lock
245     def pause(self):
246         """
247         Increasement the pause count.  When the pause count is greater
248         than 0, job execution is suspended.
249         """
250         self.pause += 1
251
252         if self.pause == 1:
253             self._stats_hooks_run ()
254
255     @_lock
256     def resume(self):
257         """
258         Decrement the pause count.  If the pause count is greater than
259         0 and this decrement brings it to 0, enqueued jobs are
260         resumed.
261         """
262         assert self.pause > 0
263         self.pause -= 1
264         if not self.paused():
265             self._stats_hooks_run ()
266             self.tickle ()
267
268     @_lock
269     def paused(self):
270         """
271         Returns whether job execution is paused.
272         """
273         return self.pause > 0
274
275     @_lock
276     def cancel(self):
277         """
278         Cancel any pending jobs.
279         """
280         self.queue = []
281         self._stats_hooks_run ()
282
283     def quit(self):
284         self.cancel ()
285         self.do_quit = True
286
287     @_lock
288     def stats(self):
289         """
290         Return a dictionary consisting of:
291
292           - 'paused': whether execution is paused
293           - 'jobs': the total number of jobs this manager has
294               executed, is executing or are queued
295           - 'jobs-completed': the numer of jobs that have completed
296           - 'jobs-in-progress': the number of jobs in progress
297           - 'jobs-queued': the number of jobs currently queued
298         """
299         return {'paused': self.paused(),
300                 'jobs': self.jobs,
301                 'jobs-completed':
302                   self.jobs - len (self.in_progress) - len (self.queue),
303                 'jobs-in-progress': len (self.in_progress),
304                 'jobs-queued': len (self.queue)
305                 }
306
307     def stats_hook_register(self, func, *args, **kwargs):
308         """
309         Registers a function to be called when the job status changes.
310         Passed the following parameters:
311
312           - the JobManager instance.
313           - the previous stats (as returned by stats)
314           - the current stats
315           - the job that was completed (or None)
316
317         Note: the hook may not be run in the main thread!
318         """
319         mainthread=False
320         try:
321             mainthread = kwargs['run_in_main_thread']
322             del kwargs['run_in_main_thread']
323         except KeyError:
324             pass
325         self._stats_hooks.append ([func, mainthread, args, kwargs])
326
327     def _stats_hooks_run(self, completed_job=None):
328         """
329         Run the stats hooks.
330         """
331         # if not self._stats_hooks:
332         #     return
333
334         self.lock.acquire ()
335         try:
336             old_stats = self._current_stats
337             self._current_stats = self.stats ()
338             current_stats = self._current_stats
339         finally:
340             self.lock.release ()
341
342         debug("%s -> %s" % (str (old_stats), str (current_stats)))
343
344         for (f, run_in_main_thread, args, kwargs) in self._stats_hooks:
345             if run_in_main_thread:
346                 debug("JobManager._stats_hooks_run: Running %s in main thread"
347                       % f)
348                 mainthread.execute(
349                     f, self, old_stats, current_stats, completed_job,
350                     async=True, *args, **kwargs)
351             else:
352                 debug("JobManager._stats_hooks_run: Running %s in any thread"
353                       % f)
354                 f(self, old_stats, current_stats, completed_job,
355                   *args, **kwargs)