Move download management from frontends to rss_sqlite.py.
author Neal H. Walfield <neal@walfield.org>
Sat, 23 Jul 2011 08:38:18 +0000 (10:38 +0200)
committer Neal H. Walfield <neal@gnu.org>
Thu, 28 Jul 2011 21:28:07 +0000 (23:28 +0200)
 - Move download logic to rss_sqlite.py:
  - Move thread management to jobmanager.py.
  - Queue jobs in rss_sqlite.py.
  - Manage per-feed DBus locks in rss_sqlite.py.
  - Get config settings in rss_sqlite.py on demand.

 - Make downloading more robust:
  - Only call non-thread-safe code (in particular, DBus and gconf
    functionality) from the main thread using mainthread.py.
  - Improve responsiveness of the frontend by yielding during CPU
    intensive activities and only allowing a single slave to parse
    a feed at a time.
  - When downloads are canceled, do our best to quit cleanly and
    quickly.

 - Update frontends to use the new functionality.
  - Remove redundant code, in particular, download functionality.
  - Rework FeedingIt.py's download bar.

src/FeedingIt-Web.py
src/FeedingIt.py
src/config.py
src/httpprogresshandler.py [new file with mode: 0644]
src/jobmanager.py [new file with mode: 0644]
src/mainthread.py [new file with mode: 0644]
src/rss_sqlite.py
src/update_feeds.py
src/updatedbus.py

index 7f6b2a9..4fc6965 100644 (file)
@@ -44,7 +44,7 @@ def sanitize(text):
 
 def start_server():
     global listing
-    listing = Listing(CONFIGDIR)
+    listing = Listing(config, CONFIGDIR)
     httpd = BaseHTTPServer.HTTPServer(("127.0.0.1", PORT), Handler)
     httpd.serve_forever()
 
@@ -69,37 +69,11 @@ class App():
 #        download = Download(listing, feeds)
 #        download.start()
     
-class Download(Thread):
-    def __init__(self, listing, keys):
-        Thread.__init__(self)
-        self.listing = listing
-        self.keys = keys
-        
-    def run (self):
-        updateDbusHandler.UpdateStarted()
-        for key in self.keys:
-            print "Start update: %s" % key
-            updatingFeeds.append(key)
-            (use_proxy, proxy) = config.getProxy()
-            try:
-                if use_proxy:
-                        self.listing.updateFeed(key, proxy=proxy, imageCache=config.getImageCache() )
-                else:
-                        self.listing.updateFeed(key, imageCache=config.getImageCache() )
-            except:
-                print "Error updating feed: %s" %key
-            updatingFeeds.remove(key)
-            print "End update: %s" % key
-        updateDbusHandler.UpdateFinished()
-
 class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
     def updateAll(self):
-        feeds = []
         for cat in listing.getListOfCategories():
             for feed in listing.getSortedListOfKeys("Manual", category=cat):
-                feeds.append(feed)
-        download = Download(listing, feeds)
-        download.start()
+                listing.updateFeed(feed)
     
     def openTaskSwitch(self):
         import subprocess
@@ -304,9 +278,6 @@ import thread
 
 
 
-# Start the HTTP server in a new thread
-thread.start_new_thread(start_server, ())
-
 # Initialize the glib mainloop, for dbus purposes
 from feedingitdbus import ServerObject
 from updatedbus import UpdateServerObject, get_lock
@@ -315,11 +286,18 @@ import gobject
 gobject.threads_init()
 import dbus.mainloop.glib
 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+import mainthread
+mainthread.init()
+from jobmanager import JobManager
+JobManager(True)
 
 global updateDbusHandler, dbusHandler
 app = App()
 dbusHandler = ServerObject(app)
 updateDbusHandler = UpdateServerObject(app)
 
+# Start the HTTP server in a new thread
+thread.start_new_thread(start_server, ())
+
 mainloop = gobject.MainLoop()
 mainloop.run()
index 9ba9469..f7a45b3 100644 (file)
@@ -2,6 +2,7 @@
 
 # 
 # Copyright (c) 2007-2008 INdT.
+# Copyright (c) 2011 Neal H. Walfield
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
@@ -44,11 +45,13 @@ from feedingitdbus import ServerObject
 from updatedbus import UpdateServerObject, get_lock
 from config import Config
 from cgi import escape
+import weakref
 
 from rss_sqlite import Listing
 from opml import GetOpmlData, ExportOpmlData
 
-from urllib2 import install_opener, build_opener
+import mainthread
+from jobmanager import JobManager
 
 from socket import setdefaulttimeout
 timeout = 5
@@ -270,90 +273,95 @@ class AddCategoryWizard(gtk.Dialog):
     def getData(self):
         return self.nameEntry.get_text()
         
-class Download(Thread):
-    def __init__(self, listing, key, config):
-        Thread.__init__(self)
-        self.listing = listing
-        self.key = key
-        self.config = config
-        
-    def run (self):
-        (use_proxy, proxy) = self.config.getProxy()
-        key_lock = get_lock(self.key)
-        if key_lock != None:
-            if use_proxy:
-                self.listing.updateFeed(self.key, self.config.getExpiry(), proxy=proxy, imageCache=self.config.getImageCache() )
-            else:
-                self.listing.updateFeed(self.key, self.config.getExpiry(), imageCache=self.config.getImageCache() )
-        del key_lock
-
-        
 class DownloadBar(gtk.ProgressBar):
-    def __init__(self, parent, listing, listOfKeys, config, single=False):
-        
-        update_lock = get_lock("update_lock")
-        if update_lock != None:
-            gtk.ProgressBar.__init__(self)
-            self.listOfKeys = listOfKeys[:]
-            self.listing = listing
-            self.total = len(self.listOfKeys)
-            self.config = config
-            self.current = 0
-            self.single = single
-            (use_proxy, proxy) = self.config.getProxy()
-            if use_proxy:
-                opener = build_opener(proxy)
-            else:
-                opener = build_opener()
-
-            opener.addheaders = [('User-agent', USER_AGENT)]
-            install_opener(opener)
-
-            if self.total>0:
-                # In preparation for i18n/l10n
-                def N_(a, b, n):
-                    return (a if n == 1 else b)
-
-                self.set_text(N_('Updating %d feed', 'Updating %d feeds', self.total) % self.total)
-
-                self.fraction = 0
-                self.set_fraction(self.fraction)
-                self.show_all()
-                # Create a timeout
-                self.timeout_handler_id = gobject.timeout_add(50, self.update_progress_bar)
-
-    def update_progress_bar(self):
-        #self.progress_bar.pulse()
-        if activeCount() < 4:
-            x = activeCount() - 1
-            k = len(self.listOfKeys)
-            fin = self.total - k - x
-            fraction = float(fin)/float(self.total) + float(x)/(self.total*2.)
-            #print x, k, fin, fraction
-            self.set_fraction(fraction)
-
-            if len(self.listOfKeys)>0:
-                self.current = self.current+1
-                key = self.listOfKeys.pop()
-                #if self.single == True:
-                    # Check if the feed is being displayed
-                download = Download(self.listing, key, self.config)
-                download.start()
-                return True
-            elif activeCount() > 1:
-                return True
+    @classmethod
+    def class_init(cls):
+        if hasattr (cls, 'class_init_done'):
+            return
+
+        jm = JobManager ()
+        jm.stats_hook_register (cls.update_progress,
+                                run_in_main_thread=True)
+
+        cls.downloadbars = []
+        # Total number of jobs we are monitoring.
+        cls.total = 0
+        # Number of jobs complete (of those that we are monitoring).
+        cls.done = 0
+        # Percent complete.
+        cls.progress = 0
+
+        cls.class_init_done = True
+
+    def __init__(self, parent):
+        self.class_init ()
+
+        gtk.ProgressBar.__init__(self)
+
+        self.downloadbars.append(weakref.ref (self))
+        self.set_fraction(0)
+        self.__class__.update_bars()
+        self.show_all()
+
+    @classmethod
+    def downloading(cls):
+        return hasattr (cls, 'jobs_at_start')
+
+    @classmethod
+    def update_progress(cls, jm, old_stats, new_stats, updated_feed):
+        if not cls.downloading():
+            cls.jobs_at_start = old_stats['jobs-completed']
+
+        if not cls.downloadbars:
+            return
+
+        if new_stats['jobs-in-progress'] + new_stats['jobs-queued'] == 0:
+            del cls.jobs_at_start
+            for ref in cls.downloadbars:
+                bar = ref ()
+                if bar is None:
+                    # The download bar disappeared.
+                    cls.downloadbars.remove (ref)
+                else:
+                    bar.emit("download-done", None)
+            return
+
+        # This should never be called if new_stats['jobs'] is 0, but
+        # just in case...
+        cls.total = max (1, new_stats['jobs'] - cls.jobs_at_start)
+        cls.done = new_stats['jobs-completed'] - cls.jobs_at_start
+        cls.progress = 1 - (new_stats['jobs-in-progress'] / 2.
+                            + new_stats['jobs-queued']) / cls.total
+        cls.update_bars()
+
+        if updated_feed:
+            for ref in cls.downloadbars:
+                bar = ref ()
+                if bar is None:
+                    # The download bar disappeared.
+                    cls.downloadbars.remove (ref)
+                else:
+                    bar.emit("download-done", updated_feed)
+
+    @classmethod
+    def update_bars(cls):
+        # In preparation for i18n/l10n
+        def N_(a, b, n):
+            return (a if n == 1 else b)
+
+        text = (N_('Updated %d of %d feeds ', 'Updated %d of %d feeds',
+                   cls.total)
+                % (cls.done, cls.total))
+
+        for ref in cls.downloadbars:
+            bar = ref ()
+            if bar is None:
+                # The download bar disappeared.
+                cls.downloadbars.remove (ref)
             else:
-                #self.waitingWindow.destroy()
-                #self.destroy()
-                try:
-                    del self.update_lock
-                except:
-                    pass
-                self.emit("download-done", "success")
-                return False 
-        return True
-    
-    
+                bar.set_text(text)
+                bar.set_fraction(cls.progress)
+
 class SortList(hildon.StackableWindow):
     def __init__(self, parent, listing, feedingit, after_closing, category=None):
         hildon.StackableWindow.__init__(self)
@@ -741,7 +749,14 @@ class DisplayFeed(hildon.StackableWindow):
         self.set_app_menu(menu)
         menu.show_all()
         
+        self.main_vbox = gtk.VBox(False, 0)
+        self.add(self.main_vbox)
+
+        self.pannableFeed = None
         self.displayFeed()
+
+        if DownloadBar.downloading ():
+            self.show_download_bar ()
         
         self.connect('configure-event', self.on_configure_event)
         self.connect("destroy", self.destroyWindow)
@@ -771,6 +786,9 @@ class DisplayFeed(hildon.StackableWindow):
         return escape(unescape(title).replace("<em>","").replace("</em>","").replace("<nobr>","").replace("</nobr>","").replace("<wbr>",""))
 
     def displayFeed(self):
+        if self.pannableFeed:
+            self.pannableFeed.destroy()
+
         self.pannableFeed = hildon.PannableArea()
 
         self.pannableFeed.set_property('hscrollbar-policy', gtk.POLICY_NEVER)
@@ -842,7 +860,7 @@ class DisplayFeed(hildon.StackableWindow):
             markup = ENTRY_TEMPLATE % (self.config.getFontSize(), "No Articles To Display")
             self.feedItems.append((markup, ""))
 
-        self.add(self.pannableFeed)
+        self.main_vbox.pack_start(self.pannableFeed)
         self.show_all()
 
     def clear(self):
@@ -935,22 +953,24 @@ class DisplayFeed(hildon.StackableWindow):
         self.displayFeed()
 
     def button_update_clicked(self, button):
-        #bar = DownloadBar(self, self.listing, [self.key,], self.config ) 
+        self.listing.updateFeed (self.key, priority=-1)
+            
+    def show_download_bar(self):
         if not type(self.downloadDialog).__name__=="DownloadBar":
-            self.pannableFeed.destroy()
-            self.vbox = gtk.VBox(False, 10)
-            self.downloadDialog = DownloadBar(self.window, self.listing, [self.key,], self.config, single=True )
-            self.downloadDialog.connect("download-done", self.onDownloadsDone)
-            self.vbox.pack_start(self.downloadDialog, expand=False, fill=False)
-            self.add(self.vbox)
+            self.downloadDialog = DownloadBar(self.window)
+            self.downloadDialog.connect("download-done", self.onDownloadDone)
+            self.main_vbox.pack_end(self.downloadDialog,
+                                    expand=False, fill=False)
             self.show_all()
-            
-    def onDownloadsDone(self, *widget):
-        self.vbox.destroy()
-        self.feed = self.listing.getFeed(self.key)
-        self.displayFeed()
-        self.updateDbusHandler.ArticleCountUpdated()
         
+    def onDownloadDone(self, widget, feed):
+        if feed == self.feed or feed is None:
+            self.downloadDialog.destroy()
+            self.downloadDialog = False
+            self.feed = self.listing.getFeed(self.key)
+            self.displayFeed()
+            self.updateDbusHandler.ArticleCountUpdated()
+
     def buttonReadAllClicked(self, button):
         #self.clear()
         self.feed.markAllAsRead()
@@ -1007,8 +1027,8 @@ class FeedingIt:
             self.stopButton.destroy()
         except:
             pass
-        self.listing = Listing(CONFIGDIR)
-        
+        self.listing = Listing(self.config, CONFIGDIR)
+
         self.downloadDialog = False
         try:
             self.orientation = FremantleRotation(__appname__, main_window=self.window, app=self)
@@ -1080,8 +1100,29 @@ class FeedingIt:
         self.checkAutoUpdate()
         
         hildon.hildon_gtk_window_set_progress_indicator(self.window, 0)
-        gobject.idle_add(self.enableDbus)
+        gobject.idle_add(self.late_init)
         
+    def job_manager_update(self, jm, old_stats, new_stats, updated_feed):
+        if (not self.downloadDialog
+            and new_stats['jobs-in-progress'] + new_stats['jobs-queued'] > 0):
+            self.updateDbusHandler.UpdateStarted()
+
+            self.downloadDialog = DownloadBar(self.window)
+            self.downloadDialog.connect("download-done", self.onDownloadDone)
+            self.mainVbox.pack_end(self.downloadDialog, expand=False, fill=False)
+            self.mainVbox.show_all()
+
+            if self.__dict__.get ('disp', None):
+                self.disp.show_download_bar ()
+
+    def onDownloadDone(self, widget, feed):
+        if feed is None:
+            self.downloadDialog.destroy()
+            self.downloadDialog = False
+            self.displayListing()
+            self.updateDbusHandler.UpdateFinished()
+            self.updateDbusHandler.ArticleCountUpdated()
+
     def stop_running_update(self, button):
         self.stopButton.set_sensitive(False)
         import dbus
@@ -1092,10 +1133,15 @@ class FeedingIt:
         iface = dbus.Interface(remote_object, 'org.marcoz.feedingit')
         iface.StopUpdate()
     
-    def enableDbus(self):
+    def late_init(self):
         self.dbusHandler = ServerObject(self)
         self.updateDbusHandler = UpdateServerObject(self)
 
+        jm = JobManager()
+        jm.stats_hook_register (self.job_manager_update,
+                                run_in_main_thread=True)
+        JobManager(True)
+
     def button_markAll(self, button):
         for key in self.listing.getListOfFeeds():
             feed = self.listing.getFeed(key)
@@ -1142,12 +1188,8 @@ class FeedingIt:
         SortList(self.window, self.listing, self, after_closing)
 
     def button_update_clicked(self, button, key):
-        if not type(self.downloadDialog).__name__=="DownloadBar":
-            self.updateDbusHandler.UpdateStarted()
-            self.downloadDialog = DownloadBar(self.window, self.listing, self.listing.getListOfFeeds(), self.config )
-            self.downloadDialog.connect("download-done", self.onDownloadsDone)
-            self.mainVbox.pack_end(self.downloadDialog, expand=False, fill=False)
-            self.mainVbox.show_all()
+        for k in self.listing.getListOfFeeds():
+            self.listing.updateFeed (k)
         #self.displayListing()
 
     def onDownloadsDone(self, *widget):
@@ -1309,11 +1351,28 @@ class FeedingIt:
     def onFeedClosedTimeout(self):
         del self.feed_lock
         self.updateDbusHandler.ArticleCountUpdated()
-     
+
+    def quit(self, *args):
+        self.window.hide()
+
+        if hasattr (self, 'app_lock'):
+            del self.app_lock
+
+        # Wait until all slave threads have properly exited before
+        # terminating the mainloop.
+        jm = JobManager()
+        jm.quit ()
+        stats = jm.stats()
+        if stats['jobs-in-progress'] == 0 and stats['jobs-queued'] == 0:
+            gtk.main_quit ()
+        else:
+            gobject.timeout_add(500, self.quit)
+
+        return False
+
     def run(self):
-        self.window.connect("destroy", gtk.main_quit)
+        self.window.connect("destroy", self.quit)
         gtk.main()
-        del self.app_lock
 
     def prefsClosed(self, *widget):
         try:
@@ -1353,7 +1412,7 @@ class FeedingIt:
     def stopUpdate(self):
         # Not implemented in the app (see update_feeds.py)
         try:
-            self.downloadDialog.listOfKeys = []
+            JobManager().cancel ()
         except:
             pass
     
@@ -1367,6 +1426,8 @@ class FeedingIt:
         return status
 
 if __name__ == "__main__":
+    mainthread.init ()
+
     gobject.signal_new("feed-closed", DisplayFeed, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
     gobject.signal_new("article-closed", DisplayArticle, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
     gobject.signal_new("article-deleted", DisplayArticle, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
index 8aa45ce..65fbddd 100644 (file)
@@ -32,6 +32,7 @@
 from ConfigParser import RawConfigParser
 from gconf import client_get_default
 from urllib2 import ProxyHandler
+from mainthread import mainthread
 
 VERSION = "52"
 
@@ -280,6 +281,7 @@ class Config():
         return ranges["orientation"].index(self.config["orientation"])
     def getImageCache(self):
         return self.config["imageCache"]
+    @mainthread
     def getProxy(self):
         if self.config["proxy"] == False:
             return (False, None)
diff --git a/src/httpprogresshandler.py b/src/httpprogresshandler.py
new file mode 100644 (file)
index 0000000..78eb39c
--- /dev/null
@@ -0,0 +1,143 @@
+#!/usr/bin/env python2.5
+
+# Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import urllib2 
+import httplib
+import time
+
+class ProgressSocket(object):
+    """
+    Monitor what is being sent and received.
+    """
+    def __init__(self, socket, connection):
+        self.socket = socket
+        self.connection = connection
+
+    def __getattribute__(self, attr):
+        # print "%s.__getattribute__(%s)" % (self.__class__.__name__, attr)
+
+        def send(data):
+            # 100k at a time.
+            bs = 100 * 1024
+            sent = 0
+            while sent < len (data):
+                remaining = len (data) - sent
+                if remaining < bs:
+                    amount = remaining
+                else:
+                    amount = bs
+        
+                self.socket.sendall(data[sent:sent+amount])
+                sent += amount
+                self.connection.stats['sent'] += amount
+                self.connection.opener.stats['sent'] += amount
+        
+                if self.connection.callback is not None:
+                    self.connection.callback ()
+        
+        def read(*args, **kwargs):
+            data = self.socket.read (*args, **kwargs)
+            # print "GOT: %s" % (data[0:240],)
+            self.connection.stats['received'] += len (data)
+            self.connection.opener.stats['received'] += len (data)
+            if self.connection.callback is not None:
+                self.connection.callback ()
+            return data
+
+        if attr == 'send' or attr == 'sendall':
+            return send
+        if attr == 'read':
+            return read
+
+        try:
+            return super (ProgressSocket, self).__getattribute__(attr)
+        except AttributeError:
+            socket = super (ProgressSocket, self).__getattribute__('socket')
+            return socket.__getattribute__(attr)
+
+    def makefile(self, mode, bufsize):
+        return ProgressSocket (socket=self.socket.makefile(mode, bufsize),
+                               connection=self.connection)
+
+    def close(self):
+        return self.socket.close ()
+
+def HTTPProgressConnectionBuilder(callback, opener):
+    class HTTPProgressConnection(httplib.HTTPConnection):
+        def __init__(self, *args, **kwargs):
+            self.method = None
+            self.url = None
+            return httplib.HTTPConnection.__init__ (self, *args, **kwargs)
+
+        def putrequest(self, method, url, *args, **kwargs):
+            self.method = method
+            self.url = url
+            return httplib.HTTPConnection.putrequest (
+                self, method, url, *args, **kwargs)
+
+        def connect(self):
+            httplib.HTTPConnection.connect(self)
+            # Wrap the socket.
+            self.sock = ProgressSocket(socket=self.sock,
+                                       connection=self)
+
+    HTTPProgressConnection.callback = callback
+    HTTPProgressConnection.opener = opener
+    HTTPProgressConnection.stats \
+        = {'sent': 0, 'received': 0, 'started':time.time()}
+    return HTTPProgressConnection
+
+class HTTPProgressHandler(urllib2.HTTPHandler):
+    def __init__(self, callback):
+        self.callback = callback
+        self.stats = {'sent': 0, 'received': 0, 'started':time.time()}
+        return urllib2.HTTPHandler.__init__(self)
+
+    def http_open(self, request):
+        return self.do_open(
+            HTTPProgressConnectionBuilder(self.callback, self),
+            request)
+
+if __name__ == '__main__':
+    def callback(connection):
+        req = ""
+        if connection.method:
+            req += connection.method + " "
+        req += connection.host + ':' + str (connection.port)
+        if connection.url:
+            req += connection.url
+
+        cstats = connection.stats
+        ostats = connection.opener.stats
+
+        print (("%s: connection: %d sent, %d received: %d kb/s; "
+                + "opener: %d sent, %d received, %d kb/s")
+               % (req,
+                  cstats['sent'], cstats['received'],
+                  ((cstats['sent'] + cstats['received'])
+                   / (time.time() - cstats['started']) / 1024),
+                  ostats['sent'], ostats['received'],
+                  ((ostats['sent'] + ostats['received'])
+                   / (time.time() - ostats['started']) / 1024)))
+
+    opener = urllib2.build_opener(HTTPProgressHandler(callback))
+
+    data = opener.open ('http://google.com')
+    downloaded = 0
+    for d in data:
+        downloaded += len (d)
+    print "Document is %d bytes in size" % (downloaded,)
diff --git a/src/jobmanager.py b/src/jobmanager.py
new file mode 100644 (file)
index 0000000..8ca4df8
--- /dev/null
@@ -0,0 +1,355 @@
+#!/usr/bin/env python2.5
+
+# Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import threading
+import thread
+import traceback
+import heapq
+import sys
+import mainthread
+
+def debug(*args):
+    if False:
+        sys.stdout.write(*args)
+        sys.stdout.write("\n")
+
+# The default priority.  Like nice(), a smaller numeric priority
+# corresponds to a higher priority class.
+default_priority = 0
+
+class JobRunner(threading.Thread):
+    def __init__(self, job_manager):
+        threading.Thread.__init__(self)
+        self.job_manager = job_manager
+
+    def run (self):
+        have_lock = True
+        self.job_manager.lock.acquire ()
+        try:
+            while self.job_manager.pause == 0 and not self.job_manager.do_quit:
+                try:
+                    _, key, job = heapq.heappop (self.job_manager.queue)
+                except IndexError:
+                    return
+    
+                try:
+                    self.job_manager.in_progress.append (key)
+                    self.job_manager.lock.release ()
+                    have_lock = False
+        
+                    # Execute the job.
+                    try:
+                        job ()
+                    except KeyboardInterrupt:
+                        # This is handled below and doesn't require a
+                        # traceback.
+                        raise
+                    except:
+                        print ("Executing job %s (%s) from thread %s: %s"
+                               % (str (key), str (job),
+                                  threading.currentThread(),
+                                  traceback.format_exc ()))
+        
+                    self.job_manager.lock.acquire ()
+                    have_lock = True
+    
+                    assert key in self.job_manager.in_progress
+                finally:
+                    try:
+                        self.job_manager.in_progress.remove (key)
+                    except ValueError:
+                        pass
+    
+                debug("Finished executing job %s (%s)" % (key, job,))
+    
+                self.job_manager._stats_hooks_run ({'job':job, 'key':key})
+        except KeyboardInterrupt:
+            debug("%s: KeyboardInterrupt" % threading.currentThread())
+            thread.interrupt_main()
+            debug("%s: Forwarded KeyboardInterrupt to main thread"
+                  % threading.currentThread())
+        finally:
+            if have_lock:
+                self.job_manager.lock.release ()
+
+            assert self in self.job_manager.threads
+            self.job_manager.threads.remove (self)
+
+            debug ("Job runner %s (%d left) exiting."
+                   % (threading.currentThread(),
+                      len (self.job_manager.threads)))
+
+_jm = None
+def JobManager(start=False):
+    """
+    Return the job manager instance.  The job manager will not start
+    executing jobs until this is called with start set to True.  Note:
+    you can still queue jobs.
+    """
+    global _jm
+    if _jm is None:
+        _jm = _JobManager ()
+    if start and not _jm.started:
+        _jm.started = True
+        if _jm.jobs > 0:
+            _jm._stats_hooks_run ()
+        _jm.tickle ()
+
+    return _jm
+
+class _JobManager(object):
+    def __init__(self, started=False, num_threads=4):
+        """
+        Initialize the job manager.
+
+        If started is false, jobs may be queued, but jobs will not be
+        started until start() is called.
+        """
+        # A reentrant lock so that a job runner can call stat without
+        # dropping the lock.
+        self.lock = threading.RLock()
+
+        # If we can start executing jobs.
+        self.started = started
+
+        # The maximum number of threads to use for executing jobs.
+        self.num_threads = num_threads
+
+        # List of jobs (priority, key, job) that are queued for
+        # execution.
+        self.queue = []
+        # List of keys of the jobs that are being executed.
+        self.in_progress = []
+        # List of threads.
+        self.threads = []
+
+        # If 0, jobs may execute, otherwise, job execution is paused.
+        self.pause = 0
+
+        # The total number of jobs that this manager ever executed.
+        self.jobs = 0
+
+        # A list of status hooks to execute when the stats change.
+        self._stats_hooks = []
+        self._current_stats = self.stats ()
+
+        self.do_quit = False
+
+    def _lock(f):
+        def wrapper(*args, **kwargs):
+            self = args[0]
+            self.lock.acquire ()
+            try:
+                return f(*args, **kwargs)
+            finally:
+                self.lock.release()
+        return wrapper
+
+    @_lock
+    def start(self):
+        """
+        Start executing jobs.
+        """
+        if self.started:
+            return
+        if self.jobs > 0:
+            self._stats_hooks_run ()
+        self.tickle ()
+
+    @_lock
+    def tickle(self):
+        """
+        Ensure that there are enough job runners for the number of
+        pending jobs.
+        """
+        if self.do_quit:
+            debug("%s.quit called, not creating new threads."
+                  % self.__class__.__name__)
+            return
+
+        if self.pause > 0:
+            # Job execution is paused.  Don't start any new threads.
+            debug("%s.tickle(): Not doing anything: paused"
+                  % (self.__class__.__name__))
+            return
+
+        debug("%s.tickle: Have %d threads (can start %d); %d jobs queued"
+              % (self.__class__.__name__,
+                 len (self.threads), self.num_threads, len (self.queue)))
+        if len (self.threads) < self.num_threads:
+            for _ in range (min (len (self.queue),
+                                 self.num_threads - len (self.threads))):
+                thread = JobRunner (self)
+                # Setting threads as daemons means faster shutdown
+                # when the main thread exits, but it results in
+                # exceptions and occasional segfaults.
+                # thread.setDaemon(True)
+                self.threads.append (thread)
+                thread.start ()
+                debug("Now have %d threads" % len (self.threads))
+
+    @_lock
+    def execute(self, job, key=None, priority=default_priority):
+        """
+        Enqueue a job for execution.  job is a function to execute.
+        If key is not None, the job is only enqueued if there is no
+        job that is in progress or enqueued with the same key.
+        priority is the job's priority.  Like nice(), a smaller
+        numeric priority corresponds to a higher priority class.  Jobs
+        are executed highest priority first, in the order that they
+        were added.
+        """
+        if self.do_quit:
+            debug("%s.quit called, not enqueuing new jobs."
+                  % self.__class__.__name__)
+
+        if key is not None:
+            if key in self.in_progress:
+                return
+            for item in self.queue:
+                if item[1] == key:
+                    if item[0][0] < priority:
+                        # Priority raised.
+                        item[0][0] = priority
+                        self.queue = heapq.heapify (self.queue)
+                    return
+
+        # To ensure that jobs with the same priority are executed
+        # in the order they are added, we set the priority to
+        # [priority, next (monotomic counter)].
+        self.jobs += 1
+        heapq.heappush (self.queue, [[priority, self.jobs], key, job])
+
+        if self.started:
+            self._stats_hooks_run ()
+            self.tickle ()
+        else:
+            debug("%s not initialized. delaying execution of %s (%s)"
+                  % (self.__class__.__name__, key, str (job),))
+
+    @_lock
+    def pause(self):
+        """
+        Increment the pause count.  When the pause count is greater
+        than 0, job execution is suspended.
+        """
+        self.pause += 1
+
+        if self.pause == 1:
+            self._stats_hooks_run ()
+
+    @_lock
+    def resume(self):
+        """
+        Decrement the pause count.  If the pause count is greater than
+        0 and this decrement brings it to 0, enqueued jobs are
+        resumed.
+        """
+        assert self.pause > 0
+        self.pause -= 1
+        if not self.paused():
+            self._stats_hooks_run ()
+            self.tickle ()
+
+    @_lock
+    def paused(self):
+        """
+        Returns whether job execution is paused.
+        """
+        return self.pause > 0
+
+    @_lock
+    def cancel(self):
+        """
+        Cancel any pending jobs.
+        """
+        self.queue = []
+        self._stats_hooks_run ()
+
+    def quit(self):
+        self.cancel ()
+        self.do_quit = True
+
+    @_lock
+    def stats(self):
+        """
+        Return a dictionary consisting of:
+
+          - 'paused': whether execution is paused
+          - 'jobs': the total number of jobs this manager has
+              executed, is executing or are queued
+          - 'jobs-completed': the number of jobs that have completed
+          - 'jobs-in-progress': the number of jobs in progress
+          - 'jobs-queued': the number of jobs currently queued
+        """
+        return {'paused': self.paused(),
+                'jobs': self.jobs,
+                'jobs-completed':
+                  self.jobs - len (self.in_progress) - len (self.queue),
+                'jobs-in-progress': len (self.in_progress),
+                'jobs-queued': len (self.queue)
+                }
+
+    def stats_hook_register(self, func, *args, **kwargs):
+        """
+        Registers a function to be called when the job status changes.
+        Passed the following parameters:
+
+          - the JobManager instance.
+          - the previous stats (as returned by stats)
+          - the current stats
+          - the job that was completed (or None)
+
+        Note: the hook may not be run in the main thread!
+        """
+        mainthread=False
+        try:
+            mainthread = kwargs['run_in_main_thread']
+            del kwargs['run_in_main_thread']
+        except KeyError:
+            pass
+        self._stats_hooks.append ([func, mainthread, args, kwargs])
+
+    def _stats_hooks_run(self, completed_job=None):
+        """
+        Run the stats hooks.
+        """
+        # if not self._stats_hooks:
+        #     return
+
+        self.lock.acquire ()
+        try:
+            old_stats = self._current_stats
+            self._current_stats = self.stats ()
+            current_stats = self._current_stats
+        finally:
+            self.lock.release ()
+
+        debug("%s -> %s" % (str (old_stats), str (current_stats)))
+
+        for (f, run_in_main_thread, args, kwargs) in self._stats_hooks:
+            if run_in_main_thread:
+                debug("JobManager._stats_hooks_run: Running %s in main thread"
+                      % f)
+                mainthread.execute(
+                    f, self, old_stats, current_stats, completed_job,
+                    async=True, *args, **kwargs)
+            else:
+                debug("JobManager._stats_hooks_run: Running %s in any thread"
+                      % f)
+                f(self, old_stats, current_stats, completed_job,
+                  *args, **kwargs)
diff --git a/src/mainthread.py b/src/mainthread.py
new file mode 100644 (file)
index 0000000..a85cf5a
--- /dev/null
@@ -0,0 +1,157 @@
+#!/usr/bin/env python2.5
+
+# Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import threading
+import traceback
+
+_run_in_main_thread = None
+_main_thread = None
+
+def init(run_in_main_thread=None):
+    """
+    run_in_main_thread is a function that takes a single argument (a
+    callable that returns False) and arranges for it to be run in the
+    main thread.
+
+    If you are using glib, gobject.idle_add (the default) is
+    sufficient.  (gobject.idle_add is thread-safe.)
+    """
+    if run_in_main_thread is None:
+        import gobject
+        run_in_main_thread = gobject.idle_add
+
+    global _run_in_main_thread
+    assert _run_in_main_thread is None
+    _run_in_main_thread = run_in_main_thread
+
+    global _main_thread
+    _main_thread = threading.currentThread ()
+
+def execute(func, *args, **kwargs):
+    """
+    Execute FUNC in the main thread.
+
+    If kwargs['async'] exists and is True, the function is executed
+    asynchronously (i.e., the thread does not wait for the function to
+    return in which case the function's return value is discarded).
+    Otherwise, this function waits until the function is executed and
+    returns its return value.
+    """
+    async = False
+    try:
+        async = kwargs['async']
+        del kwargs['async']
+    except KeyError:
+        pass
+
+    if threading.currentThread() == _main_thread:
+        if async:
+            try:
+                func (*args, **kwargs)
+            except:
+                print ("mainthread.execute: Executing %s: %s"
+                       % (func, traceback.format_exc ()))
+            return
+        else:
+            return func (*args, **kwargs)
+
+    assert _run_in_main_thread is not None, \
+        "You can't call this function from a non-main thread until you've called init()"
+
+    if not async:
+        cond = threading.Condition()
+
+    result = {}
+    result['done'] = False
+
+    def doit():
+        def it():
+            # Execute the function.
+            assert threading.currentThread() == _main_thread
+
+            try:
+                result['result'] = func (*args, **kwargs)
+            except:
+                print ("mainthread.execute: Executing %s: %s"
+                       % (func, traceback.format_exc ()))
+
+            if not async:
+                cond.acquire ()
+            result['done'] = True
+            if not async:
+                cond.notify ()
+                cond.release ()
+
+            return False
+        return it
+
+    if not async:
+        cond.acquire ()
+    _run_in_main_thread (doit())
+
+    if async:
+        # Don't wait for the method to complete execution.
+        return
+
+    # Wait for the result to become available.
+    while not result['done']:
+        cond.wait ()
+
+    return result.get ('result', None)
+
+if __name__ == "__main__":
+    import sys
+    import gobject
+
+    init()
+
+    def in_main_thread(test_num):
+        assert threading.currentThread() == _main_thread, \
+            "Test %d failed" % (test_num,)
+        return test_num
+
+    mainloop = gobject.MainLoop()
+    gobject.threads_init()
+
+    assert execute (in_main_thread, 1) == 1
+    assert (execute (in_main_thread, 2, async=False) == 2)
+    execute (in_main_thread, 3, async=True)
+
+    class T(threading.Thread):
+        def __init__(self):
+            threading.Thread.__init__(self)
+
+        def run(self):
+            assert threading.currentThread() != _main_thread
+
+            assert execute (in_main_thread, 4) == 4
+            assert (execute (in_main_thread, 5, async=False) == 5)
+            execute (in_main_thread, 6, async=True)
+            execute (mainloop.quit, async=False)
+
+    def start_thread():
+        t = T()
+        t.start()
+        return False
+
+    gobject.idle_add (start_thread)
+    mainloop.run()
+
+def mainthread(f):
+    def wrapper(*args, **kwargs):
+        return execute (f, *args, **kwargs)
+    return wrapper
index 64aef0a..e8b44a2 100644 (file)
@@ -28,6 +28,7 @@ import sqlite3
 from os.path import isfile, isdir
 from shutil import rmtree
 from os import mkdir, remove, utime
+import os
 import md5
 import feedparser
 import time
@@ -35,12 +36,36 @@ import urllib2
 from BeautifulSoup import BeautifulSoup
 from urlparse import urljoin
 from calendar import timegm
+from updatedbus import get_lock, release_lock
 import threading
+import traceback
+from jobmanager import JobManager
+import mainthread
+from httpprogresshandler import HTTPProgressHandler
 
 def getId(string):
     return md5.new(string).hexdigest()
 
+def download_callback(connection):
+    if JobManager().do_quit:
+        raise KeyboardInterrupt
+
+def downloader(progress_handler=None, proxy=None):
+    openers = []
+
+    if progress_handler:
+        openers.append (progress_handler)
+    else:
+        openers.append(HTTPProgressHandler(download_callback))
+
+    if proxy:
+        openers.append (proxy)
+
+    return urllib2.build_opener (*openers)
+
 class Feed:
+    serial_execution_lock = threading.Lock()
+
     def _getdb(self):
         try:
             db = self.tls.db
@@ -63,17 +88,36 @@ class Feed:
             self.db.execute("CREATE TABLE images (id text, imagePath text);")
             self.db.commit()
 
-    def addImage(self, configdir, key, baseurl, url):
+    def addImage(self, configdir, key, baseurl, url, proxy=None, opener=None):
         filename = configdir+key+".d/"+getId(url)
         if not isfile(filename):
             try:
-                f = urllib2.urlopen(urljoin(baseurl,url))
+                if not opener:
+                    opener = downloader(proxy=proxy)
+
+                abs_url = urljoin(baseurl,url)
+                f = opener.open(abs_url)
                 outf = open(filename, "w")
                 outf.write(f.read())
                 f.close()
                 outf.close()
+            except (urllib2.HTTPError, urllib2.URLError), exception:
+                print ("Could not download image %s: %s"
+                       % (abs_url, str (exception)))
+                return None
             except:
-                print "Could not download " + url
+                exception = sys.exc_info()[0]
+
+                print "Downloading image: %s" % abs_url
+                import traceback
+                traceback.print_exc()
+
+                try:
+                    remove(filename)
+                except OSError:
+                    pass
+
+                raise exception
         else:
             #open(filename,"a").close()  # "Touch" the file
             file = open(filename,"a")
@@ -81,147 +125,246 @@ class Feed:
             file.close()
         return filename
 
-    def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False):
-        # Expiry time is in hours
-        if proxy == None:
-            tmp=feedparser.parse(url, etag = etag, modified = modified)
-        else:
-            tmp=feedparser.parse(url, etag = etag, modified = modified, handlers = [proxy])
-        expiry = float(expiryTime) * 3600.
+    def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, priority=0, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
+        def doit():
+            def it():
+                self._updateFeed(configdir, url, etag, modified, expiryTime, proxy, imageCache, postFeedUpdateFunc, *postFeedUpdateFuncArgs)
+            return it
+        JobManager().execute(doit(), self.key, priority=priority)
 
-        currentTime = 0
-        # Check if the parse was succesful (number of entries > 0, else do nothing)
-        if len(tmp["entries"])>0:
-           currentTime = time.time()
-           # The etag and modified value should only be updated if the content was not null
-           try:
-               etag = tmp["etag"]
-           except KeyError:
-               etag = None
-           try:
-               modified = tmp["modified"]
-           except KeyError:
-               modified = None
-           try:
-               f = urllib2.urlopen(urljoin(tmp["feed"]["link"],"/favicon.ico"))
-               data = f.read()
-               f.close()
-               outf = open(self.dir+"/favicon.ico", "w")
-               outf.write(data)
-               outf.close()
-               del data
-           except:
-               #import traceback
-               #traceback.print_exc()
-                pass
+    def _updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
+        success = False
+        have_serial_execution_lock = False
+        try:
+            update_lock = None
+            update_lock = get_lock(self.key)  # lock this feed, not "key"
+            if not update_lock:
+                # Someone else is doing an update.
+                return
+
+            download_start = time.time ()
 
+            progress_handler = HTTPProgressHandler(download_callback)
 
-           #reversedEntries = self.getEntries()
-           #reversedEntries.reverse()
+            openers = [progress_handler]
+            if proxy:
+                openers.append (proxy)
+            kwargs = {'handlers':openers}
+            
+            tmp=feedparser.parse(url, etag=etag, modified=modified, **kwargs)
+            download_duration = time.time () - download_start
+    
+            opener = downloader(progress_handler, proxy)
 
-           ids = self.getIds()
+            if JobManager().do_quit:
+                raise KeyboardInterrupt
 
-           tmp["entries"].reverse()
-           for entry in tmp["entries"]:
-               date = self.extractDate(entry)
+            process_start = time.time()
+
+            # Expiry time is in hours
+            expiry = float(expiryTime) * 3600.
+    
+            currentTime = 0
+            http_status = tmp.get ('status', 200)
+    
+            # Check if the parse was successful.  If the http status code
+            # is 304, then the download was successful, but there is
+            # nothing new.  Indeed, no content is returned.  This makes a
+            # 304 look like an error because there are no entries and the
+            # parse fails.  But really, everything went great!  Check for
+            # this first.
+            if http_status == 304:
+                success = True
+            elif len(tmp["entries"])==0 and not tmp.version:
+                # An error occurred fetching or parsing the feed.  (Version
+                # will be either None if e.g. the connection timed out or
+                # '' if the data is not a proper feed)
+                print ("Error fetching %s: version is: %s: error: %s"
+                       % (url, str (tmp.version),
+                          str (tmp.get ('bozo_exception', 'Unknown error'))))
+                print tmp
+            else:
+               currentTime = time.time()
+               # The etag and modified value should only be updated if the content was not null
                try:
-                   entry["title"]
-               except:
-                   entry["title"] = "No Title"
+                   etag = tmp["etag"]
+               except KeyError:
+                   etag = None
                try:
-                   entry["link"]
-               except:
-                   entry["link"] = ""
+                   modified = tmp["modified"]
+               except KeyError:
+                   modified = None
                try:
-                   entry["author"]
-               except:
-                   entry["author"] = None
-               if(not(entry.has_key("id"))):
-                   entry["id"] = None 
-               tmpEntry = {"title":entry["title"], "content":self.extractContent(entry),
-                            "date":date, "link":entry["link"], "author":entry["author"], "id":entry["id"]}
-               id = self.generateUniqueId(tmpEntry)
-               
-               #articleTime = time.mktime(self.entries[id]["dateTuple"])
-               soup = BeautifulSoup(self.getArticle(tmpEntry)) #tmpEntry["content"])
-               images = soup('img')
-               baseurl = tmpEntry["link"]
-               #if not id in ids:
-               if imageCache:
-                   for img in images:
-                      try:
-                        filename = self.addImage(configdir, self.key, baseurl, img['src'])
-                        img['src']="file://%s" %filename
-                        count = self.db.execute("SELECT count(1) FROM images where id=? and imagePath=?;", (id, filename )).fetchone()[0]
-                        if count == 0:
-                            self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
-                      except:
-                          import traceback
-                          traceback.print_exc()
-                          print "Error downloading image %s" % img
-               tmpEntry["contentLink"] = configdir+self.key+".d/"+id+".html"
-               file = open(tmpEntry["contentLink"], "w")
-               file.write(soup.prettify())
-               file.close()
-               if id in ids:
-                   self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) )
-                   self.db.commit()
-               else:
-                   values = (id, tmpEntry["title"], tmpEntry["contentLink"], tmpEntry["date"], currentTime, tmpEntry["link"], 0)
-                   self.db.execute("INSERT INTO feed (id, title, contentLink, date, updated, link, read) VALUES (?, ?, ?, ?, ?, ?, ?);", values)
-                   self.db.commit()
-#               else:
-#                   try:
-#                       self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) )
-#                       self.db.commit()
-#                       filename = configdir+self.key+".d/"+id+".html"
-#                       file = open(filename,"a")
-#                       utime(filename, None)
-#                       file.close()
-#                       images = self.db.execute("SELECT imagePath FROM images where id=?;", (id, )).fetchall()
-#                       for image in images:
-#                            file = open(image[0],"a")
-#                            utime(image[0], None)
-#                            file.close()
-#                   except:
-#                       pass
-           self.db.commit()
-            
-           
-        rows = self.db.execute("SELECT id FROM feed WHERE (read=0 AND updated<?) OR (read=1 AND updated<?);", (currentTime-2*expiry, currentTime-expiry))
-        for row in rows:
-           self.removeEntry(row[0])
+                   abs_url = urljoin(tmp["feed"]["link"],"/favicon.ico")
+                   f = opener.open(abs_url)
+                   data = f.read()
+                   f.close()
+                   outf = open(self.dir+"/favicon.ico", "w")
+                   outf.write(data)
+                   outf.close()
+                   del data
+               except (urllib2.HTTPError, urllib2.URLError), exception:
+                   print ("Could not download favicon %s: %s"
+                          % (abs_url, str (exception)))
+    
+               self.serial_execution_lock.acquire ()
+               have_serial_execution_lock = True
+
+               #reversedEntries = self.getEntries()
+               #reversedEntries.reverse()
+    
+               ids = self.getIds()
+    
+               tmp["entries"].reverse()
+               for entry in tmp["entries"]:
+                   # Yield so as to make the main thread a bit more
+                   # responsive.
+                   time.sleep(0)
+    
+                   if JobManager().do_quit:
+                       raise KeyboardInterrupt
+
+                   received_base = progress_handler.stats['received']
+                   sent_base = progress_handler.stats['sent']
+                   object_size = 0
+
+                   date = self.extractDate(entry)
+                   try:
+                       entry["title"]
+                   except KeyError:
+                       entry["title"] = "No Title"
+                   try :
+                       entry["link"]
+                   except KeyError:
+                       entry["link"] = ""
+                   try:
+                       entry["author"]
+                   except KeyError:
+                       entry["author"] = None
+                   if(not(entry.has_key("id"))):
+                       entry["id"] = None
+                   content = self.extractContent(entry)
+                   object_size = len (content)
+                   received_base -= len (content)
+                   tmpEntry = {"title":entry["title"], "content":content,
+                                "date":date, "link":entry["link"], "author":entry["author"], "id":entry["id"]}
+                   id = self.generateUniqueId(tmpEntry)
+                   
+                   #articleTime = time.mktime(self.entries[id]["dateTuple"])
+                   soup = BeautifulSoup(self.getArticle(tmpEntry)) #tmpEntry["content"])
+                   images = soup('img')
+                   baseurl = tmpEntry["link"]
+                   #if not id in ids:
+                   if imageCache and len(images) > 0:
+                       self.serial_execution_lock.release ()
+                       have_serial_execution_lock = False
+                       for img in images:
+                            filename = self.addImage(configdir, self.key, baseurl, img['src'], proxy=proxy)
+                            if filename:
+                                img['src']="file://%s" %filename
+                                count = self.db.execute("SELECT count(1) FROM images where id=? and imagePath=?;", (id, filename )).fetchone()[0]
+                                if count == 0:
+                                    self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
+                                    self.db.commit()
+    
+                                try:
+                                    object_size += os.path.getsize (filename)
+                                except os.error, exception:
+                                    print ("Error getting size of %s: %s"
+                                           % (filename, exception))
+                                    pass
+                       self.serial_execution_lock.acquire ()
+                       have_serial_execution_lock = True
+    
+                   tmpEntry["contentLink"] = configdir+self.key+".d/"+id+".html"
+                   file = open(tmpEntry["contentLink"], "w")
+                   file.write(soup.prettify())
+                   file.close()
+                   if id in ids:
+                       self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) )
+                       self.db.commit()
+                   else:
+                       values = (id, tmpEntry["title"], tmpEntry["contentLink"], tmpEntry["date"], currentTime, tmpEntry["link"], 0)
+                       self.db.execute("INSERT INTO feed (id, title, contentLink, date, updated, link, read) VALUES (?, ?, ?, ?, ?, ?, ?);", values)
+                       self.db.commit()
+#                   else:
+#                       try:
+#                           self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) )
+#                           self.db.commit()
+#                           filename = configdir+self.key+".d/"+id+".html"
+#                           file = open(filename,"a")
+#                           utime(filename, None)
+#                           file.close()
+#                           images = self.db.execute("SELECT imagePath FROM images where id=?;", (id, )).fetchall()
+#                           for image in images:
+#                                file = open(image[0],"a")
+#                                utime(image[0], None)
+#                                file.close()
+#                       except:
+#                           pass
+    
         
-        from glob import glob
-        from os import stat
-        for file in glob(configdir+self.key+".d/*"):
-            #
-            stats = stat(file)
-            #
-            # put the two dates into matching format
-            #
-            lastmodDate = stats[8]
-            #
-            expDate = time.time()-expiry*3
-            # check if image-last-modified-date is outdated
-            #
-            if expDate > lastmodDate:
+               self.db.commit()
+
+               success = True
+
+            rows = self.db.execute("SELECT id FROM feed WHERE (read=0 AND updated<?) OR (read=1 AND updated<?);", (currentTime-2*expiry, currentTime-expiry))
+            for row in rows:
+               self.removeEntry(row[0])
+            
+            from glob import glob
+            from os import stat
+            for file in glob(configdir+self.key+".d/*"):
                 #
-                try:
-                    #
-                    #print 'Removing', file
-                    #
-                    remove(file) # commented out for testing
-                    #
-                except OSError:
+                stats = stat(file)
+                #
+                # put the two dates into matching format
+                #
+                lastmodDate = stats[8]
+                #
+                expDate = time.time()-expiry*3
+                # check if image-last-modified-date is outdated
+                #
+                if expDate > lastmodDate:
                     #
-                    print 'Could not remove', file
-        updateTime = 0
-        rows = self.db.execute("SELECT MAX(date) FROM feed;")
-        for row in rows:
-            updateTime=row[0]
-        return (updateTime, etag, modified)
-    
+                    try:
+                        #
+                        #print 'Removing', file
+                        #
+                        remove(file) # commented out for testing
+                        #
+                    except OSError, exception:
+                        #
+                        print 'Could not remove %s: %s' % (file, str (exception))
+            print ("updated %s: %fs in download, %fs in processing"
+                   % (self.key, download_duration,
+                      time.time () - process_start))
+        finally:
+            self.db.commit ()
+
+            if have_serial_execution_lock:
+                self.serial_execution_lock.release ()
+
+            if update_lock is not None:
+                release_lock (update_lock)
+
+            updateTime = 0
+            try:
+                rows = self.db.execute("SELECT MAX(date) FROM feed;")
+                for row in rows:
+                    updateTime=row[0]
+            except:
+                print "Fetching update time."
+                import traceback
+                traceback.print_exc()
+            finally:
+                if not success:
+                    etag = None
+                    modified = None
+                if postFeedUpdateFunc is not None:
+                    postFeedUpdateFunc (self.key, updateTime, etag, modified,
+                                        *postFeedUpdateFuncArgs)
+
     def setEntryRead(self, id):
         self.db.execute("UPDATE feed SET read=1 WHERE id=?;", (id,) )
         self.db.commit()
@@ -382,9 +525,10 @@ class ArchivedArticles(Feed):
             images = soup('img')
             baseurl = link
             for img in images:
-                filename = self.addImage(configdir, self.key, baseurl, img['src'])
+                filename = self.addImage(configdir, self.key, baseurl, img['src'], proxy=proxy)
                 img['src']=filename
                 self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
+                self.db.commit()
             contentLink = configdir+self.key+".d/"+id+".html"
             file = open(contentLink, "w")
             file.write(soup.prettify())
@@ -422,7 +566,8 @@ class Listing:
     db = property(_getdb)
 
     # Lists all the feeds in a dictionary, and expose the data
-    def __init__(self, configdir):
+    def __init__(self, config, configdir):
+        self.config = config
         self.configdir = configdir
 
         self.tls = threading.local ()
@@ -448,7 +593,7 @@ class Listing:
                     self.addCategory("Default Category")
                     self.db.execute("ALTER TABLE feeds ADD COLUMN category int;")
                     self.db.execute("UPDATE feeds SET category=1;")
-                    self.db.commit()
+            self.db.commit()
         except:
             pass
 
@@ -507,14 +652,34 @@ class Listing:
         archFeed.addArchivedArticle(title, link, date, self.configdir)
         self.updateUnread("ArchivedArticles")
         
-    def updateFeed(self, key, expiryTime=24, proxy=None, imageCache=False):
+    def updateFeed(self, key, expiryTime=None, proxy=None, imageCache=None,
+                   priority=0):
+        if expiryTime is None:
+            expiryTime = self.config.getExpiry()
+        if not expiryTime:
+            # Default to 24 hours
+            expiryTime = 24
+        if proxy is None:
+            (use_proxy, proxy) = self.config.getProxy()
+            if not use_proxy:
+                proxy = None
+        if imageCache is None:
+            imageCache = self.config.getImageCache()
+
         feed = self.getFeed(key)
         (url, etag, modified) = self.db.execute("SELECT url, etag, modified FROM feeds WHERE id=?;", (key,) ).fetchone()
         try:
             modified = time.struct_time(eval(modified))
         except:
             modified = None
-        (updateTime, etag, modified) = feed.updateFeed(self.configdir, url, etag, modified, expiryTime, proxy, imageCache)
+        feed.updateFeed(
+            self.configdir, url, etag, modified, expiryTime, proxy, imageCache,
+            priority, postFeedUpdateFunc=self._queuePostFeedUpdate)
+
+    def _queuePostFeedUpdate(self, *args, **kwargs):
+        mainthread.execute (self._postFeedUpdate, async=True, *args, **kwargs)
+
+    def _postFeedUpdate(self, key, updateTime, etag, modified):
         if modified==None:
             modified="None"
         else:
index 5122a5b..9516594 100644 (file)
@@ -2,6 +2,7 @@
 
 # 
 # Copyright (c) 2007-2008 INdT.
+# Copyright (c) 2011 Neal H. Walfield
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 
 from rss_sqlite import Listing
 from config import Config
+from updatedbus import get_lock, UpdateServerObject
 
-import threading
 import os
 import gobject
+import traceback
+
+from jobmanager import JobManager
+import mainthread
 
 CONFIGDIR="/home/user/.feedingit/"
 #DESKTOP_FILE = "/usr/share/applications/hildon-status-menu/feedingit_status.desktop"
@@ -39,100 +44,59 @@ timeout = 5
 setdefaulttimeout(timeout)
 del timeout
 
-from updatedbus import UpdateServerObject, get_lock
-
-class Download(threading.Thread):
-    def __init__(self, config, dbusHandler):
-        global dbug
-        threading.Thread.__init__(self)
-        self.running = True
-        self.config = config
-        self.dbusHandler = dbusHandler
-        if dbug:
-            self.dbug = open(CONFIGDIR+"dbug.log", "w")
-        
-    def run(self):
-        global dbug
-        if dbug:
-            self.dbug.write("Starting updates\n")
-        try:
-            self.dbusHandler.UpdateStarted()
-            (use_proxy, proxy) = self.config.getProxy()
-            listing = Listing(CONFIGDIR)
-            for key in listing.getListOfFeeds():
-                if dbug:
-                    self.dbug.write("updating %s\n" %key)
-                try:
-                    if use_proxy:
-                        from urllib2 import install_opener, build_opener
-                        install_opener(build_opener(proxy))
-                        listing.updateFeed(key, self.config.getExpiry(), proxy=proxy, imageCache=self.config.getImageCache() )
-                    else:
-                        listing.updateFeed(key, self.config.getExpiry(), imageCache=self.config.getImageCache() )
-                except:
-                    import traceback
-                    file = open("/home/user/.feedingit/feedingit_update.log", "a")
-                    traceback.print_exc(file=file)
-                    file.close()
-                if not self.running:
-                    if dbug:
-                        self.dbug.write("received stopUpdate after %s\n" %key)
-                    break
-            self.dbusHandler.UpdateFinished()
-            self.dbusHandler.ArticleCountUpdated()
-            if dbug:
-                self.dbug.write("Dbus ArticleCountUpdated signal sent\n")
-        except:
-            import traceback
-            file = open("/home/user/.feedingit/feedingit_update.log", "a")
-            traceback.print_exc(file=file)
-            file.close()
-            #pass
-        if dbug:
-            self.dbug.write("About to main_quit\n")
-        mainloop.quit()
-        if dbug:
-            self.dbug.write("After main_quit\n")
-            self.dbug.close()
+from updatedbus import UpdateServerObject
+
+debug_file = None
+def debug(*args):
+    global debug_file
+    if not debug_file:
+        debug_file = open("/home/user/.feedingit/feedingit_update.log", "a")
+
+    debug_file.write (*args)
 
 class FeedUpdate():
     def __init__(self):
         self.config = Config(self, CONFIGDIR+"config.ini")
         self.dbusHandler = UpdateServerObject(self)
-        self.updateThread = False
-        
-    def automaticUpdate(self):
-        #self.listing.updateFeeds()
-        if self.updateThread == False:
-            self.updateThread = Download(self.config, self.dbusHandler)
-            self.updateThread.start()
+        self.listing = Listing(self.config, CONFIGDIR)
+        self.done = False
+
+        jm = JobManager(True)
+        jm.stats_hook_register (self.job_manager_update,
+                                run_in_main_thread=True)
+
+        self.dbusHandler.UpdateStarted()
+        for k in self.listing.getListOfFeeds():
+            self.listing.updateFeed (k)
         
-    def stopUpdate(self):
-        try:
-            self.updateThread.running = False
-        except:
+    def job_manager_update(self, jm, old_stats, new_stats, updated_feed):
+        if not new_stats['jobs-queued'] and not new_stats['jobs-in-progress']:
+            self.dbusHandler.UpdateFinished()
+            self.dbusHandler.ArticleCountUpdated()
+            self.done = True
             mainloop.quit()
 
+    def stopUpdate(self):
+        print "Stop update called."
+        JobManager().quit()
+
 import dbus.mainloop.glib
 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 
 gobject.threads_init()
+mainthread.init()
 mainloop = gobject.MainLoop()
 
 app_lock = get_lock("app_lock")
 
 if app_lock != None:
-    try:
-        feed = FeedUpdate()
-        mainloop.run()
-        del app_lock
-    except:
-        import traceback
-        file = open("/home/user/.feedingit/feedingit_update.log", "w")
-        traceback.print_exc(file=file)
-        file.close()
+    feed = FeedUpdate()
+    while not feed.done:
+        try:
+            mainloop.run()
+        except KeyboardInterrupt:
+            print "Interrupted.  Quitting."
+            JobManager().quit()
+    del app_lock
 else:
-    file = open("/home/user/.feedingit/feedingit_update.log", "a")
-    file.write("Update in progress")
-    file.close()
-    
+    debug("Update in progress")
index e712c7e..8e110ae 100644 (file)
 
 import dbus
 import dbus.service
+import mainthread
 
+@mainthread.mainthread
 def get_lock(key):
     try:
         bus_name = dbus.service.BusName('org.marcoz.feedingit.lock_%s' %key,bus=dbus.SessionBus(), do_not_queue=True)
     except:
         bus_name = None
     return bus_name
-    
+
+def release_lock(lock):
+    assert lock is not None
+    mainthread.execute(lock.__del__, async=True)
 
 class UpdateServerObject(dbus.service.Object):
     def __init__(self, app):