From 4b02f7be73c2ff0d831c273d6be416ccfe126a33 Mon Sep 17 00:00:00 2001 From: "Neal H. Walfield" Date: Sat, 23 Jul 2011 10:38:18 +0200 Subject: [PATCH] Move download management from frontends to rss_sqlite.py. - Move download logic to rss_sqlite.py: - Move thread management to jobmanager.py. - Queue jobs in rss_sqlite.py. - Manage per-feed DBus locks in rss_sqlite.py. - Get config settings in rss_sqlite.py on demand. - Make downloading more robust: - Only call non-thread-safe code (in particular, DBus and gconf functionality) from the main thread using mainthread.py. - Improve responsiveness of the frontend by yielding during CPU-intensive activities and by only allowing a single slave to parse a feed at a time. - When downloads are canceled, do our best to quit cleanly and quickly. - Update frontends to use the new functionality. - Remove redundant code, in particular, download functionality. - Rework FeedingIt.py's download bar. --- src/FeedingIt-Web.py | 40 +--- src/FeedingIt.py | 283 +++++++++++++++++----------- src/config.py | 2 + src/httpprogresshandler.py | 143 ++++++++++++++ src/jobmanager.py | 355 +++++++++++++++++++++++++++++++++++ src/mainthread.py | 157 ++++++++++++++++ src/rss_sqlite.py | 441 ++++++++++++++++++++++++++++++-------------- src/update_feeds.py | 124 +++++-------- src/updatedbus.py | 7 +- 9 files changed, 1191 insertions(+), 361 deletions(-) create mode 100644 src/httpprogresshandler.py create mode 100644 src/jobmanager.py create mode 100644 src/mainthread.py diff --git a/src/FeedingIt-Web.py b/src/FeedingIt-Web.py index 7f6b2a9..4fc6965 100644 --- a/src/FeedingIt-Web.py +++ b/src/FeedingIt-Web.py @@ -44,7 +44,7 @@ def sanitize(text): def start_server(): global listing - listing = Listing(CONFIGDIR) + listing = Listing(config, CONFIGDIR) httpd = BaseHTTPServer.HTTPServer(("127.0.0.1", PORT), Handler) httpd.serve_forever() @@ -69,37 +69,11 @@ class App(): # download = Download(listing, feeds) # download.start() -class Download(Thread): - def __init__(self, listing, keys): - Thread.__init__(self) - self.listing = listing - self.keys = keys - - def run (self): - updateDbusHandler.UpdateStarted() - for key in self.keys: - print "Start update: %s" % key - updatingFeeds.append(key) - (use_proxy, proxy) = config.getProxy() - try: - if use_proxy: - self.listing.updateFeed(key, proxy=proxy, imageCache=config.getImageCache() ) - else: - self.listing.updateFeed(key, imageCache=config.getImageCache() ) - except: - print "Error updating feed: %s" %key - updatingFeeds.remove(key) - print "End update: %s" % key - updateDbusHandler.UpdateFinished() - class Handler(BaseHTTPServer.BaseHTTPRequestHandler): def updateAll(self): - feeds = [] for cat in listing.getListOfCategories(): for feed in listing.getSortedListOfKeys("Manual", category=cat): - feeds.append(feed) - download = Download(listing, feeds) - download.start() + listing.updateFeed(feed) def openTaskSwitch(self): import subprocess @@ -304,9 +278,6 @@ import thread -# Start the HTTP server in a new thread -thread.start_new_thread(start_server, ()) - # Initialize the glib mainloop, for dbus purposes from feedingitdbus import ServerObject from updatedbus import UpdateServerObject, get_lock @@ -315,11 +286,18 @@ import gobject gobject.threads_init() import dbus.mainloop.glib dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) +import mainthread +mainthread.init() +from jobmanager import JobManager +JobManager(True) global updateDbusHandler, dbusHandler app = App() dbusHandler = ServerObject(app) updateDbusHandler = 
UpdateServerObject(app) +# Start the HTTP server in a new thread +thread.start_new_thread(start_server, ()) + mainloop = gobject.MainLoop() mainloop.run() diff --git a/src/FeedingIt.py b/src/FeedingIt.py index 9ba9469..f7a45b3 100644 --- a/src/FeedingIt.py +++ b/src/FeedingIt.py @@ -2,6 +2,7 @@ # # Copyright (c) 2007-2008 INdT. +# Copyright (c) 2011 Neal H. Walfield # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or @@ -44,11 +45,13 @@ from feedingitdbus import ServerObject from updatedbus import UpdateServerObject, get_lock from config import Config from cgi import escape +import weakref from rss_sqlite import Listing from opml import GetOpmlData, ExportOpmlData -from urllib2 import install_opener, build_opener +import mainthread +from jobmanager import JobManager from socket import setdefaulttimeout timeout = 5 @@ -270,90 +273,95 @@ class AddCategoryWizard(gtk.Dialog): def getData(self): return self.nameEntry.get_text() -class Download(Thread): - def __init__(self, listing, key, config): - Thread.__init__(self) - self.listing = listing - self.key = key - self.config = config - - def run (self): - (use_proxy, proxy) = self.config.getProxy() - key_lock = get_lock(self.key) - if key_lock != None: - if use_proxy: - self.listing.updateFeed(self.key, self.config.getExpiry(), proxy=proxy, imageCache=self.config.getImageCache() ) - else: - self.listing.updateFeed(self.key, self.config.getExpiry(), imageCache=self.config.getImageCache() ) - del key_lock - - class DownloadBar(gtk.ProgressBar): - def __init__(self, parent, listing, listOfKeys, config, single=False): - - update_lock = get_lock("update_lock") - if update_lock != None: - gtk.ProgressBar.__init__(self) - self.listOfKeys = listOfKeys[:] - self.listing = listing - self.total = len(self.listOfKeys) - self.config = config - self.current = 0 - self.single = single - (use_proxy, proxy) = self.config.getProxy() - if use_proxy: - opener = build_opener(proxy) - else: - opener = build_opener() - - opener.addheaders = [('User-agent', USER_AGENT)] - install_opener(opener) - - if self.total>0: - # In preparation for i18n/l10n - def N_(a, b, n): - return (a if n == 1 else b) - - self.set_text(N_('Updating %d feed', 'Updating %d feeds', self.total) % self.total) - - self.fraction = 0 - self.set_fraction(self.fraction) - self.show_all() - # Create a timeout - self.timeout_handler_id = gobject.timeout_add(50, self.update_progress_bar) - - def update_progress_bar(self): - #self.progress_bar.pulse() - if activeCount() < 4: - x = activeCount() - 1 - k = len(self.listOfKeys) - fin = self.total - k - x - fraction = float(fin)/float(self.total) + float(x)/(self.total*2.) - #print x, k, fin, fraction - self.set_fraction(fraction) - - if len(self.listOfKeys)>0: - self.current = self.current+1 - key = self.listOfKeys.pop() - #if self.single == True: - # Check if the feed is being displayed - download = Download(self.listing, key, self.config) - download.start() - return True - elif activeCount() > 1: - return True + @classmethod + def class_init(cls): + if hasattr (cls, 'class_init_done'): + return + + jm = JobManager () + jm.stats_hook_register (cls.update_progress, + run_in_main_thread=True) + + cls.downloadbars = [] + # Total number of jobs we are monitoring. + cls.total = 0 + # Number of jobs complete (of those that we are monitoring). + cls.done = 0 + # Percent complete. 
+ cls.progress = 0 + + cls.class_init_done = True + + def __init__(self, parent): + self.class_init () + + gtk.ProgressBar.__init__(self) + + self.downloadbars.append(weakref.ref (self)) + self.set_fraction(0) + self.__class__.update_bars() + self.show_all() + + @classmethod + def downloading(cls): + return hasattr (cls, 'jobs_at_start') + + @classmethod + def update_progress(cls, jm, old_stats, new_stats, updated_feed): + if not cls.downloading(): + cls.jobs_at_start = old_stats['jobs-completed'] + + if not cls.downloadbars: + return + + if new_stats['jobs-in-progress'] + new_stats['jobs-queued'] == 0: + del cls.jobs_at_start + # Iterate over a copy: the loop removes dead references. + for ref in cls.downloadbars[:]: + bar = ref () + if bar is None: + # The download bar disappeared. + cls.downloadbars.remove (ref) + else: + bar.emit("download-done", None) + return + + # This should never be called if new_stats['jobs'] is 0, but + # just in case... + cls.total = max (1, new_stats['jobs'] - cls.jobs_at_start) + cls.done = new_stats['jobs-completed'] - cls.jobs_at_start + cls.progress = 1 - (new_stats['jobs-in-progress'] / 2. + + new_stats['jobs-queued']) / cls.total + cls.update_bars() + + if updated_feed: + for ref in cls.downloadbars[:]: + bar = ref () + if bar is None: + # The download bar disappeared. + cls.downloadbars.remove (ref) + else: + bar.emit("download-done", updated_feed) + + @classmethod + def update_bars(cls): + # In preparation for i18n/l10n + def N_(a, b, n): + return (a if n == 1 else b) + + text = (N_('Updated %d of %d feeds', 'Updated %d of %d feeds', + cls.total) + % (cls.done, cls.total)) + + for ref in cls.downloadbars[:]: + bar = ref () + if bar is None: + # The download bar disappeared. + cls.downloadbars.remove (ref) else: - #self.waitingWindow.destroy() - #self.destroy() - try: - del self.update_lock - except: - pass - self.emit("download-done", "success") - return False - return True - - + bar.set_text(text) + bar.set_fraction(cls.progress) class SortList(hildon.StackableWindow): def __init__(self, parent, listing, feedingit, after_closing, category=None): hildon.StackableWindow.__init__(self) @@ -741,7 +749,14 @@ class DisplayFeed(hildon.StackableWindow): self.set_app_menu(menu) menu.show_all() + self.main_vbox = gtk.VBox(False, 0) + self.add(self.main_vbox) + + self.pannableFeed = None self.displayFeed() + + if DownloadBar.downloading (): + self.show_download_bar () self.connect('configure-event', self.on_configure_event) self.connect("destroy", self.destroyWindow) @@ -771,6 +786,9 @@ class DisplayFeed(hildon.StackableWindow): return escape(unescape(title).replace("","").replace("","").replace("","").replace("","").replace("","")) def displayFeed(self): + if self.pannableFeed: + self.pannableFeed.destroy() + self.pannableFeed = hildon.PannableArea() self.pannableFeed.set_property('hscrollbar-policy', gtk.POLICY_NEVER) @@ -842,7 +860,7 @@ class DisplayFeed(hildon.StackableWindow): markup = ENTRY_TEMPLATE % (self.config.getFontSize(), "No Articles To Display") self.feedItems.append((markup, "")) - self.add(self.pannableFeed) + self.main_vbox.pack_start(self.pannableFeed) self.show_all() def clear(self): @@ -935,22 +953,24 @@ class DisplayFeed(hildon.StackableWindow): self.displayFeed() def button_update_clicked(self, button): - #bar = DownloadBar(self, self.listing, [self.key,], self.config ) + self.listing.updateFeed (self.key, priority=-1) + + def show_download_bar(self): if not type(self.downloadDialog).__name__=="DownloadBar": - self.pannableFeed.destroy() - self.vbox = gtk.VBox(False, 10) - self.downloadDialog = 
DownloadBar(self.window, self.listing, [self.key,], self.config, single=True ) - self.downloadDialog.connect("download-done", self.onDownloadsDone) - self.vbox.pack_start(self.downloadDialog, expand=False, fill=False) - self.add(self.vbox) + self.downloadDialog = DownloadBar(self.window) + self.downloadDialog.connect("download-done", self.onDownloadDone) + self.main_vbox.pack_end(self.downloadDialog, + expand=False, fill=False) self.show_all() - - def onDownloadsDone(self, *widget): - self.vbox.destroy() - self.feed = self.listing.getFeed(self.key) - self.displayFeed() - self.updateDbusHandler.ArticleCountUpdated() + def onDownloadDone(self, widget, feed): + if feed == self.feed or feed is None: + self.downloadDialog.destroy() + self.downloadDialog = False + self.feed = self.listing.getFeed(self.key) + self.displayFeed() + self.updateDbusHandler.ArticleCountUpdated() + def buttonReadAllClicked(self, button): #self.clear() self.feed.markAllAsRead() @@ -1007,8 +1027,8 @@ class FeedingIt: self.stopButton.destroy() except: pass - self.listing = Listing(CONFIGDIR) - + self.listing = Listing(self.config, CONFIGDIR) + self.downloadDialog = False try: self.orientation = FremantleRotation(__appname__, main_window=self.window, app=self) @@ -1080,8 +1100,29 @@ class FeedingIt: self.checkAutoUpdate() hildon.hildon_gtk_window_set_progress_indicator(self.window, 0) - gobject.idle_add(self.enableDbus) + gobject.idle_add(self.late_init) + def job_manager_update(self, jm, old_stats, new_stats, updated_feed): + if (not self.downloadDialog + and new_stats['jobs-in-progress'] + new_stats['jobs-queued'] > 0): + self.updateDbusHandler.UpdateStarted() + + self.downloadDialog = DownloadBar(self.window) + self.downloadDialog.connect("download-done", self.onDownloadDone) + self.mainVbox.pack_end(self.downloadDialog, expand=False, fill=False) + self.mainVbox.show_all() + + if self.__dict__.get ('disp', None): + self.disp.show_download_bar () + + def onDownloadDone(self, widget, feed): + if feed is None: + self.downloadDialog.destroy() + self.downloadDialog = False + self.displayListing() + self.updateDbusHandler.UpdateFinished() + self.updateDbusHandler.ArticleCountUpdated() + def stop_running_update(self, button): self.stopButton.set_sensitive(False) import dbus @@ -1092,10 +1133,15 @@ class FeedingIt: iface = dbus.Interface(remote_object, 'org.marcoz.feedingit') iface.StopUpdate() - def enableDbus(self): + def late_init(self): self.dbusHandler = ServerObject(self) self.updateDbusHandler = UpdateServerObject(self) + jm = JobManager() + jm.stats_hook_register (self.job_manager_update, + run_in_main_thread=True) + JobManager(True) + def button_markAll(self, button): for key in self.listing.getListOfFeeds(): feed = self.listing.getFeed(key) @@ -1142,12 +1188,8 @@ class FeedingIt: SortList(self.window, self.listing, self, after_closing) def button_update_clicked(self, button, key): - if not type(self.downloadDialog).__name__=="DownloadBar": - self.updateDbusHandler.UpdateStarted() - self.downloadDialog = DownloadBar(self.window, self.listing, self.listing.getListOfFeeds(), self.config ) - self.downloadDialog.connect("download-done", self.onDownloadsDone) - self.mainVbox.pack_end(self.downloadDialog, expand=False, fill=False) - self.mainVbox.show_all() + for k in self.listing.getListOfFeeds(): + self.listing.updateFeed (k) #self.displayListing() def onDownloadsDone(self, *widget): @@ -1309,11 +1351,28 @@ class FeedingIt: def onFeedClosedTimeout(self): del self.feed_lock self.updateDbusHandler.ArticleCountUpdated() - + + 
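[Note: the DownloadBar above no longer drives downloads itself; it is a passive view of JobManager statistics. A minimal sketch of the same pattern, using the jobmanager module from this patch; ProgressWidget is a hypothetical stand-in for the GTK code:

    from jobmanager import JobManager

    class ProgressWidget(object):
        def __init__(self):
            # run_in_main_thread=True: the hook touches widget state,
            # which is only safe from the GTK main thread.
            JobManager().stats_hook_register(self.on_stats,
                                             run_in_main_thread=True)
            self.jobs_at_start = None

        def on_stats(self, jm, old_stats, new_stats, updated_feed):
            if self.jobs_at_start is None:
                self.jobs_at_start = old_stats['jobs-completed']
            # Queued jobs count as not started, in-progress jobs as half
            # done -- the same formula DownloadBar.update_progress uses.
            total = max(1, new_stats['jobs'] - self.jobs_at_start)
            fraction = 1 - (new_stats['jobs-in-progress'] / 2.
                            + new_stats['jobs-queued']) / total
            print "%d%% complete" % (100 * fraction,)
]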
def quit(self, *args): + self.window.hide() + + if hasattr (self, 'app_lock'): + del self.app_lock + + # Wait until all slave threads have properly exited before + # terminating the mainloop. + jm = JobManager() + jm.quit () + stats = jm.stats() + if stats['jobs-in-progress'] == 0 and stats['jobs-queued'] == 0: + gtk.main_quit () + else: + gobject.timeout_add(500, self.quit) + + return False + def run(self): - self.window.connect("destroy", gtk.main_quit) + self.window.connect("destroy", self.quit) gtk.main() - del self.app_lock def prefsClosed(self, *widget): try: @@ -1353,7 +1412,7 @@ class FeedingIt: def stopUpdate(self): # Not implemented in the app (see update_feeds.py) try: - self.downloadDialog.listOfKeys = [] + JobManager().cancel () except: pass @@ -1367,6 +1426,8 @@ class FeedingIt: return status if __name__ == "__main__": + mainthread.init () + gobject.signal_new("feed-closed", DisplayFeed, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)) gobject.signal_new("article-closed", DisplayArticle, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)) gobject.signal_new("article-deleted", DisplayArticle, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,)) diff --git a/src/config.py b/src/config.py index 8aa45ce..65fbddd 100644 --- a/src/config.py +++ b/src/config.py @@ -32,6 +32,7 @@ from ConfigParser import RawConfigParser from gconf import client_get_default from urllib2 import ProxyHandler +from mainthread import mainthread VERSION = "52" @@ -280,6 +281,7 @@ class Config(): return ranges["orientation"].index(self.config["orientation"]) def getImageCache(self): return self.config["imageCache"] + @mainthread def getProxy(self): if self.config["proxy"] == False: return (False, None) diff --git a/src/httpprogresshandler.py b/src/httpprogresshandler.py new file mode 100644 index 0000000..78eb39c --- /dev/null +++ b/src/httpprogresshandler.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python2.5 + +# Copyright (c) 2011 Neal H. Walfield +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import urllib2 +import httplib +import time + +class ProgressSocket(object): + """ + Monitor what is being sent and received. + """ + def __init__(self, socket, connection): + self.socket = socket + self.connection = connection + + def __getattribute__(self, attr): + # print "%s.__getattribute__(%s)" % (self.__class__.__name__, attr) + + def send(data): + # 100k at a time. 
+ bs = 100 * 1024 + sent = 0 + while sent < len (data): + remaining = len (data) - sent + if remaining < bs: + amount = remaining + else: + amount = bs + + self.socket.sendall(data[sent:sent+amount]) + sent += amount + self.connection.stats['sent'] += amount + self.connection.opener.stats['sent'] += amount + + if self.connection.callback is not None: + self.connection.callback () + + def read(*args, **kwargs): + data = self.socket.read (*args, **kwargs) + # print "GOT: %s" % (data[0:240],) + self.connection.stats['received'] += len (data) + self.connection.opener.stats['received'] += len (data) + if self.connection.callback is not None: + self.connection.callback () + return data + + if attr == 'send' or attr == 'sendall': + return send + if attr == 'read': + return read + + try: + return super (ProgressSocket, self).__getattribute__(attr) + except AttributeError: + socket = super (ProgressSocket, self).__getattribute__('socket') + return socket.__getattribute__(attr) + + def makefile(self, mode, bufsize): + return ProgressSocket (socket=self.socket.makefile(mode, bufsize), + connection=self.connection) + + def close(self): + return self.socket.close () + +def HTTPProgressConnectionBuilder(callback, opener): + class HTTPProgressConnection(httplib.HTTPConnection): + def __init__(self, *args, **kwargs): + self.method = None + self.url = None + return httplib.HTTPConnection.__init__ (self, *args, **kwargs) + + def putrequest(self, method, url, *args, **kwargs): + self.method = method + self.url = url + return httplib.HTTPConnection.putrequest ( + self, method, url, *args, **kwargs) + + def connect(self): + httplib.HTTPConnection.connect(self) + # Wrap the socket. + self.sock = ProgressSocket(socket=self.sock, + connection=self) + + HTTPProgressConnection.callback = callback + HTTPProgressConnection.opener = opener + HTTPProgressConnection.stats \ + = {'sent': 0, 'received': 0, 'started':time.time()} + return HTTPProgressConnection + +class HTTPProgressHandler(urllib2.HTTPHandler): + def __init__(self, callback): + self.callback = callback + self.stats = {'sent': 0, 'received': 0, 'started':time.time()} + return urllib2.HTTPHandler.__init__(self) + + def http_open(self, request): + return self.do_open( + HTTPProgressConnectionBuilder(self.callback, self), + request) + +if __name__ == '__main__': + def callback(connection): + req = "" + if connection.method: + req += connection.method + " " + req += connection.host + ':' + str (connection.port) + if connection.url: + req += connection.url + + cstats = connection.stats + ostats = connection.opener.stats + + print (("%s: connection: %d sent, %d received: %d kb/s; " + + "opener: %d sent, %d received, %d kb/s") + % (req, + cstats['sent'], cstats['received'], + ((cstats['sent'] + cstats['received']) + / (time.time() - cstats['started']) / 1024), + ostats['sent'], ostats['received'], + ((ostats['sent'] + ostats['received']) + / (time.time() - ostats['started']) / 1024))) + + opener = urllib2.build_opener(HTTPProgressHandler(callback)) + + data = opener.open ('http://google.com') + downloaded = 0 + for d in data: + downloaded += len (d) + print "Document is %d bytes in size" % (downloaded,) diff --git a/src/jobmanager.py b/src/jobmanager.py new file mode 100644 index 0000000..8ca4df8 --- /dev/null +++ b/src/jobmanager.py @@ -0,0 +1,355 @@ +#!/usr/bin/env python2.5 + +# Copyright (c) 2011 Neal H. 
Walfield + # + # This program is free software: you can redistribute it and/or modify + # it under the terms of the GNU General Public License as published by + # the Free Software Foundation, either version 3 of the License, or + # (at your option) any later version. + # + # This program is distributed in the hope that it will be useful, + # but WITHOUT ANY WARRANTY; without even the implied warranty of + # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + # GNU General Public License for more details. + # + # You should have received a copy of the GNU General Public License + # along with this program. If not, see . + +import threading +import thread +import traceback +import heapq +import sys +import mainthread + +def debug(*args): + if False: + sys.stdout.write(*args) + sys.stdout.write("\n") + +# The default priority. Like nice(), a smaller numeric priority +# corresponds to a higher priority class. +default_priority = 0 + +class JobRunner(threading.Thread): + def __init__(self, job_manager): + threading.Thread.__init__(self) + self.job_manager = job_manager + + def run (self): + have_lock = True + self.job_manager.lock.acquire () + try: + while self.job_manager.pause_count == 0 and not self.job_manager.do_quit: + try: + _, key, job = heapq.heappop (self.job_manager.queue) + except IndexError: + return + + try: + self.job_manager.in_progress.append (key) + self.job_manager.lock.release () + have_lock = False + + # Execute the job. + try: + job () + except KeyboardInterrupt: + # This is handled below and doesn't require a + # traceback. + raise + except: + print ("Executing job %s (%s) from thread %s: %s" + % (str (key), str (job), + threading.currentThread(), + traceback.format_exc ())) + + self.job_manager.lock.acquire () + have_lock = True + + assert key in self.job_manager.in_progress + finally: + try: + self.job_manager.in_progress.remove (key) + except ValueError: + pass + + debug("Finished executing job %s (%s)" % (key, job,)) + + self.job_manager._stats_hooks_run ({'job':job, 'key':key}) + except KeyboardInterrupt: + debug("%s: KeyboardInterrupt" % threading.currentThread()) + thread.interrupt_main() + debug("%s: Forwarded KeyboardInterrupt to main thread" + % threading.currentThread()) + finally: + if have_lock: + self.job_manager.lock.release () + + assert self in self.job_manager.threads + self.job_manager.threads.remove (self) + + debug ("Job runner %s (%d left) exiting." + % (threading.currentThread(), + len (self.job_manager.threads))) + +_jm = None +def JobManager(start=False): + """ + Return the job manager instance. The job manager will not start + executing jobs until this is called with start set to True. Note: + you can still queue jobs. + """ + global _jm + if _jm is None: + _jm = _JobManager () + if start and not _jm.started: + _jm.started = True + if _jm.jobs > 0: + _jm._stats_hooks_run () + _jm.tickle () + + return _jm + +class _JobManager(object): + def __init__(self, started=False, num_threads=4): + """ + Initialize the job manager. + + If started is false, jobs may be queued, but jobs will not be + started until start() is called. + """ + # A reentrant lock so that a job runner can call stats() without + # dropping the lock. + self.lock = threading.RLock() + + # If we can start executing jobs. + self.started = started + + # The maximum number of threads to use for executing jobs. + self.num_threads = num_threads + + # List of jobs (priority, key, job) that are queued for + # execution. + self.queue = [] + # List of keys of the jobs that are being executed. 
+ self.in_progress = [] + # List of threads. + self.threads = [] + + # If 0, jobs may execute, otherwise, job execution is paused. + self.pause_count = 0 + + # The total number of jobs that this manager ever executed. + self.jobs = 0 + + # A list of status hooks to execute when the stats change. + self._stats_hooks = [] + self._current_stats = self.stats () + + self.do_quit = False + + def _lock(f): + def wrapper(*args, **kwargs): + self = args[0] + self.lock.acquire () + try: + return f(*args, **kwargs) + finally: + self.lock.release() + return wrapper + + @_lock + def start(self): + """ + Start executing jobs. + """ + if self.started: + return + self.started = True + if self.jobs > 0: + self._stats_hooks_run () + self.tickle () + + @_lock + def tickle(self): + """ + Ensure that there are enough job runners for the number of + pending jobs. + """ + if self.do_quit: + debug("%s.quit called, not creating new threads." + % self.__class__.__name__) + return + + if self.pause_count > 0: + # Job execution is paused. Don't start any new threads. + debug("%s.tickle(): Not doing anything: paused" + % (self.__class__.__name__)) + return + + debug("%s.tickle: Have %d threads (can start %d); %d jobs queued" + % (self.__class__.__name__, + len (self.threads), self.num_threads, len (self.queue))) + if len (self.threads) < self.num_threads: + for _ in range (min (len (self.queue), + self.num_threads - len (self.threads))): + thread = JobRunner (self) + # Setting threads as daemons means faster shutdown + # when the main thread exits, but it results in + # exceptions and occasional segfaults. + # thread.setDaemon(True) + self.threads.append (thread) + thread.start () + debug("Now have %d threads" % len (self.threads)) + + @_lock + def execute(self, job, key=None, priority=default_priority): + """ + Enqueue a job for execution. job is a function to execute. + If key is not None, the job is only enqueued if there is no + job that is in progress or enqueued with the same key. + priority is the job's priority. Like nice(), a smaller + numeric priority corresponds to a higher priority class. Jobs + are executed highest priority first, in the order that they + were added. + """ + if self.do_quit: + debug("%s.quit called, not enqueuing new jobs." + % self.__class__.__name__) + return + + if key is not None: + if key in self.in_progress: + return + for item in self.queue: + if item[1] == key: + if priority < item[0][0]: + # Priority raised. + item[0][0] = priority + heapq.heapify (self.queue) + return + + # To ensure that jobs with the same priority are executed + # in the order they are added, we set the priority to + # [priority, next (monotonic counter)]. + self.jobs += 1 + heapq.heappush (self.queue, [[priority, self.jobs], key, job]) + + if self.started: + self._stats_hooks_run () + self.tickle () + else: + debug("%s not initialized. delaying execution of %s (%s)" + % (self.__class__.__name__, key, str (job),)) + + @_lock + def pause(self): + """ + Increment the pause count. When the pause count is greater + than 0, job execution is suspended. + """ + self.pause_count += 1 + + if self.pause_count == 1: + self._stats_hooks_run () + + @_lock + def resume(self): + """ + Decrement the pause count. If the pause count is greater than + 0 and this decrement brings it to 0, enqueued jobs are + resumed. + """ + assert self.pause_count > 0 + self.pause_count -= 1 + if not self.paused(): + self._stats_hooks_run () + self.tickle () + + @_lock + def paused(self): + """ + Returns whether job execution is paused. 
+ """ + return self.pause > 0 + + @_lock + def cancel(self): + """ + Cancel any pending jobs. + """ + self.queue = [] + self._stats_hooks_run () + + def quit(self): + self.cancel () + self.do_quit = True + + @_lock + def stats(self): + """ + Return a dictionary consisting of: + + - 'paused': whether execution is paused + - 'jobs': the total number of jobs this manager has + executed, is executing or are queued + - 'jobs-completed': the numer of jobs that have completed + - 'jobs-in-progress': the number of jobs in progress + - 'jobs-queued': the number of jobs currently queued + """ + return {'paused': self.paused(), + 'jobs': self.jobs, + 'jobs-completed': + self.jobs - len (self.in_progress) - len (self.queue), + 'jobs-in-progress': len (self.in_progress), + 'jobs-queued': len (self.queue) + } + + def stats_hook_register(self, func, *args, **kwargs): + """ + Registers a function to be called when the job status changes. + Passed the following parameters: + + - the JobManager instance. + - the previous stats (as returned by stats) + - the current stats + - the job that was completed (or None) + + Note: the hook may not be run in the main thread! + """ + mainthread=False + try: + mainthread = kwargs['run_in_main_thread'] + del kwargs['run_in_main_thread'] + except KeyError: + pass + self._stats_hooks.append ([func, mainthread, args, kwargs]) + + def _stats_hooks_run(self, completed_job=None): + """ + Run the stats hooks. + """ + # if not self._stats_hooks: + # return + + self.lock.acquire () + try: + old_stats = self._current_stats + self._current_stats = self.stats () + current_stats = self._current_stats + finally: + self.lock.release () + + debug("%s -> %s" % (str (old_stats), str (current_stats))) + + for (f, run_in_main_thread, args, kwargs) in self._stats_hooks: + if run_in_main_thread: + debug("JobManager._stats_hooks_run: Running %s in main thread" + % f) + mainthread.execute( + f, self, old_stats, current_stats, completed_job, + async=True, *args, **kwargs) + else: + debug("JobManager._stats_hooks_run: Running %s in any thread" + % f) + f(self, old_stats, current_stats, completed_job, + *args, **kwargs) diff --git a/src/mainthread.py b/src/mainthread.py new file mode 100644 index 0000000..a85cf5a --- /dev/null +++ b/src/mainthread.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python2.5 + +# Copyright (c) 2011 Neal H. Walfield +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import threading +import traceback + +_run_in_main_thread = None +_main_thread = None + +def init(run_in_main_thread=None): + """ + run_in_main_thread is a function that takes a single argument, a + callable and returns False. run_in_main_thread should run the + function in the main thread. + + If you are using glib, gobject.idle_add (the default) is + sufficient. (gobject.idle_add is thread-safe.) 
+ """ + if run_in_main_thread is None: + import gobject + run_in_main_thread = gobject.idle_add + + global _run_in_main_thread + assert _run_in_main_thread is None + _run_in_main_thread = run_in_main_thread + + global _main_thread + _main_thread = threading.currentThread () + +def execute(func, *args, **kwargs): + """ + Execute FUNC in the main thread. + + If kwargs['async'] exists and is True, the function is executed + asynchronously (i.e., the thread does not wait for the function to + return in which case the function's return value is discarded). + Otherwise, this function waits until the function is executed and + returns its return value. + """ + async = False + try: + async = kwargs['async'] + del kwargs['async'] + except KeyError: + pass + + if threading.currentThread() == _main_thread: + if async: + try: + func (*args, **kwargs) + except: + print ("mainthread.execute: Executing %s: %s" + % (func, traceback.format_exc ())) + return + else: + return func (*args, **kwargs) + + assert _run_in_main_thread is not None, \ + "You can't call this function from a non-main thread until you've called init()" + + if not async: + cond = threading.Condition() + + result = {} + result['done'] = False + + def doit(): + def it(): + # Execute the function. + assert threading.currentThread() == _main_thread + + try: + result['result'] = func (*args, **kwargs) + except: + print ("mainthread.execute: Executing %s: %s" + % (func, traceback.format_exc ())) + + if not async: + cond.acquire () + result['done'] = True + if not async: + cond.notify () + cond.release () + + return False + return it + + if not async: + cond.acquire () + _run_in_main_thread (doit()) + + if async: + # Don't wait for the method to complete execution. + return + + # Wait for the result to become available. 
+ while not result['done']: + cond.wait () + + return result.get ('result', None) + +if __name__ == "__main__": + import sys + import gobject + + init() + + def in_main_thread(test_num): + assert threading.currentThread() == _main_thread, \ + "Test %d failed" % (test_num,) + return test_num + + mainloop = gobject.MainLoop() + gobject.threads_init() + + assert execute (in_main_thread, 1) == 1 + assert (execute (in_main_thread, 2, async=False) == 2) + execute (in_main_thread, 3, async=True) + + class T(threading.Thread): + def __init__(self): + threading.Thread.__init__(self) + + def run(self): + assert threading.currentThread() != _main_thread + + assert execute (in_main_thread, 4) == 4 + assert (execute (in_main_thread, 5, async=False) == 5) + execute (in_main_thread, 6, async=True) + execute (mainloop.quit, async=False) + + def start_thread(): + t = T() + t.start() + return False + + gobject.idle_add (start_thread) + mainloop.run() + +def mainthread(f): + def wrapper(*args, **kwargs): + return execute (f, *args, **kwargs) + return wrapper diff --git a/src/rss_sqlite.py b/src/rss_sqlite.py index 64aef0a..e8b44a2 100644 --- a/src/rss_sqlite.py +++ b/src/rss_sqlite.py @@ -28,6 +28,8 @@ import sqlite3 from os.path import isfile, isdir from shutil import rmtree from os import mkdir, remove, utime +import os +import sys import md5 import feedparser import time @@ -35,12 +36,36 @@ import urllib2 from BeautifulSoup import BeautifulSoup from urlparse import urljoin from calendar import timegm +from updatedbus import get_lock, release_lock import threading +import traceback +from jobmanager import JobManager +import mainthread +from httpprogresshandler import HTTPProgressHandler def getId(string): return md5.new(string).hexdigest() +def download_callback(connection): + if JobManager().do_quit: + raise KeyboardInterrupt + +def downloader(progress_handler=None, proxy=None): + openers = [] + + if progress_handler: + openers.append (progress_handler) + else: + openers.append(HTTPProgressHandler(download_callback)) + + if proxy: + openers.append (proxy) + + return urllib2.build_opener (*openers) + class Feed: + serial_execution_lock = threading.Lock() + def _getdb(self): try: db = self.tls.db @@ -63,17 +88,36 @@ class Feed: self.db.execute("CREATE TABLE images (id text, imagePath text);") self.db.commit() - def addImage(self, configdir, key, baseurl, url): + def addImage(self, configdir, key, baseurl, url, proxy=None, opener=None): filename = configdir+key+".d/"+getId(url) if not isfile(filename): try: - f = urllib2.urlopen(urljoin(baseurl,url)) + if not opener: + opener = downloader(proxy=proxy) + + abs_url = urljoin(baseurl,url) + f = opener.open(abs_url) outf = open(filename, "w") outf.write(f.read()) f.close() outf.close() + except (urllib2.HTTPError, urllib2.URLError), exception: + print ("Could not download image %s: %s" + % (abs_url, str (exception))) + return None except: - print "Could not download " + url + exception = sys.exc_info()[0] + + print "Error downloading image: %s" % abs_url + import traceback + traceback.print_exc() + + try: + remove(filename) + except OSError: + pass + + raise exception else: #open(filename,"a").close() # "Touch" the file file = open(filename,"a") @@ -81,147 +125,246 @@ utime(filename, None) file.close() return filename - def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False): - # Expiry time is in hours - if proxy == None: - tmp=feedparser.parse(url, etag = etag, modified = modified) - else: - tmp=feedparser.parse(url, etag = etag, modified = 
modified, handlers = [proxy]) - expiry = float(expiryTime) * 3600. + def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, priority=0, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs): + def doit(): + def it(): + self._updateFeed(configdir, url, etag, modified, expiryTime, proxy, imageCache, postFeedUpdateFunc, *postFeedUpdateFuncArgs) + return it + JobManager().execute(doit(), self.key, priority=priority) - currentTime = 0 - # Check if the parse was succesful (number of entries > 0, else do nothing) - if len(tmp["entries"])>0: - currentTime = time.time() - # The etag and modified value should only be updated if the content was not null - try: - etag = tmp["etag"] - except KeyError: - etag = None - try: - modified = tmp["modified"] - except KeyError: - modified = None - try: - f = urllib2.urlopen(urljoin(tmp["feed"]["link"],"/favicon.ico")) - data = f.read() - f.close() - outf = open(self.dir+"/favicon.ico", "w") - outf.write(data) - outf.close() - del data - except: - #import traceback - #traceback.print_exc() - pass + def _updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs): + success = False + have_serial_execution_lock = False + try: + update_lock = None + update_lock = get_lock(self.key) + if not update_lock: + # Someone else is doing an update. + return + + download_start = time.time () + progress_handler = HTTPProgressHandler(download_callback) - #reversedEntries = self.getEntries() - #reversedEntries.reverse() + openers = [progress_handler] + if proxy: + openers.append (proxy) + kwargs = {'handlers':openers} + + tmp=feedparser.parse(url, etag=etag, modified=modified, **kwargs) + download_duration = time.time () - download_start + + opener = downloader(progress_handler, proxy) - ids = self.getIds() + if JobManager().do_quit: + raise KeyboardInterrupt - tmp["entries"].reverse() - for entry in tmp["entries"]: - date = self.extractDate(entry) + process_start = time.time() + + # Expiry time is in hours + expiry = float(expiryTime) * 3600. + + currentTime = 0 + http_status = tmp.get ('status', 200) + + # Check if the parse was successful. If the http status code + # is 304, then the download was successful, but there is + # nothing new. Indeed, no content is returned. This makes a + # 304 look like an error because there are no entries and the + # parse fails. But really, everything went great! Check for + # this first. + if http_status == 304: + success = True + elif len(tmp["entries"])==0 and not tmp.version: + # An error occurred fetching or parsing the feed. (Version + # will be either None if e.g. 
the connection timed out or + # '' if the data is not a proper feed) + print ("Error fetching %s: version is: %s: error: %s" + % (url, str (tmp.version), + str (tmp.get ('bozo_exception', 'Unknown error')))) + print tmp + else: + currentTime = time.time() + # The etag and modified value should only be updated if the content was not null try: + etag = tmp["etag"] + except KeyError: + etag = None try: + modified = tmp["modified"] + except KeyError: + modified = None try: 0: + self.serial_execution_lock.release () + have_serial_execution_lock = False + for img in images: + filename = self.addImage(configdir, self.key, baseurl, img['src'], proxy=proxy) + if filename: + img['src']="file://%s" %filename + count = self.db.execute("SELECT count(1) FROM images where id=? and imagePath=?;", (id, filename )).fetchone()[0] + if count == 0: + self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) ) + self.db.commit() + + try: + object_size += os.path.getsize (filename) + except os.error, exception: + print ("Error getting size of %s: %s" + % (filename, exception)) + pass + self.serial_execution_lock.acquire () + have_serial_execution_lock = True + + tmpEntry["contentLink"] = configdir+self.key+".d/"+id+".html" + file = open(tmpEntry["contentLink"], "w") + file.write(soup.prettify()) + file.close() + if id in ids: + self.db.execute("UPDATE feed SET updated=? 
WHERE id=?;", (currentTime, id) ) + self.db.commit() + else: + values = (id, tmpEntry["title"], tmpEntry["contentLink"], tmpEntry["date"], currentTime, tmpEntry["link"], 0) + self.db.execute("INSERT INTO feed (id, title, contentLink, date, updated, link, read) VALUES (?, ?, ?, ?, ?, ?, ?);", values) + self.db.commit() +# else: +# try: +# self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) ) +# self.db.commit() +# filename = configdir+self.key+".d/"+id+".html" +# file = open(filename,"a") +# utime(filename, None) +# file.close() +# images = self.db.execute("SELECT imagePath FROM images where id=?;", (id, )).fetchall() +# for image in images: +# file = open(image[0],"a") +# utime(image[0], None) +# file.close() +# except: +# pass + - from glob import glob - from os import stat - for file in glob(configdir+self.key+".d/*"): - # - stats = stat(file) - # - # put the two dates into matching format - # - lastmodDate = stats[8] - # - expDate = time.time()-expiry*3 - # check if image-last-modified-date is outdated - # - if expDate > lastmodDate: + self.db.commit() + + success = True + + rows = self.db.execute("SELECT id FROM feed WHERE (read=0 AND updated lastmodDate: # - print 'Could not remove', file - updateTime = 0 - rows = self.db.execute("SELECT MAX(date) FROM feed;") - for row in rows: - updateTime=row[0] - return (updateTime, etag, modified) - + try: + # + #print 'Removing', file + # + remove(file) # commented out for testing + # + except OSError, exception: + # + print 'Could not remove %s: %s' % (file, str (exception)) + print ("updated %s: %fs in download, %fs in processing" + % (self.key, download_duration, + time.time () - process_start)) + finally: + self.db.commit () + + if have_serial_execution_lock: + self.serial_execution_lock.release () + + if update_lock is not None: + release_lock (update_lock) + + updateTime = 0 + try: + rows = self.db.execute("SELECT MAX(date) FROM feed;") + for row in rows: + updateTime=row[0] + except: + print "Fetching update time." 
+ import traceback + traceback.print_exc() + finally: + if not success: + etag = None + modified = None + if postFeedUpdateFunc is not None: + postFeedUpdateFunc (self.key, updateTime, etag, modified, + *postFeedUpdateFuncArgs) + def setEntryRead(self, id): self.db.execute("UPDATE feed SET read=1 WHERE id=?;", (id,) ) self.db.commit() @@ -382,9 +525,10 @@ class ArchivedArticles(Feed): images = soup('img') baseurl = link for img in images: - filename = self.addImage(configdir, self.key, baseurl, img['src']) + filename = self.addImage(configdir, self.key, baseurl, img['src'], proxy=proxy) img['src']=filename self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) ) + self.db.commit() contentLink = configdir+self.key+".d/"+id+".html" file = open(contentLink, "w") file.write(soup.prettify()) @@ -422,7 +566,8 @@ class Listing: db = property(_getdb) # Lists all the feeds in a dictionary, and expose the data - def __init__(self, configdir): + def __init__(self, config, configdir): + self.config = config self.configdir = configdir self.tls = threading.local () @@ -448,7 +593,7 @@ class Listing: self.addCategory("Default Category") self.db.execute("ALTER TABLE feeds ADD COLUMN category int;") self.db.execute("UPDATE feeds SET category=1;") - self.db.commit() + self.db.commit() except: pass @@ -507,14 +652,34 @@ class Listing: archFeed.addArchivedArticle(title, link, date, self.configdir) self.updateUnread("ArchivedArticles") - def updateFeed(self, key, expiryTime=24, proxy=None, imageCache=False): + def updateFeed(self, key, expiryTime=None, proxy=None, imageCache=None, + priority=0): + if expiryTime is None: + expiryTime = self.config.getExpiry() + if not expiryTime: + # Default to 24 hours + expiryTime = 24 + if proxy is None: + (use_proxy, proxy) = self.config.getProxy() + if not use_proxy: + proxy = None + if imageCache is None: + imageCache = self.config.getImageCache() + feed = self.getFeed(key) (url, etag, modified) = self.db.execute("SELECT url, etag, modified FROM feeds WHERE id=?;", (key,) ).fetchone() try: modified = time.struct_time(eval(modified)) except: modified = None - (updateTime, etag, modified) = feed.updateFeed(self.configdir, url, etag, modified, expiryTime, proxy, imageCache) + feed.updateFeed( + self.configdir, url, etag, modified, expiryTime, proxy, imageCache, + priority, postFeedUpdateFunc=self._queuePostFeedUpdate) + + def _queuePostFeedUpdate(self, *args, **kwargs): + mainthread.execute (self._postFeedUpdate, async=True, *args, **kwargs) + + def _postFeedUpdate(self, key, updateTime, etag, modified): if modified==None: modified="None" else: diff --git a/src/update_feeds.py b/src/update_feeds.py index 5122a5b..9516594 100644 --- a/src/update_feeds.py +++ b/src/update_feeds.py @@ -2,6 +2,7 @@ # # Copyright (c) 2007-2008 INdT. +# Copyright (c) 2011 Neal H. 
Walfield # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Lesser General Public License as published by # the Free Software Foundation, either version 3 of the License, or @@ -25,10 +26,14 @@ from rss_sqlite import Listing from config import Config +from updatedbus import get_lock, UpdateServerObject -import threading import os import gobject +import traceback + +from jobmanager import JobManager +import mainthread CONFIGDIR="/home/user/.feedingit/" #DESKTOP_FILE = "/usr/share/applications/hildon-status-menu/feedingit_status.desktop" @@ -39,100 +44,59 @@ timeout = 5 setdefaulttimeout(timeout) del timeout -from updatedbus import UpdateServerObject, get_lock - -class Download(threading.Thread): - def __init__(self, config, dbusHandler): - global dbug - threading.Thread.__init__(self) - self.running = True - self.config = config - self.dbusHandler = dbusHandler - if dbug: - self.dbug = open(CONFIGDIR+"dbug.log", "w") - - def run(self): - global dbug - if dbug: - self.dbug.write("Starting updates\n") - try: - self.dbusHandler.UpdateStarted() - (use_proxy, proxy) = self.config.getProxy() - listing = Listing(CONFIGDIR) - for key in listing.getListOfFeeds(): - if dbug: - self.dbug.write("updating %s\n" %key) - try: - if use_proxy: - from urllib2 import install_opener, build_opener - install_opener(build_opener(proxy)) - listing.updateFeed(key, self.config.getExpiry(), proxy=proxy, imageCache=self.config.getImageCache() ) - else: - listing.updateFeed(key, self.config.getExpiry(), imageCache=self.config.getImageCache() ) - except: - import traceback - file = open("/home/user/.feedingit/feedingit_update.log", "a") - traceback.print_exc(file=file) - file.close() - if not self.running: - if dbug: - self.dbug.write("received stopUpdate after %s\n" %key) - break - self.dbusHandler.UpdateFinished() - self.dbusHandler.ArticleCountUpdated() - if dbug: - self.dbug.write("Dbus ArticleCountUpdated signal sent\n") - except: - import traceback - file = open("/home/user/.feedingit/feedingit_update.log", "a") - traceback.print_exc(file=file) - file.close() - #pass - if dbug: - self.dbug.write("About to main_quit\n") - mainloop.quit() - if dbug: - self.dbug.write("After main_quit\n") - self.dbug.close() +from updatedbus import UpdateServerObject + +debug_file = None +def debug(*args): + global debug_file + if not debug_file: + debug_file = open("/home/user/.feedingit/feedingit_update.log", "a") + + debug_file.write (*args) class FeedUpdate(): def __init__(self): self.config = Config(self, CONFIGDIR+"config.ini") self.dbusHandler = UpdateServerObject(self) - self.updateThread = False - - def automaticUpdate(self): - #self.listing.updateFeeds() - if self.updateThread == False: - self.updateThread = Download(self.config, self.dbusHandler) - self.updateThread.start() + self.listing = Listing(self.config, CONFIGDIR) + self.done = False + + jm = JobManager(True) + jm.stats_hook_register (self.job_manager_update, + run_in_main_thread=True) + + self.dbusHandler.UpdateStarted() + for k in self.listing.getListOfFeeds(): + self.listing.updateFeed (k) - def stopUpdate(self): - try: - self.updateThread.running = False - except: + def job_manager_update(self, jm, old_stats, new_stats, updated_feed): + if not new_stats['jobs-queued'] and not new_stats['jobs-in-progress']: + self.dbusHandler.UpdateFinished() + self.dbusHandler.ArticleCountUpdated() + self.done = True mainloop.quit() + def stopUpdate(self): + print "Stop update called." 
+ JobManager().quit() + import dbus.mainloop.glib dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) gobject.threads_init() +mainthread.init() mainloop = gobject.MainLoop() app_lock = get_lock("app_lock") if app_lock != None: - try: - feed = FeedUpdate() - mainloop.run() - del app_lock - except: - import traceback - file = open("/home/user/.feedingit/feedingit_update.log", "w") - traceback.print_exc(file=file) - file.close() + feed = FeedUpdate() + while not feed.done: + try: + mainloop.run() + except KeyboardInterrupt: + print "Interrupted. Quitting." + JobManager().quit() + del app_lock else: - file = open("/home/user/.feedingit/feedingit_update.log", "a") - file.write("Update in progress") - file.close() - + debug("Update in progress") diff --git a/src/updatedbus.py b/src/updatedbus.py index e712c7e..8e110ae 100644 --- a/src/updatedbus.py +++ b/src/updatedbus.py @@ -25,14 +25,19 @@ import dbus import dbus.service +import mainthread +@mainthread.mainthread def get_lock(key): try: bus_name = dbus.service.BusName('org.marcoz.feedingit.lock_%s' %key,bus=dbus.SessionBus(), do_not_queue=True) except: bus_name = None return bus_name - + +def release_lock(lock): + assert lock is not None + mainthread.execute(lock.__del__, async=True) class UpdateServerObject(dbus.service.Object): def __init__(self, app): -- 1.7.9.5
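[Note: the job manager is the central abstraction this patch introduces, so its queueing semantics are worth seeing in isolation. A minimal sketch of the jobmanager.py API, with make_job as a hypothetical stand-in for the closures rss_sqlite.py queues:

    import time
    from jobmanager import JobManager

    def make_job(name):
        def job():
            print "updating", name
        return job

    jm = JobManager()                    # singleton; jobs queue but do not run
    jm.execute(make_job("a"), key="a")               # default priority 0
    jm.execute(make_job("b"), key="b", priority=-1)  # like nice(): runs first
    jm.execute(make_job("a"), key="a")               # dropped: "a" already queued
    JobManager(True)                     # start the worker threads
    time.sleep(1)
    print jm.stats()                     # e.g. {'jobs': 2, 'jobs-completed': 2, ...}
    jm.quit()                            # cancel queued jobs; runners drain
]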
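[Note: httpprogresshandler.py is what makes cancellation work mid-download: the callback fires on the download thread for every chunk sent or received, and raising from it aborts the transfer (download_callback in rss_sqlite.py raises KeyboardInterrupt once JobManager().do_quit is set). A minimal sketch, with a hypothetical URL:

    import urllib2
    from httpprogresshandler import HTTPProgressHandler

    def progress(connection):
        # connection.stats covers this connection; connection.opener.stats
        # aggregates every connection made through the opener.
        print "%d bytes received so far" % connection.opener.stats['received']
        # A real callback would also check for cancellation here and raise.

    opener = urllib2.build_opener(HTTPProgressHandler(progress))
    data = opener.open('http://example.com/feed.rss').read()
]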
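[Note: the locking in updatedbus.py relies on a standard DBus idiom: owning a well-known bus name is an exclusive lock, and the bus reclaims the name automatically if the holder crashes, so stale locks cannot persist. A sketch of the idiom; the patch's get_lock catches all exceptions, while NameExistsException is the specific error do_not_queue raises:

    import dbus
    import dbus.service

    def try_lock(key):
        try:
            # do_not_queue makes a second claimant fail instead of
            # waiting for the name to be released.
            return dbus.service.BusName(
                'org.marcoz.feedingit.lock_%s' % key,
                bus=dbus.SessionBus(), do_not_queue=True)
        except dbus.exceptions.NameExistsException:
            return None

    lock = try_lock("update_lock")
    if lock is None:
        print "update already in progress"
    else:
        # ... do the update ...
        del lock  # dropping the last reference releases the name
]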