Move download management from frontends to rss_sqlite.py.
author: Neal H. Walfield <neal@walfield.org>
Sat, 23 Jul 2011 08:38:18 +0000 (10:38 +0200)
committer: Neal H. Walfield <neal@gnu.org>
Thu, 28 Jul 2011 21:28:07 +0000 (23:28 +0200)
 - Move download logic to rss_sqlite.py:
  - Move thread management to jobmanager.py.
  - Queue jobs in rss_sqlite.py.
  - Manage per-feed DBus locks in rss_sqlite.py.
  - Get config settings in rss_sqlite.py on demand.

 - Make downloading more robust:
  - Only call non-thread-safe code (in particular, DBus and gconf
    functionality) from the main thread using mainthread.py.
  - Improve responsiveness of the frontend by yielding during CPU
    intense activities and only allowing a single slave to parse
    a feed at a time.
  - When downloads are canceled, do our best to quit cleanly and
    quickly.

 - Update frontends to use the new functionality.
  - Remove redundant code, in particular, download functionality.
  - Rework FeedingIt.py's download bar.

src/FeedingIt-Web.py
src/FeedingIt.py
src/config.py
src/httpprogresshandler.py [new file with mode: 0644]
src/jobmanager.py [new file with mode: 0644]
src/mainthread.py [new file with mode: 0644]
src/rss_sqlite.py
src/update_feeds.py
src/updatedbus.py

index 7f6b2a9..4fc6965 100644 (file)
@@ -44,7 +44,7 @@ def sanitize(text):
 
 def start_server():
     global listing
 
 def start_server():
     global listing
-    listing = Listing(CONFIGDIR)
+    listing = Listing(config, CONFIGDIR)
     httpd = BaseHTTPServer.HTTPServer(("127.0.0.1", PORT), Handler)
     httpd.serve_forever()
 
     httpd = BaseHTTPServer.HTTPServer(("127.0.0.1", PORT), Handler)
     httpd.serve_forever()
 
@@ -69,37 +69,11 @@ class App():
 #        download = Download(listing, feeds)
 #        download.start()
     
 #        download = Download(listing, feeds)
 #        download.start()
     
-class Download(Thread):
-    def __init__(self, listing, keys):
-        Thread.__init__(self)
-        self.listing = listing
-        self.keys = keys
-        
-    def run (self):
-        updateDbusHandler.UpdateStarted()
-        for key in self.keys:
-            print "Start update: %s" % key
-            updatingFeeds.append(key)
-            (use_proxy, proxy) = config.getProxy()
-            try:
-                if use_proxy:
-                        self.listing.updateFeed(key, proxy=proxy, imageCache=config.getImageCache() )
-                else:
-                        self.listing.updateFeed(key, imageCache=config.getImageCache() )
-            except:
-                print "Error updating feed: %s" %key
-            updatingFeeds.remove(key)
-            print "End update: %s" % key
-        updateDbusHandler.UpdateFinished()
-
 class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
     def updateAll(self):
 class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
     def updateAll(self):
-        feeds = []
         for cat in listing.getListOfCategories():
             for feed in listing.getSortedListOfKeys("Manual", category=cat):
         for cat in listing.getListOfCategories():
             for feed in listing.getSortedListOfKeys("Manual", category=cat):
-                feeds.append(feed)
-        download = Download(listing, feeds)
-        download.start()
+                listing.updateFeed(feed)
     
     def openTaskSwitch(self):
         import subprocess
     
     def openTaskSwitch(self):
         import subprocess
@@ -304,9 +278,6 @@ import thread
 
 
 
 
 
 
-# Start the HTTP server in a new thread
-thread.start_new_thread(start_server, ())
-
 # Initialize the glib mainloop, for dbus purposes
 from feedingitdbus import ServerObject
 from updatedbus import UpdateServerObject, get_lock
 # Initialize the glib mainloop, for dbus purposes
 from feedingitdbus import ServerObject
 from updatedbus import UpdateServerObject, get_lock
@@ -315,11 +286,18 @@ import gobject
 gobject.threads_init()
 import dbus.mainloop.glib
 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 gobject.threads_init()
 import dbus.mainloop.glib
 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+import mainthread
+mainthread.init()
+from jobmanager import JobManager
+JobManager(True)
 
 global updateDbusHandler, dbusHandler
 app = App()
 dbusHandler = ServerObject(app)
 updateDbusHandler = UpdateServerObject(app)
 
 
 global updateDbusHandler, dbusHandler
 app = App()
 dbusHandler = ServerObject(app)
 updateDbusHandler = UpdateServerObject(app)
 
+# Start the HTTP server in a new thread
+thread.start_new_thread(start_server, ())
+
 mainloop = gobject.MainLoop()
 mainloop.run()
 mainloop = gobject.MainLoop()
 mainloop.run()
index 9ba9469..f7a45b3 100644 (file)
@@ -2,6 +2,7 @@
 
 # 
 # Copyright (c) 2007-2008 INdT.
 
 # 
 # Copyright (c) 2007-2008 INdT.
+# Copyright (c) 2011 Neal H. Walfield
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
@@ -44,11 +45,13 @@ from feedingitdbus import ServerObject
 from updatedbus import UpdateServerObject, get_lock
 from config import Config
 from cgi import escape
 from updatedbus import UpdateServerObject, get_lock
 from config import Config
 from cgi import escape
+import weakref
 
 from rss_sqlite import Listing
 from opml import GetOpmlData, ExportOpmlData
 
 
 from rss_sqlite import Listing
 from opml import GetOpmlData, ExportOpmlData
 
-from urllib2 import install_opener, build_opener
+import mainthread
+from jobmanager import JobManager
 
 from socket import setdefaulttimeout
 timeout = 5
 
 from socket import setdefaulttimeout
 timeout = 5
@@ -270,90 +273,95 @@ class AddCategoryWizard(gtk.Dialog):
     def getData(self):
         return self.nameEntry.get_text()
         
     def getData(self):
         return self.nameEntry.get_text()
         
-class Download(Thread):
-    def __init__(self, listing, key, config):
-        Thread.__init__(self)
-        self.listing = listing
-        self.key = key
-        self.config = config
-        
-    def run (self):
-        (use_proxy, proxy) = self.config.getProxy()
-        key_lock = get_lock(self.key)
-        if key_lock != None:
-            if use_proxy:
-                self.listing.updateFeed(self.key, self.config.getExpiry(), proxy=proxy, imageCache=self.config.getImageCache() )
-            else:
-                self.listing.updateFeed(self.key, self.config.getExpiry(), imageCache=self.config.getImageCache() )
-        del key_lock
-
-        
 class DownloadBar(gtk.ProgressBar):
 class DownloadBar(gtk.ProgressBar):
-    def __init__(self, parent, listing, listOfKeys, config, single=False):
-        
-        update_lock = get_lock("update_lock")
-        if update_lock != None:
-            gtk.ProgressBar.__init__(self)
-            self.listOfKeys = listOfKeys[:]
-            self.listing = listing
-            self.total = len(self.listOfKeys)
-            self.config = config
-            self.current = 0
-            self.single = single
-            (use_proxy, proxy) = self.config.getProxy()
-            if use_proxy:
-                opener = build_opener(proxy)
-            else:
-                opener = build_opener()
-
-            opener.addheaders = [('User-agent', USER_AGENT)]
-            install_opener(opener)
-
-            if self.total>0:
-                # In preparation for i18n/l10n
-                def N_(a, b, n):
-                    return (a if n == 1 else b)
-
-                self.set_text(N_('Updating %d feed', 'Updating %d feeds', self.total) % self.total)
-
-                self.fraction = 0
-                self.set_fraction(self.fraction)
-                self.show_all()
-                # Create a timeout
-                self.timeout_handler_id = gobject.timeout_add(50, self.update_progress_bar)
-
-    def update_progress_bar(self):
-        #self.progress_bar.pulse()
-        if activeCount() < 4:
-            x = activeCount() - 1
-            k = len(self.listOfKeys)
-            fin = self.total - k - x
-            fraction = float(fin)/float(self.total) + float(x)/(self.total*2.)
-            #print x, k, fin, fraction
-            self.set_fraction(fraction)
-
-            if len(self.listOfKeys)>0:
-                self.current = self.current+1
-                key = self.listOfKeys.pop()
-                #if self.single == True:
-                    # Check if the feed is being displayed
-                download = Download(self.listing, key, self.config)
-                download.start()
-                return True
-            elif activeCount() > 1:
-                return True
+    @classmethod
+    def class_init(cls):
+        if hasattr (cls, 'class_init_done'):
+            return
+
+        jm = JobManager ()
+        jm.stats_hook_register (cls.update_progress,
+                                run_in_main_thread=True)
+
+        cls.downloadbars = []
+        # Total number of jobs we are monitoring.
+        cls.total = 0
+        # Number of jobs complete (of those that we are monitoring).
+        cls.done = 0
+        # Percent complete.
+        cls.progress = 0
+
+        cls.class_init_done = True
+
+    def __init__(self, parent):
+        self.class_init ()
+
+        gtk.ProgressBar.__init__(self)
+
+        self.downloadbars.append(weakref.ref (self))
+        self.set_fraction(0)
+        self.__class__.update_bars()
+        self.show_all()
+
+    @classmethod
+    def downloading(cls):
+        return hasattr (cls, 'jobs_at_start')
+
+    @classmethod
+    def update_progress(cls, jm, old_stats, new_stats, updated_feed):
+        if not cls.downloading():
+            cls.jobs_at_start = old_stats['jobs-completed']
+
+        if not cls.downloadbars:
+            return
+
+        if new_stats['jobs-in-progress'] + new_stats['jobs-queued'] == 0:
+            del cls.jobs_at_start
+            for ref in cls.downloadbars:
+                bar = ref ()
+                if bar is None:
+                    # The download bar disappeared.
+                    cls.downloadbars.remove (ref)
+                else:
+                    bar.emit("download-done", None)
+            return
+
+        # This should never be called if new_stats['jobs'] is 0, but
+        # just in case...
+        cls.total = max (1, new_stats['jobs'] - cls.jobs_at_start)
+        cls.done = new_stats['jobs-completed'] - cls.jobs_at_start
+        cls.progress = 1 - (new_stats['jobs-in-progress'] / 2.
+                            + new_stats['jobs-queued']) / cls.total
+        cls.update_bars()
+
+        if updated_feed:
+            for ref in cls.downloadbars:
+                bar = ref ()
+                if bar is None:
+                    # The download bar disappeared.
+                    cls.downloadbars.remove (ref)
+                else:
+                    bar.emit("download-done", updated_feed)
+
+    @classmethod
+    def update_bars(cls):
+        # In preparation for i18n/l10n
+        def N_(a, b, n):
+            return (a if n == 1 else b)
+
+        text = (N_('Updated %d of %d feeds ', 'Updated %d of %d feeds',
+                   cls.total)
+                % (cls.done, cls.total))
+
+        for ref in cls.downloadbars:
+            bar = ref ()
+            if bar is None:
+                # The download bar disappeared.
+                cls.downloadbars.remove (ref)
             else:
             else:
-                #self.waitingWindow.destroy()
-                #self.destroy()
-                try:
-                    del self.update_lock
-                except:
-                    pass
-                self.emit("download-done", "success")
-                return False 
-        return True
-    
-    
+                bar.set_text(text)
+                bar.set_fraction(cls.progress)
+
 class SortList(hildon.StackableWindow):
     def __init__(self, parent, listing, feedingit, after_closing, category=None):
         hildon.StackableWindow.__init__(self)
 class SortList(hildon.StackableWindow):
     def __init__(self, parent, listing, feedingit, after_closing, category=None):
         hildon.StackableWindow.__init__(self)
@@ -741,7 +749,14 @@ class DisplayFeed(hildon.StackableWindow):
         self.set_app_menu(menu)
         menu.show_all()
         
         self.set_app_menu(menu)
         menu.show_all()
         
+        self.main_vbox = gtk.VBox(False, 0)
+        self.add(self.main_vbox)
+
+        self.pannableFeed = None
         self.displayFeed()
         self.displayFeed()
+
+        if DownloadBar.downloading ():
+            self.show_download_bar ()
         
         self.connect('configure-event', self.on_configure_event)
         self.connect("destroy", self.destroyWindow)
         
         self.connect('configure-event', self.on_configure_event)
         self.connect("destroy", self.destroyWindow)
@@ -771,6 +786,9 @@ class DisplayFeed(hildon.StackableWindow):
         return escape(unescape(title).replace("<em>","").replace("</em>","").replace("<nobr>","").replace("</nobr>","").replace("<wbr>",""))
 
     def displayFeed(self):
         return escape(unescape(title).replace("<em>","").replace("</em>","").replace("<nobr>","").replace("</nobr>","").replace("<wbr>",""))
 
     def displayFeed(self):
+        if self.pannableFeed:
+            self.pannableFeed.destroy()
+
         self.pannableFeed = hildon.PannableArea()
 
         self.pannableFeed.set_property('hscrollbar-policy', gtk.POLICY_NEVER)
         self.pannableFeed = hildon.PannableArea()
 
         self.pannableFeed.set_property('hscrollbar-policy', gtk.POLICY_NEVER)
@@ -842,7 +860,7 @@ class DisplayFeed(hildon.StackableWindow):
             markup = ENTRY_TEMPLATE % (self.config.getFontSize(), "No Articles To Display")
             self.feedItems.append((markup, ""))
 
             markup = ENTRY_TEMPLATE % (self.config.getFontSize(), "No Articles To Display")
             self.feedItems.append((markup, ""))
 
-        self.add(self.pannableFeed)
+        self.main_vbox.pack_start(self.pannableFeed)
         self.show_all()
 
     def clear(self):
         self.show_all()
 
     def clear(self):
@@ -935,22 +953,24 @@ class DisplayFeed(hildon.StackableWindow):
         self.displayFeed()
 
     def button_update_clicked(self, button):
         self.displayFeed()
 
     def button_update_clicked(self, button):
-        #bar = DownloadBar(self, self.listing, [self.key,], self.config ) 
+        self.listing.updateFeed (self.key, priority=-1)
+            
+    def show_download_bar(self):
         if not type(self.downloadDialog).__name__=="DownloadBar":
         if not type(self.downloadDialog).__name__=="DownloadBar":
-            self.pannableFeed.destroy()
-            self.vbox = gtk.VBox(False, 10)
-            self.downloadDialog = DownloadBar(self.window, self.listing, [self.key,], self.config, single=True )
-            self.downloadDialog.connect("download-done", self.onDownloadsDone)
-            self.vbox.pack_start(self.downloadDialog, expand=False, fill=False)
-            self.add(self.vbox)
+            self.downloadDialog = DownloadBar(self.window)
+            self.downloadDialog.connect("download-done", self.onDownloadDone)
+            self.main_vbox.pack_end(self.downloadDialog,
+                                    expand=False, fill=False)
             self.show_all()
             self.show_all()
-            
-    def onDownloadsDone(self, *widget):
-        self.vbox.destroy()
-        self.feed = self.listing.getFeed(self.key)
-        self.displayFeed()
-        self.updateDbusHandler.ArticleCountUpdated()
         
         
+    def onDownloadDone(self, widget, feed):
+        if feed == self.feed or feed is None:
+            self.downloadDialog.destroy()
+            self.downloadDialog = False
+            self.feed = self.listing.getFeed(self.key)
+            self.displayFeed()
+            self.updateDbusHandler.ArticleCountUpdated()
+
     def buttonReadAllClicked(self, button):
         #self.clear()
         self.feed.markAllAsRead()
     def buttonReadAllClicked(self, button):
         #self.clear()
         self.feed.markAllAsRead()
@@ -1007,8 +1027,8 @@ class FeedingIt:
             self.stopButton.destroy()
         except:
             pass
             self.stopButton.destroy()
         except:
             pass
-        self.listing = Listing(CONFIGDIR)
-        
+        self.listing = Listing(self.config, CONFIGDIR)
+
         self.downloadDialog = False
         try:
             self.orientation = FremantleRotation(__appname__, main_window=self.window, app=self)
         self.downloadDialog = False
         try:
             self.orientation = FremantleRotation(__appname__, main_window=self.window, app=self)
@@ -1080,8 +1100,29 @@ class FeedingIt:
         self.checkAutoUpdate()
         
         hildon.hildon_gtk_window_set_progress_indicator(self.window, 0)
         self.checkAutoUpdate()
         
         hildon.hildon_gtk_window_set_progress_indicator(self.window, 0)
-        gobject.idle_add(self.enableDbus)
+        gobject.idle_add(self.late_init)
         
         
+    def job_manager_update(self, jm, old_stats, new_stats, updated_feed):
+        if (not self.downloadDialog
+            and new_stats['jobs-in-progress'] + new_stats['jobs-queued'] > 0):
+            self.updateDbusHandler.UpdateStarted()
+
+            self.downloadDialog = DownloadBar(self.window)
+            self.downloadDialog.connect("download-done", self.onDownloadDone)
+            self.mainVbox.pack_end(self.downloadDialog, expand=False, fill=False)
+            self.mainVbox.show_all()
+
+            if self.__dict__.get ('disp', None):
+                self.disp.show_download_bar ()
+
+    def onDownloadDone(self, widget, feed):
+        if feed is None:
+            self.downloadDialog.destroy()
+            self.downloadDialog = False
+            self.displayListing()
+            self.updateDbusHandler.UpdateFinished()
+            self.updateDbusHandler.ArticleCountUpdated()
+
     def stop_running_update(self, button):
         self.stopButton.set_sensitive(False)
         import dbus
     def stop_running_update(self, button):
         self.stopButton.set_sensitive(False)
         import dbus
@@ -1092,10 +1133,15 @@ class FeedingIt:
         iface = dbus.Interface(remote_object, 'org.marcoz.feedingit')
         iface.StopUpdate()
     
         iface = dbus.Interface(remote_object, 'org.marcoz.feedingit')
         iface.StopUpdate()
     
-    def enableDbus(self):
+    def late_init(self):
         self.dbusHandler = ServerObject(self)
         self.updateDbusHandler = UpdateServerObject(self)
 
         self.dbusHandler = ServerObject(self)
         self.updateDbusHandler = UpdateServerObject(self)
 
+        jm = JobManager()
+        jm.stats_hook_register (self.job_manager_update,
+                                run_in_main_thread=True)
+        JobManager(True)
+
     def button_markAll(self, button):
         for key in self.listing.getListOfFeeds():
             feed = self.listing.getFeed(key)
     def button_markAll(self, button):
         for key in self.listing.getListOfFeeds():
             feed = self.listing.getFeed(key)
@@ -1142,12 +1188,8 @@ class FeedingIt:
         SortList(self.window, self.listing, self, after_closing)
 
     def button_update_clicked(self, button, key):
         SortList(self.window, self.listing, self, after_closing)
 
     def button_update_clicked(self, button, key):
-        if not type(self.downloadDialog).__name__=="DownloadBar":
-            self.updateDbusHandler.UpdateStarted()
-            self.downloadDialog = DownloadBar(self.window, self.listing, self.listing.getListOfFeeds(), self.config )
-            self.downloadDialog.connect("download-done", self.onDownloadsDone)
-            self.mainVbox.pack_end(self.downloadDialog, expand=False, fill=False)
-            self.mainVbox.show_all()
+        for k in self.listing.getListOfFeeds():
+            self.listing.updateFeed (k)
         #self.displayListing()
 
     def onDownloadsDone(self, *widget):
         #self.displayListing()
 
     def onDownloadsDone(self, *widget):
@@ -1309,11 +1351,28 @@ class FeedingIt:
     def onFeedClosedTimeout(self):
         del self.feed_lock
         self.updateDbusHandler.ArticleCountUpdated()
     def onFeedClosedTimeout(self):
         del self.feed_lock
         self.updateDbusHandler.ArticleCountUpdated()
-     
+
+    def quit(self, *args):
+        self.window.hide()
+
+        if hasattr (self, 'app_lock'):
+            del self.app_lock
+
+        # Wait until all slave threads have properly exited before
+        # terminating the mainloop.
+        jm = JobManager()
+        jm.quit ()
+        stats = jm.stats()
+        if stats['jobs-in-progress'] == 0 and stats['jobs-queued'] == 0:
+            gtk.main_quit ()
+        else:
+            gobject.timeout_add(500, self.quit)
+
+        return False
+
     def run(self):
     def run(self):
-        self.window.connect("destroy", gtk.main_quit)
+        self.window.connect("destroy", self.quit)
         gtk.main()
         gtk.main()
-        del self.app_lock
 
     def prefsClosed(self, *widget):
         try:
 
     def prefsClosed(self, *widget):
         try:
@@ -1353,7 +1412,7 @@ class FeedingIt:
     def stopUpdate(self):
         # Not implemented in the app (see update_feeds.py)
         try:
     def stopUpdate(self):
         # Not implemented in the app (see update_feeds.py)
         try:
-            self.downloadDialog.listOfKeys = []
+            JobManager().cancel ()
         except:
             pass
     
         except:
             pass
     
@@ -1367,6 +1426,8 @@ class FeedingIt:
         return status
 
 if __name__ == "__main__":
         return status
 
 if __name__ == "__main__":
+    mainthread.init ()
+
     gobject.signal_new("feed-closed", DisplayFeed, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
     gobject.signal_new("article-closed", DisplayArticle, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
     gobject.signal_new("article-deleted", DisplayArticle, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
     gobject.signal_new("feed-closed", DisplayFeed, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
     gobject.signal_new("article-closed", DisplayArticle, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
     gobject.signal_new("article-deleted", DisplayArticle, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
index 8aa45ce..65fbddd 100644 (file)
@@ -32,6 +32,7 @@
 from ConfigParser import RawConfigParser
 from gconf import client_get_default
 from urllib2 import ProxyHandler
 from ConfigParser import RawConfigParser
 from gconf import client_get_default
 from urllib2 import ProxyHandler
+from mainthread import mainthread
 
 VERSION = "52"
 
 
 VERSION = "52"
 
@@ -280,6 +281,7 @@ class Config():
         return ranges["orientation"].index(self.config["orientation"])
     def getImageCache(self):
         return self.config["imageCache"]
         return ranges["orientation"].index(self.config["orientation"])
     def getImageCache(self):
         return self.config["imageCache"]
+    @mainthread
     def getProxy(self):
         if self.config["proxy"] == False:
             return (False, None)
     def getProxy(self):
         if self.config["proxy"] == False:
             return (False, None)
diff --git a/src/httpprogresshandler.py b/src/httpprogresshandler.py
new file mode 100644 (file)
index 0000000..78eb39c
--- /dev/null
@@ -0,0 +1,143 @@
+#!/usr/bin/env python2.5
+
+# Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import urllib2 
+import httplib
+import time
+
+class ProgressSocket(object):
+    """
+    Monitor what is being sent and received.
+    """
+    def __init__(self, socket, connection):
+        self.socket = socket
+        self.connection = connection
+
+    def __getattribute__(self, attr):
+        # print "%s.__getattribute__(%s)" % (self.__class__.__name__, attr)
+
+        def send(data):
+            # 100k at a time.
+            bs = 100 * 1024
+            sent = 0
+            while sent < len (data):
+                remaining = len (data) - sent
+                if remaining < bs:
+                    amount = remaining
+                else:
+                    amount = bs
+        
+                self.socket.sendall(data[sent:sent+amount])
+                sent += amount
+                self.connection.stats['sent'] += amount
+                self.connection.opener.stats['sent'] += amount
+        
+                if self.connection.callback is not None:
+                    self.connection.callback ()
+        
+        def read(*args, **kwargs):
+            data = self.socket.read (*args, **kwargs)
+            # print "GOT: %s" % (data[0:240],)
+            self.connection.stats['received'] += len (data)
+            self.connection.opener.stats['received'] += len (data)
+            if self.connection.callback is not None:
+                self.connection.callback ()
+            return data
+
+        if attr == 'send' or attr == 'sendall':
+            return send
+        if attr == 'read':
+            return read
+
+        try:
+            return super (ProgressSocket, self).__getattribute__(attr)
+        except AttributeError:
+            socket = super (ProgressSocket, self).__getattribute__('socket')
+            return socket.__getattribute__(attr)
+
+    def makefile(self, mode, bufsize):
+        return ProgressSocket (socket=self.socket.makefile(mode, bufsize),
+                               connection=self.connection)
+
+    def close(self):
+        return self.socket.close ()
+
+def HTTPProgressConnectionBuilder(callback, opener):
+    class HTTPProgressConnection(httplib.HTTPConnection):
+        def __init__(self, *args, **kwargs):
+            self.method = None
+            self.url = None
+            return httplib.HTTPConnection.__init__ (self, *args, **kwargs)
+
+        def putrequest(self, method, url, *args, **kwargs):
+            self.method = method
+            self.url = url
+            return httplib.HTTPConnection.putrequest (
+                self, method, url, *args, **kwargs)
+
+        def connect(self):
+            httplib.HTTPConnection.connect(self)
+            # Wrap the socket.
+            self.sock = ProgressSocket(socket=self.sock,
+                                       connection=self)
+
+    HTTPProgressConnection.callback = callback
+    HTTPProgressConnection.opener = opener
+    HTTPProgressConnection.stats \
+        = {'sent': 0, 'received': 0, 'started':time.time()}
+    return HTTPProgressConnection
+
+class HTTPProgressHandler(urllib2.HTTPHandler):
+    def __init__(self, callback):
+        self.callback = callback
+        self.stats = {'sent': 0, 'received': 0, 'started':time.time()}
+        return urllib2.HTTPHandler.__init__(self)
+
+    def http_open(self, request):
+        return self.do_open(
+            HTTPProgressConnectionBuilder(self.callback, self),
+            request)
+
+if __name__ == '__main__':
+    def callback(connection):
+        req = ""
+        if connection.method:
+            req += connection.method + " "
+        req += connection.host + ':' + str (connection.port)
+        if connection.url:
+            req += connection.url
+
+        cstats = connection.stats
+        ostats = connection.opener.stats
+
+        print (("%s: connection: %d sent, %d received: %d kb/s; "
+                + "opener: %d sent, %d received, %d kb/s")
+               % (req,
+                  cstats['sent'], cstats['received'],
+                  ((cstats['sent'] + cstats['received'])
+                   / (time.time() - cstats['started']) / 1024),
+                  ostats['sent'], ostats['received'],
+                  ((ostats['sent'] + ostats['received'])
+                   / (time.time() - ostats['started']) / 1024)))
+
+    opener = urllib2.build_opener(HTTPProgressHandler(callback))
+
+    data = opener.open ('http://google.com')
+    downloaded = 0
+    for d in data:
+        downloaded += len (d)
+    print "Document is %d bytes in size" % (downloaded,)
diff --git a/src/jobmanager.py b/src/jobmanager.py
new file mode 100644 (file)
index 0000000..8ca4df8
--- /dev/null
@@ -0,0 +1,355 @@
+#!/usr/bin/env python2.5
+
+# Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import threading
+import thread
+import traceback
+import heapq
+import sys
+import mainthread
+
+def debug(*args):
+    # Trace logging for this module; disabled by default.  Flip the
+    # constant below to see job-manager activity.
+    # NOTE(review): sys.stdout.write takes exactly one string, so this
+    # only works when called with a single argument -- which is how it
+    # is used throughout this file.
+    if False:
+        sys.stdout.write(*args)
+        sys.stdout.write("\n")
+
+# The default priority.  Like nice(), a smaller numeric priority
+# corresponds to a higher priority class.
+default_priority = 0
+
+class JobRunner(threading.Thread):
+    # Worker thread: pops jobs off the manager's priority queue and
+    # runs them until the queue drains, the manager is paused, or
+    # quit was requested.  The manager creates these in tickle().
+    def __init__(self, job_manager):
+        threading.Thread.__init__(self)
+        self.job_manager = job_manager
+
+    def run (self):
+        # have_lock tracks whether we currently hold the manager's
+        # lock, so the finally clause below releases it exactly once.
+        have_lock = True
+        self.job_manager.lock.acquire ()
+        try:
+            # self.job_manager.pause here is the integer pause count
+            # assigned in _JobManager.__init__.
+            while self.job_manager.pause == 0 and not self.job_manager.do_quit:
+                try:
+                    _, key, job = heapq.heappop (self.job_manager.queue)
+                except IndexError:
+                    # Queue drained: this runner retires (the finally
+                    # clause removes it from the thread list).
+                    return
+
+                try:
+                    self.job_manager.in_progress.append (key)
+                    # Drop the lock while the job runs so other
+                    # runners and the main thread can make progress.
+                    self.job_manager.lock.release ()
+                    have_lock = False
+
+                    # Execute the job.
+                    try:
+                        job ()
+                    except KeyboardInterrupt:
+                        # This is handled below and doesn't require a
+                        # traceback.
+                        raise
+                    except:
+                        print ("Executing job %s (%s) from thread %s: %s"
+                               % (str (key), str (job),
+                                  threading.currentThread(),
+                                  traceback.format_exc ()))
+
+                    self.job_manager.lock.acquire ()
+                    have_lock = True
+
+                    assert key in self.job_manager.in_progress
+                finally:
+                    # Tolerate the key already being gone (ValueError)
+                    # so cleanup never raises from the finally clause.
+                    try:
+                        self.job_manager.in_progress.remove (key)
+                    except ValueError:
+                        pass
+
+                debug("Finished executing job %s (%s)" % (key, job,))
+
+                # Notify stats hooks; the completed job is described
+                # by a {'job': ..., 'key': ...} dict.
+                self.job_manager._stats_hooks_run ({'job':job, 'key':key})
+        except KeyboardInterrupt:
+            # A job raised KeyboardInterrupt (e.g. via the download
+            # cancellation path): forward it to the main thread.
+            debug("%s: KeyboardInterrupt" % threading.currentThread())
+            thread.interrupt_main()
+            debug("%s: Forwarded KeyboardInterrupt to main thread"
+                  % threading.currentThread())
+        finally:
+            if have_lock:
+                self.job_manager.lock.release ()
+
+            assert self in self.job_manager.threads
+            self.job_manager.threads.remove (self)
+
+            debug ("Job runner %s (%d left) exiting."
+                   % (threading.currentThread(),
+                      len (self.job_manager.threads)))
+
+# The singleton _JobManager instance, created lazily by JobManager().
+_jm = None
+def JobManager(start=False):
+    """
+    Return the job manager instance.  The job manager will not start
+    executing jobs until this is called with start set to True.  Note:
+    you can still queue jobs.
+    """
+    # NOTE(review): the lazy creation below is not protected by a
+    # lock; the first call is expected to happen on the main thread
+    # before any worker threads exist.
+    global _jm
+    if _jm is None:
+        _jm = _JobManager ()
+    if start and not _jm.started:
+        _jm.started = True
+        if _jm.jobs > 0:
+            # Jobs were queued before the manager was started; let
+            # the stats hooks know about them now.
+            _jm._stats_hooks_run ()
+        _jm.tickle ()
+
+    return _jm
+
+class _JobManager(object):
+    """
+    The job manager implementation.  Use the JobManager() factory
+    function to obtain the (singleton) instance; do not instantiate
+    this class directly.
+    """
+    def __init__(self, started=False, num_threads=4):
+        """
+        Initialize the job manager.
+
+        If started is false, jobs may be queued, but jobs will not be
+        started until start() is called.
+        """
+        # A reentrant lock so that a job runner can call stats without
+        # dropping the lock.
+        self.lock = threading.RLock()
+
+        # If we can start executing jobs.
+        self.started = started
+
+        # The maximum number of threads to use for executing jobs.
+        self.num_threads = num_threads
+
+        # List of jobs (priority, key, job) that are queued for
+        # execution.
+        self.queue = []
+        # List of keys of the jobs that are being executed.
+        self.in_progress = []
+        # List of threads.
+        self.threads = []
+
+        # If 0, jobs may execute, otherwise, job execution is paused.
+        # NOTE(review): this instance attribute shadows the pause()
+        # method defined below: on an instance, self.pause is this
+        # integer, so calling manager.pause() raises TypeError.
+        # Renaming either side requires a coordinated change with the
+        # readers in JobRunner.run and tickle(), so it is only flagged
+        # here.
+        self.pause = 0
+
+        # The total number of jobs that this manager ever executed.
+        self.jobs = 0
+
+        # A list of status hooks to execute when the stats change.
+        self._stats_hooks = []
+        self._current_stats = self.stats ()
+
+        # Set by quit(); checked by runners and by jobs that poll it.
+        self.do_quit = False
+
+    def _lock(f):
+        # Decorator: run F with self.lock held.
+        def wrapper(*args, **kwargs):
+            self = args[0]
+            self.lock.acquire ()
+            try:
+                return f(*args, **kwargs)
+            finally:
+                self.lock.release()
+        return wrapper
+
+    @_lock
+    def start(self):
+        """
+        Start executing jobs.
+        """
+        if self.started:
+            return
+        # Record that we are now started.  (Previously this flag was
+        # never set here, so execute() kept believing the manager was
+        # not started and never tickled new runners.)
+        self.started = True
+        if self.jobs > 0:
+            self._stats_hooks_run ()
+        self.tickle ()
+
+    @_lock
+    def tickle(self):
+        """
+        Ensure that there are enough job runners for the number of
+        pending jobs.
+        """
+        if self.do_quit:
+            debug("%s.quit called, not creating new threads."
+                  % self.__class__.__name__)
+            return
+
+        if self.pause > 0:
+            # Job execution is paused.  Don't start any new threads.
+            debug("%s.tickle(): Not doing anything: paused"
+                  % (self.__class__.__name__))
+            return
+
+        debug("%s.tickle: Have %d threads (can start %d); %d jobs queued"
+              % (self.__class__.__name__,
+                 len (self.threads), self.num_threads, len (self.queue)))
+        if len (self.threads) < self.num_threads:
+            # One runner per queued job, up to the thread limit.
+            for _ in range (min (len (self.queue),
+                                 self.num_threads - len (self.threads))):
+                thread = JobRunner (self)
+                # Setting threads as daemons means faster shutdown
+                # when the main thread exits, but it results in
+                # exceptions and occasional segfaults.
+                # thread.setDaemon(True)
+                self.threads.append (thread)
+                thread.start ()
+                debug("Now have %d threads" % len (self.threads))
+
+    @_lock
+    def execute(self, job, key=None, priority=default_priority):
+        """
+        Enqueue a job for execution.  job is a function to execute.
+        If key is not None, the job is only enqueued if there is no
+        job that is in progress or enqueued with the same key.
+        priority is the job's priority.  Like nice(), a smaller
+        numeric priority corresponds to a higher priority class.  Jobs
+        are executed highest priority first, in the order that they
+        were added.
+        """
+        if self.do_quit:
+            debug("%s.quit called, not enqueuing new jobs."
+                  % self.__class__.__name__)
+            # Actually refuse the job.  (Previously control fell
+            # through and the job was enqueued anyway, contradicting
+            # the message above.)
+            return
+
+        if key is not None:
+            if key in self.in_progress:
+                return
+            for item in self.queue:
+                if item[1] == key:
+                    # A smaller number is a higher priority class, so
+                    # only ever *raise* the queued job's priority.
+                    # (The comparison was inverted before and would
+                    # have lowered it instead.)
+                    if priority < item[0][0]:
+                        # Priority raised.
+                        item[0][0] = priority
+                        # heapq.heapify works in place and returns
+                        # None; assigning its result would have
+                        # clobbered the queue.
+                        heapq.heapify (self.queue)
+                    return
+
+        # To ensure that jobs with the same priority are executed
+        # in the order they are added, we set the priority to
+        # [priority, next (monotonic counter)].
+        self.jobs += 1
+        heapq.heappush (self.queue, [[priority, self.jobs], key, job])
+
+        if self.started:
+            self._stats_hooks_run ()
+            self.tickle ()
+        else:
+            debug("%s not initialized. delaying execution of %s (%s)"
+                  % (self.__class__.__name__, key, str (job),))
+
+    @_lock
+    def pause(self):
+        """
+        Increment the pause count.  When the pause count is greater
+        than 0, job execution is suspended.
+
+        NOTE(review): currently unreachable as an instance method --
+        __init__ assigns the integer attribute self.pause, which
+        shadows this method (see the note in __init__).
+        """
+        self.pause += 1
+
+        if self.pause == 1:
+            self._stats_hooks_run ()
+
+    @_lock
+    def resume(self):
+        """
+        Decrement the pause count.  If the pause count is greater than
+        0 and this decrement brings it to 0, enqueued jobs are
+        resumed.
+        """
+        assert self.pause > 0
+        self.pause -= 1
+        if not self.paused():
+            self._stats_hooks_run ()
+            self.tickle ()
+
+    @_lock
+    def paused(self):
+        """
+        Returns whether job execution is paused.
+        """
+        return self.pause > 0
+
+    @_lock
+    def cancel(self):
+        """
+        Cancel any pending jobs.  Jobs that are already in progress
+        are not interrupted.
+        """
+        self.queue = []
+        self._stats_hooks_run ()
+
+    def quit(self):
+        """
+        Cancel pending jobs and refuse any new ones.  In-progress
+        jobs are left to finish; well-behaved jobs poll do_quit.
+        """
+        self.cancel ()
+        self.do_quit = True
+
+    @_lock
+    def stats(self):
+        """
+        Return a dictionary consisting of:
+
+          - 'paused': whether execution is paused
+          - 'jobs': the total number of jobs this manager has
+              executed, is executing or are queued
+          - 'jobs-completed': the number of jobs that have completed
+          - 'jobs-in-progress': the number of jobs in progress
+          - 'jobs-queued': the number of jobs currently queued
+        """
+        return {'paused': self.paused(),
+                'jobs': self.jobs,
+                'jobs-completed':
+                  self.jobs - len (self.in_progress) - len (self.queue),
+                'jobs-in-progress': len (self.in_progress),
+                'jobs-queued': len (self.queue)
+                }
+
+    def stats_hook_register(self, func, *args, **kwargs):
+        """
+        Registers a function to be called when the job status changes.
+        Passed the following parameters:
+
+          - the JobManager instance.
+          - the previous stats (as returned by stats)
+          - the current stats
+          - the job that was completed (or None)
+
+        If the keyword argument run_in_main_thread is true, the hook
+        is run in the main thread.  Otherwise, the hook may not be
+        run in the main thread!
+        """
+        # Pop the flag out of kwargs; the remaining kwargs are passed
+        # through to the hook.  (The local used to be called
+        # "mainthread", shadowing the mainthread module.)
+        run_in_main = kwargs.pop ('run_in_main_thread', False)
+        self._stats_hooks.append ([func, run_in_main, args, kwargs])
+
+    def _stats_hooks_run(self, completed_job=None):
+        """
+        Run the stats hooks.
+        """
+        # Snapshot old/new stats under the lock; run the hooks
+        # without it so they may call back into the manager.
+        self.lock.acquire ()
+        try:
+            old_stats = self._current_stats
+            self._current_stats = self.stats ()
+            current_stats = self._current_stats
+        finally:
+            self.lock.release ()
+
+        debug("%s -> %s" % (str (old_stats), str (current_stats)))
+
+        for (f, run_in_main_thread, args, kwargs) in self._stats_hooks:
+            if run_in_main_thread:
+                debug("JobManager._stats_hooks_run: Running %s in main thread"
+                      % f)
+                mainthread.execute(
+                    f, self, old_stats, current_stats, completed_job,
+                    async=True, *args, **kwargs)
+            else:
+                debug("JobManager._stats_hooks_run: Running %s in any thread"
+                      % f)
+                f(self, old_stats, current_stats, completed_job,
+                  *args, **kwargs)
diff --git a/src/mainthread.py b/src/mainthread.py
new file mode 100644 (file)
index 0000000..a85cf5a
--- /dev/null
@@ -0,0 +1,157 @@
+#!/usr/bin/env python2.5
+
+# Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+import threading
+import traceback
+
+_run_in_main_thread = None
+_main_thread = None
+
+def init(run_in_main_thread=None):
+    """
+    run_in_main_thread is a function that takes a single argument, a
+    callable and returns False.  run_in_main_thread should run the
+    function in the main thread.
+
+    If you are using glib, gobject.idle_add (the default) is
+    sufficient.  (gobject.idle_add is thread-safe.)
+
+    Must itself be called from the main thread: the calling thread is
+    recorded as *the* main thread.  Calling init() a second time is
+    an error (asserted below).
+    """
+    if run_in_main_thread is None:
+        import gobject
+        run_in_main_thread = gobject.idle_add
+
+    global _run_in_main_thread
+    assert _run_in_main_thread is None
+    _run_in_main_thread = run_in_main_thread
+
+    global _main_thread
+    _main_thread = threading.currentThread ()
+
+def execute(func, *args, **kwargs):
+    """
+    Execute FUNC in the main thread.
+
+    If kwargs['async'] exists and is True, the function is executed
+    asynchronously (i.e., the thread does not wait for the function to
+    return in which case the function's return value is discarded).
+    Otherwise, this function waits until the function is executed and
+    returns its return value.
+
+    If FUNC raises, the exception is printed and swallowed; a
+    synchronous call then returns None.
+    """
+    async = False
+    try:
+        async = kwargs['async']
+        del kwargs['async']
+    except KeyError:
+        pass
+
+    if threading.currentThread() == _main_thread:
+        # Already in the main thread: call FUNC directly.
+        if async:
+            try:
+                func (*args, **kwargs)
+            except:
+                print ("mainthread.execute: Executing %s: %s"
+                       % (func, traceback.format_exc ()))
+            return
+        else:
+            return func (*args, **kwargs)
+
+    assert _run_in_main_thread is not None, \
+        "You can't call this function from a non-main thread until you've called init()"
+
+    if not async:
+        cond = threading.Condition()
+
+    # Shared state between this thread and the main-thread callback.
+    result = {}
+    result['done'] = False
+
+    def doit():
+        def it():
+            # Execute the function.
+            assert threading.currentThread() == _main_thread
+
+            try:
+                result['result'] = func (*args, **kwargs)
+            except:
+                # On error, result['result'] stays unset and the
+                # caller gets None.
+                print ("mainthread.execute: Executing %s: %s"
+                       % (func, traceback.format_exc ()))
+
+            if not async:
+                cond.acquire ()
+            result['done'] = True
+            if not async:
+                cond.notify ()
+                cond.release ()
+
+            # Returning False removes the idle callback (gobject).
+            return False
+        return it
+
+    if not async:
+        # Acquire the condition *before* scheduling the callback, so
+        # it() cannot signal completion before we start waiting.
+        cond.acquire ()
+    _run_in_main_thread (doit())
+
+    if async:
+        # Don't wait for the method to complete execution.
+        return
+
+    # Wait for the result to become available.
+    while not result['done']:
+        cond.wait ()
+
+    return result.get ('result', None)
+
+if __name__ == "__main__":
+    # Self-test: verify execute() runs its callable in the main
+    # thread both when invoked from the main thread (tests 1-3) and
+    # from a worker thread (tests 4-6), in sync and async modes.
+    import sys
+    import gobject
+
+    init()
+
+    def in_main_thread(test_num):
+        assert threading.currentThread() == _main_thread, \
+            "Test %d failed" % (test_num,)
+        return test_num
+
+    mainloop = gobject.MainLoop()
+    gobject.threads_init()
+
+    assert execute (in_main_thread, 1) == 1
+    assert (execute (in_main_thread, 2, async=False) == 2)
+    execute (in_main_thread, 3, async=True)
+
+    class T(threading.Thread):
+        def __init__(self):
+            threading.Thread.__init__(self)
+
+        def run(self):
+            assert threading.currentThread() != _main_thread
+
+            assert execute (in_main_thread, 4) == 4
+            assert (execute (in_main_thread, 5, async=False) == 5)
+            execute (in_main_thread, 6, async=True)
+            # Quit the loop from the worker; async=False makes sure
+            # the quit has run before this thread continues.
+            execute (mainloop.quit, async=False)
+
+    def start_thread():
+        t = T()
+        t.start()
+        return False
+
+    gobject.idle_add (start_thread)
+    mainloop.run()
+
+def mainthread(f):
+    # Decorator: make F always run in the main thread.
+    # NOTE(review): defined *after* the __main__ self-test above, so
+    # when the module is run as a script it does not exist until the
+    # main loop finishes; consider moving it up.  It also does not use
+    # functools.wraps, so the wrapped function loses its __name__ and
+    # docstring.
+    def wrapper(*args, **kwargs):
+        return execute (f, *args, **kwargs)
+    return wrapper
index 64aef0a..e8b44a2 100644 (file)
@@ -28,6 +28,7 @@ import sqlite3
 from os.path import isfile, isdir
 from shutil import rmtree
 from os import mkdir, remove, utime
 from os.path import isfile, isdir
 from shutil import rmtree
 from os import mkdir, remove, utime
+import os
 import md5
 import feedparser
 import time
 import md5
 import feedparser
 import time
@@ -35,12 +36,36 @@ import urllib2
 from BeautifulSoup import BeautifulSoup
 from urlparse import urljoin
 from calendar import timegm
 from BeautifulSoup import BeautifulSoup
 from urlparse import urljoin
 from calendar import timegm
+from updatedbus import get_lock, release_lock
 import threading
 import threading
+import traceback
+from jobmanager import JobManager
+import mainthread
+from httpprogresshandler import HTTPProgressHandler
 
 def getId(string):
     return md5.new(string).hexdigest()
 
 
 def getId(string):
     return md5.new(string).hexdigest()
 
+def download_callback(connection):
+    # Progress callback handed to HTTPProgressHandler: abort the
+    # transfer promptly (via KeyboardInterrupt) once the job manager
+    # is shutting down.
+    if JobManager().do_quit:
+        raise KeyboardInterrupt
+
+def downloader(progress_handler=None, proxy=None):
+    # Build an urllib2 opener that reports progress (and honours
+    # cancellation via download_callback) and, optionally, goes
+    # through PROXY.
+    openers = []
+
+    if progress_handler:
+        openers.append (progress_handler)
+    else:
+        openers.append(HTTPProgressHandler(download_callback))
+
+    if proxy:
+        openers.append (proxy)
+
+    return urllib2.build_opener (*openers)
+
 class Feed:
 class Feed:
+    serial_execution_lock = threading.Lock()
+
     def _getdb(self):
         try:
             db = self.tls.db
     def _getdb(self):
         try:
             db = self.tls.db
@@ -63,17 +88,36 @@ class Feed:
             self.db.execute("CREATE TABLE images (id text, imagePath text);")
             self.db.commit()
 
             self.db.execute("CREATE TABLE images (id text, imagePath text);")
             self.db.commit()
 
-    def addImage(self, configdir, key, baseurl, url):
+    def addImage(self, configdir, key, baseurl, url, proxy=None, opener=None):
+        # Download URL (resolved against BASEURL) into the feed's
+        # cache directory, reusing OPENER if one is supplied.
+        # Returns the local filename, or None if the download failed
+        # with an HTTP/URL error.
         filename = configdir+key+".d/"+getId(url)
         if not isfile(filename):
             try:
         filename = configdir+key+".d/"+getId(url)
         if not isfile(filename):
             try:
-                f = urllib2.urlopen(urljoin(baseurl,url))
+                if not opener:
+                    opener = downloader(proxy=proxy)
+
+                abs_url = urljoin(baseurl,url)
+                f = opener.open(abs_url)
                 outf = open(filename, "w")
                 outf.write(f.read())
                 f.close()
                 outf.close()
                 outf = open(filename, "w")
                 outf.write(f.read())
                 f.close()
                 outf.close()
+            except (urllib2.HTTPError, urllib2.URLError), exception:
+                print ("Could not download image %s: %s"
+                       % (abs_url, str (exception)))
+                return None
             except:
             except:
-                print "Could not download " + url
+                print "Downloading image: %s" % abs_url
+                traceback.print_exc()
+
+                # Don't leave a partially written file behind: it
+                # would be mistaken for a cached image next time.
+                try:
+                    remove(filename)
+                except OSError:
+                    pass
+
+                # Re-raise with a bare raise, preserving the original
+                # exception and traceback.  (Raising sys.exc_info()[0]
+                # re-raised only the exception *class*, losing the
+                # message, and depended on "import sys", which this
+                # module does not add.)
+                raise
         else:
             #open(filename,"a").close()  # "Touch" the file
             file = open(filename,"a")
         else:
             #open(filename,"a").close()  # "Touch" the file
             file = open(filename,"a")
@@ -81,147 +125,246 @@ class Feed:
             file.close()
         return filename
 
             file.close()
         return filename
 
-    def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False):
-        # Expiry time is in hours
-        if proxy == None:
-            tmp=feedparser.parse(url, etag = etag, modified = modified)
-        else:
-            tmp=feedparser.parse(url, etag = etag, modified = modified, handlers = [proxy])
-        expiry = float(expiryTime) * 3600.
+    def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, priority=0, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
+        # Asynchronously update this feed: the real work happens in
+        # _updateFeed, queued on the job manager.  self.key is used as
+        # the job key, so at most one update per feed is ever queued
+        # or in progress.  The nested closures capture the arguments
+        # for the eventual _updateFeed call.
+        def doit():
+            def it():
+                self._updateFeed(configdir, url, etag, modified, expiryTime, proxy, imageCache, postFeedUpdateFunc, *postFeedUpdateFuncArgs)
+            return it
+        JobManager().execute(doit(), self.key, priority=priority)
 
 
-        currentTime = 0
-        # Check if the parse was succesful (number of entries > 0, else do nothing)
-        if len(tmp["entries"])>0:
-           currentTime = time.time()
-           # The etag and modified value should only be updated if the content was not null
-           try:
-               etag = tmp["etag"]
-           except KeyError:
-               etag = None
-           try:
-               modified = tmp["modified"]
-           except KeyError:
-               modified = None
-           try:
-               f = urllib2.urlopen(urljoin(tmp["feed"]["link"],"/favicon.ico"))
-               data = f.read()
-               f.close()
-               outf = open(self.dir+"/favicon.ico", "w")
-               outf.write(data)
-               outf.close()
-               del data
-           except:
-               #import traceback
-               #traceback.print_exc()
-                pass
+    def _updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
+        success = False
+        have_serial_execution_lock = False
+        try:
+            update_lock = None
+            update_lock = get_lock("key")
+            if not update_lock:
+                # Someone else is doing an update.
+                return
+
+            download_start = time.time ()
 
 
+            progress_handler = HTTPProgressHandler(download_callback)
 
 
-           #reversedEntries = self.getEntries()
-           #reversedEntries.reverse()
+            openers = [progress_handler]
+            if proxy:
+                openers.append (proxy)
+            kwargs = {'handlers':openers}
+            
+            tmp=feedparser.parse(url, etag=etag, modified=modified, **kwargs)
+            download_duration = time.time () - download_start
+    
+            opener = downloader(progress_handler, proxy)
 
 
-           ids = self.getIds()
+            if JobManager().do_quit:
+                raise KeyboardInterrupt
 
 
-           tmp["entries"].reverse()
-           for entry in tmp["entries"]:
-               date = self.extractDate(entry)
+            process_start = time.time()
+
+            # Expiry time is in hours
+            expiry = float(expiryTime) * 3600.
+    
+            currentTime = 0
+            http_status = tmp.get ('status', 200)
+    
+            # Check if the parse was succesful.  If the http status code
+            # is 304, then the download was successful, but there is
+            # nothing new.  Indeed, no content is returned.  This make a
+            # 304 look like an error because there are no entries and the
+            # parse fails.  But really, everything went great!  Check for
+            # this first.
+            if http_status == 304:
+                success = True
+            elif len(tmp["entries"])==0 and not tmp.version:
+                # An error occured fetching or parsing the feed.  (Version
+                # will be either None if e.g. the connection timed our or
+                # '' if the data is not a proper feed)
+                print ("Error fetching %s: version is: %s: error: %s"
+                       % (url, str (tmp.version),
+                          str (tmp.get ('bozo_exception', 'Unknown error'))))
+                print tmp
+            else:
+               currentTime = time.time()
+               # The etag and modified value should only be updated if the content was not null
                try:
                try:
-                   entry["title"]
-               except:
-                   entry["title"] = "No Title"
+                   etag = tmp["etag"]
+               except KeyError:
+                   etag = None
                try:
                try:
-                   entry["link"]
-               except:
-                   entry["link"] = ""
+                   modified = tmp["modified"]
+               except KeyError:
+                   modified = None
                try:
                try:
-                   entry["author"]
-               except:
-                   entry["author"] = None
-               if(not(entry.has_key("id"))):
-                   entry["id"] = None 
-               tmpEntry = {"title":entry["title"], "content":self.extractContent(entry),
-                            "date":date, "link":entry["link"], "author":entry["author"], "id":entry["id"]}
-               id = self.generateUniqueId(tmpEntry)
-               
-               #articleTime = time.mktime(self.entries[id]["dateTuple"])
-               soup = BeautifulSoup(self.getArticle(tmpEntry)) #tmpEntry["content"])
-               images = soup('img')
-               baseurl = tmpEntry["link"]
-               #if not id in ids:
-               if imageCache:
-                   for img in images:
-                      try:
-                        filename = self.addImage(configdir, self.key, baseurl, img['src'])
-                        img['src']="file://%s" %filename
-                        count = self.db.execute("SELECT count(1) FROM images where id=? and imagePath=?;", (id, filename )).fetchone()[0]
-                        if count == 0:
-                            self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
-                      except:
-                          import traceback
-                          traceback.print_exc()
-                          print "Error downloading image %s" % img
-               tmpEntry["contentLink"] = configdir+self.key+".d/"+id+".html"
-               file = open(tmpEntry["contentLink"], "w")
-               file.write(soup.prettify())
-               file.close()
-               if id in ids:
-                   self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) )
-                   self.db.commit()
-               else:
-                   values = (id, tmpEntry["title"], tmpEntry["contentLink"], tmpEntry["date"], currentTime, tmpEntry["link"], 0)
-                   self.db.execute("INSERT INTO feed (id, title, contentLink, date, updated, link, read) VALUES (?, ?, ?, ?, ?, ?, ?);", values)
-                   self.db.commit()
-#               else:
-#                   try:
-#                       self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) )
-#                       self.db.commit()
-#                       filename = configdir+self.key+".d/"+id+".html"
-#                       file = open(filename,"a")
-#                       utime(filename, None)
-#                       file.close()
-#                       images = self.db.execute("SELECT imagePath FROM images where id=?;", (id, )).fetchall()
-#                       for image in images:
-#                            file = open(image[0],"a")
-#                            utime(image[0], None)
-#                            file.close()
-#                   except:
-#                       pass
-           self.db.commit()
-            
-           
-        rows = self.db.execute("SELECT id FROM feed WHERE (read=0 AND updated<?) OR (read=1 AND updated<?);", (currentTime-2*expiry, currentTime-expiry))
-        for row in rows:
-           self.removeEntry(row[0])
+                   abs_url = urljoin(tmp["feed"]["link"],"/favicon.ico")
+                   f = opener.open(abs_url)
+                   data = f.read()
+                   f.close()
+                   outf = open(self.dir+"/favicon.ico", "w")
+                   outf.write(data)
+                   outf.close()
+                   del data
+               except (urllib2.HTTPError, urllib2.URLError), exception:
+                   print ("Could not download favicon %s: %s"
+                          % (abs_url, str (exception)))
+    
+               self.serial_execution_lock.acquire ()
+               have_serial_execution_lock = True
+
+               #reversedEntries = self.getEntries()
+               #reversedEntries.reverse()
+    
+               ids = self.getIds()
+    
+               tmp["entries"].reverse()
+               for entry in tmp["entries"]:
+                   # Yield so as to make the main thread a bit more
+                   # responsive.
+                   time.sleep(0)
+    
+                   if JobManager().do_quit:
+                       raise KeyboardInterrupt
+
+                   received_base = progress_handler.stats['received']
+                   sent_base = progress_handler.stats['sent']
+                   object_size = 0
+
+                   date = self.extractDate(entry)
+                   try:
+                       entry["title"]
+                   except KeyError:
+                       entry["title"] = "No Title"
+                   try :
+                       entry["link"]
+                   except KeyError:
+                       entry["link"] = ""
+                   try:
+                       entry["author"]
+                   except KeyError:
+                       entry["author"] = None
+                   if(not(entry.has_key("id"))):
+                       entry["id"] = None
+                   content = self.extractContent(entry)
+                   object_size = len (content)
+                   received_base -= len (content)
+                   tmpEntry = {"title":entry["title"], "content":content,
+                                "date":date, "link":entry["link"], "author":entry["author"], "id":entry["id"]}
+                   id = self.generateUniqueId(tmpEntry)
+                   
+                   #articleTime = time.mktime(self.entries[id]["dateTuple"])
+                   soup = BeautifulSoup(self.getArticle(tmpEntry)) #tmpEntry["content"])
+                   images = soup('img')
+                   baseurl = tmpEntry["link"]
+                   #if not id in ids:
+                   if imageCache and len(images) > 0:
+                       self.serial_execution_lock.release ()
+                       have_serial_execution_lock = False
+                       for img in images:
+                            filename = self.addImage(configdir, self.key, baseurl, img['src'], proxy=proxy)
+                            if filename:
+                                img['src']="file://%s" %filename
+                                count = self.db.execute("SELECT count(1) FROM images where id=? and imagePath=?;", (id, filename )).fetchone()[0]
+                                if count == 0:
+                                    self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
+                                    self.db.commit()
+    
+                                try:
+                                    object_size += os.path.getsize (filename)
+                                except os.error, exception:
+                                    print ("Error getting size of %s: %s"
+                                           % (filename, exception))
+                                    pass
+                       self.serial_execution_lock.acquire ()
+                       have_serial_execution_lock = True
+    
+                   tmpEntry["contentLink"] = configdir+self.key+".d/"+id+".html"
+                   file = open(tmpEntry["contentLink"], "w")
+                   file.write(soup.prettify())
+                   file.close()
+                   if id in ids:
+                       self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) )
+                       self.db.commit()
+                   else:
+                       values = (id, tmpEntry["title"], tmpEntry["contentLink"], tmpEntry["date"], currentTime, tmpEntry["link"], 0)
+                       self.db.execute("INSERT INTO feed (id, title, contentLink, date, updated, link, read) VALUES (?, ?, ?, ?, ?, ?, ?);", values)
+                       self.db.commit()
+#                   else:
+#                       try:
+#                           self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) )
+#                           self.db.commit()
+#                           filename = configdir+self.key+".d/"+id+".html"
+#                           file = open(filename,"a")
+#                           utime(filename, None)
+#                           file.close()
+#                           images = self.db.execute("SELECT imagePath FROM images where id=?;", (id, )).fetchall()
+#                           for image in images:
+#                                file = open(image[0],"a")
+#                                utime(image[0], None)
+#                                file.close()
+#                       except:
+#                           pass
+    
         
         
-        from glob import glob
-        from os import stat
-        for file in glob(configdir+self.key+".d/*"):
-            #
-            stats = stat(file)
-            #
-            # put the two dates into matching format
-            #
-            lastmodDate = stats[8]
-            #
-            expDate = time.time()-expiry*3
-            # check if image-last-modified-date is outdated
-            #
-            if expDate > lastmodDate:
+               self.db.commit()
+
+               success = True
+
+            rows = self.db.execute("SELECT id FROM feed WHERE (read=0 AND updated<?) OR (read=1 AND updated<?);", (currentTime-2*expiry, currentTime-expiry))
+            for row in rows:
+               self.removeEntry(row[0])
+            
+            from glob import glob
+            from os import stat
+            for file in glob(configdir+self.key+".d/*"):
                 #
                 #
-                try:
-                    #
-                    #print 'Removing', file
-                    #
-                    remove(file) # commented out for testing
-                    #
-                except OSError:
+                stats = stat(file)
+                #
+                # put the two dates into matching format
+                #
+                lastmodDate = stats[8]
+                #
+                expDate = time.time()-expiry*3
+                # check if image-last-modified-date is outdated
+                #
+                if expDate > lastmodDate:
                     #
                     #
-                    print 'Could not remove', file
-        updateTime = 0
-        rows = self.db.execute("SELECT MAX(date) FROM feed;")
-        for row in rows:
-            updateTime=row[0]
-        return (updateTime, etag, modified)
-    
+                    try:
+                        #
+                        #print 'Removing', file
+                        #
+                        remove(file) # commented out for testing
+                        #
+                    except OSError, exception:
+                        #
+                        print 'Could not remove %s: %s' % (file, str (exception))
+            print ("updated %s: %fs in download, %fs in processing"
+                   % (self.key, download_duration,
+                      time.time () - process_start))
+        finally:
+            self.db.commit ()
+
+            if have_serial_execution_lock:
+                self.serial_execution_lock.release ()
+
+            if update_lock is not None:
+                release_lock (update_lock)
+
+            updateTime = 0
+            try:
+                rows = self.db.execute("SELECT MAX(date) FROM feed;")
+                for row in rows:
+                    updateTime=row[0]
+            except:
+                print "Error fetching update time."
+                import traceback
+                traceback.print_exc()
+            finally:
+                if not success:
+                    etag = None
+                    modified = None
+                if postFeedUpdateFunc is not None:
+                    postFeedUpdateFunc (self.key, updateTime, etag, modified,
+                                        *postFeedUpdateFuncArgs)
+
     def setEntryRead(self, id):
         self.db.execute("UPDATE feed SET read=1 WHERE id=?;", (id,) )
         self.db.commit()
     def setEntryRead(self, id):
         self.db.execute("UPDATE feed SET read=1 WHERE id=?;", (id,) )
         self.db.commit()
@@ -382,9 +525,10 @@ class ArchivedArticles(Feed):
             images = soup('img')
             baseurl = link
             for img in images:
             images = soup('img')
             baseurl = link
             for img in images:
-                filename = self.addImage(configdir, self.key, baseurl, img['src'])
+                filename = self.addImage(configdir, self.key, baseurl, img['src'], proxy=proxy)
                 img['src']=filename
                 self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
                 img['src']=filename
                 self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
+                self.db.commit()
             contentLink = configdir+self.key+".d/"+id+".html"
             file = open(contentLink, "w")
             file.write(soup.prettify())
             contentLink = configdir+self.key+".d/"+id+".html"
             file = open(contentLink, "w")
             file.write(soup.prettify())
@@ -422,7 +566,8 @@ class Listing:
     db = property(_getdb)
 
     # Lists all the feeds in a dictionary, and expose the data
     db = property(_getdb)
 
     # Lists all the feeds in a dictionary, and expose the data
-    def __init__(self, configdir):
+    def __init__(self, config, configdir):
+        self.config = config
         self.configdir = configdir
 
         self.tls = threading.local ()
         self.configdir = configdir
 
         self.tls = threading.local ()
@@ -448,7 +593,7 @@ class Listing:
                     self.addCategory("Default Category")
                     self.db.execute("ALTER TABLE feeds ADD COLUMN category int;")
                     self.db.execute("UPDATE feeds SET category=1;")
                     self.addCategory("Default Category")
                     self.db.execute("ALTER TABLE feeds ADD COLUMN category int;")
                     self.db.execute("UPDATE feeds SET category=1;")
-                    self.db.commit()
+            self.db.commit()
         except:
             pass
 
         except:
             pass
 
@@ -507,14 +652,34 @@ class Listing:
         archFeed.addArchivedArticle(title, link, date, self.configdir)
         self.updateUnread("ArchivedArticles")
         
         archFeed.addArchivedArticle(title, link, date, self.configdir)
         self.updateUnread("ArchivedArticles")
         
-    def updateFeed(self, key, expiryTime=24, proxy=None, imageCache=False):
+    def updateFeed(self, key, expiryTime=None, proxy=None, imageCache=None,
+                   priority=0):
+        if expiryTime is None:
+            expiryTime = self.config.getExpiry()
+        if not expiryTime:
+            # Default to 24 hours
+            expiryTime = 24
+        if proxy is None:
+            (use_proxy, proxy) = self.config.getProxy()
+            if not use_proxy:
+                proxy = None
+        if imageCache is None:
+            imageCache = self.config.getImageCache()
+
         feed = self.getFeed(key)
         (url, etag, modified) = self.db.execute("SELECT url, etag, modified FROM feeds WHERE id=?;", (key,) ).fetchone()
         try:
             modified = time.struct_time(eval(modified))
         except:
             modified = None
         feed = self.getFeed(key)
         (url, etag, modified) = self.db.execute("SELECT url, etag, modified FROM feeds WHERE id=?;", (key,) ).fetchone()
         try:
             modified = time.struct_time(eval(modified))
         except:
             modified = None
-        (updateTime, etag, modified) = feed.updateFeed(self.configdir, url, etag, modified, expiryTime, proxy, imageCache)
+        feed.updateFeed(
+            self.configdir, url, etag, modified, expiryTime, proxy, imageCache,
+            priority, postFeedUpdateFunc=self._queuePostFeedUpdate)
+
+    def _queuePostFeedUpdate(self, *args, **kwargs):
+        mainthread.execute (self._postFeedUpdate, async=True, *args, **kwargs)
+
+    def _postFeedUpdate(self, key, updateTime, etag, modified):
         if modified==None:
             modified="None"
         else:
         if modified==None:
             modified="None"
         else:
index 5122a5b..9516594 100644 (file)
@@ -2,6 +2,7 @@
 
 # 
 # Copyright (c) 2007-2008 INdT.
 
 # 
 # Copyright (c) 2007-2008 INdT.
+# Copyright (c) 2011 Neal H. Walfield
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 
 from rss_sqlite import Listing
 from config import Config
 
 from rss_sqlite import Listing
 from config import Config
+from updatedbus import get_lock, UpdateServerObject
 
 
-import threading
 import os
 import gobject
 import os
 import gobject
+import traceback
+
+from jobmanager import JobManager
+import mainthread
 
 CONFIGDIR="/home/user/.feedingit/"
 #DESKTOP_FILE = "/usr/share/applications/hildon-status-menu/feedingit_status.desktop"
 
 CONFIGDIR="/home/user/.feedingit/"
 #DESKTOP_FILE = "/usr/share/applications/hildon-status-menu/feedingit_status.desktop"
@@ -39,100 +44,59 @@ timeout = 5
 setdefaulttimeout(timeout)
 del timeout
 
 setdefaulttimeout(timeout)
 del timeout
 
-from updatedbus import UpdateServerObject, get_lock
-
-class Download(threading.Thread):
-    def __init__(self, config, dbusHandler):
-        global dbug
-        threading.Thread.__init__(self)
-        self.running = True
-        self.config = config
-        self.dbusHandler = dbusHandler
-        if dbug:
-            self.dbug = open(CONFIGDIR+"dbug.log", "w")
-        
-    def run(self):
-        global dbug
-        if dbug:
-            self.dbug.write("Starting updates\n")
-        try:
-            self.dbusHandler.UpdateStarted()
-            (use_proxy, proxy) = self.config.getProxy()
-            listing = Listing(CONFIGDIR)
-            for key in listing.getListOfFeeds():
-                if dbug:
-                    self.dbug.write("updating %s\n" %key)
-                try:
-                    if use_proxy:
-                        from urllib2 import install_opener, build_opener
-                        install_opener(build_opener(proxy))
-                        listing.updateFeed(key, self.config.getExpiry(), proxy=proxy, imageCache=self.config.getImageCache() )
-                    else:
-                        listing.updateFeed(key, self.config.getExpiry(), imageCache=self.config.getImageCache() )
-                except:
-                    import traceback
-                    file = open("/home/user/.feedingit/feedingit_update.log", "a")
-                    traceback.print_exc(file=file)
-                    file.close()
-                if not self.running:
-                    if dbug:
-                        self.dbug.write("received stopUpdate after %s\n" %key)
-                    break
-            self.dbusHandler.UpdateFinished()
-            self.dbusHandler.ArticleCountUpdated()
-            if dbug:
-                self.dbug.write("Dbus ArticleCountUpdated signal sent\n")
-        except:
-            import traceback
-            file = open("/home/user/.feedingit/feedingit_update.log", "a")
-            traceback.print_exc(file=file)
-            file.close()
-            #pass
-        if dbug:
-            self.dbug.write("About to main_quit\n")
-        mainloop.quit()
-        if dbug:
-            self.dbug.write("After main_quit\n")
-            self.dbug.close()
+from updatedbus import UpdateServerObject
+
+debug_file = None
+def debug(*args):
+    global debug_file
+    if not debug_file:
+        debug_file = open("/home/user/.feedingit/feedingit_update.log", "a")
+
+    debug_file.write(" ".join(str(a) for a in args))
 
 class FeedUpdate():
     def __init__(self):
         self.config = Config(self, CONFIGDIR+"config.ini")
         self.dbusHandler = UpdateServerObject(self)
 
 class FeedUpdate():
     def __init__(self):
         self.config = Config(self, CONFIGDIR+"config.ini")
         self.dbusHandler = UpdateServerObject(self)
-        self.updateThread = False
-        
-    def automaticUpdate(self):
-        #self.listing.updateFeeds()
-        if self.updateThread == False:
-            self.updateThread = Download(self.config, self.dbusHandler)
-            self.updateThread.start()
+        self.listing = Listing(self.config, CONFIGDIR)
+        self.done = False
+
+        jm = JobManager(True)
+        jm.stats_hook_register (self.job_manager_update,
+                                run_in_main_thread=True)
+
+        self.dbusHandler.UpdateStarted()
+        for k in self.listing.getListOfFeeds():
+            self.listing.updateFeed (k)
         
         
-    def stopUpdate(self):
-        try:
-            self.updateThread.running = False
-        except:
+    def job_manager_update(self, jm, old_stats, new_stats, updated_feed):
+        if not new_stats['jobs-queued'] and not new_stats['jobs-in-progress']:
+            self.dbusHandler.UpdateFinished()
+            self.dbusHandler.ArticleCountUpdated()
+            self.done = True
             mainloop.quit()
 
             mainloop.quit()
 
+    def stopUpdate(self):
+        print "Stop update called."
+        JobManager().quit()
+
 import dbus.mainloop.glib
 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 
 gobject.threads_init()
 import dbus.mainloop.glib
 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 
 gobject.threads_init()
+mainthread.init()
 mainloop = gobject.MainLoop()
 
 app_lock = get_lock("app_lock")
 
 if app_lock != None:
 mainloop = gobject.MainLoop()
 
 app_lock = get_lock("app_lock")
 
 if app_lock != None:
-    try:
-        feed = FeedUpdate()
-        mainloop.run()
-        del app_lock
-    except:
-        import traceback
-        file = open("/home/user/.feedingit/feedingit_update.log", "w")
-        traceback.print_exc(file=file)
-        file.close()
+    feed = FeedUpdate()
+    while not feed.done:
+        try:
+            mainloop.run()
+        except KeyboardInterrupt:
+            print "Interrupted.  Quitting."
+            JobManager().quit()
+    del app_lock
 else:
 else:
-    file = open("/home/user/.feedingit/feedingit_update.log", "a")
-    file.write("Update in progress")
-    file.close()
-    
+    debug("Update in progress")
index e712c7e..8e110ae 100644 (file)
 
 import dbus
 import dbus.service
 
 import dbus
 import dbus.service
+import mainthread
 
 
+@mainthread.mainthread
 def get_lock(key):
     try:
         bus_name = dbus.service.BusName('org.marcoz.feedingit.lock_%s' %key,bus=dbus.SessionBus(), do_not_queue=True)
     except:
         bus_name = None
     return bus_name
 def get_lock(key):
     try:
         bus_name = dbus.service.BusName('org.marcoz.feedingit.lock_%s' %key,bus=dbus.SessionBus(), do_not_queue=True)
     except:
         bus_name = None
     return bus_name
-    
+
+def release_lock(lock):
+    assert lock is not None
+    mainthread.execute(lock.__del__, async=True)
 
 class UpdateServerObject(dbus.service.Object):
     def __init__(self, app):
 
 class UpdateServerObject(dbus.service.Object):
     def __init__(self, app):