psa: remove old event feed items
[feedingit] / src / update_feeds.py
index bb6c15a..4ff52a6 100644 (file)
@@ -2,6 +2,7 @@
 
 # 
 # Copyright (c) 2007-2008 INdT.
+# Copyright (c) 2011 Neal H. Walfield
 # This program is free software: you can redistribute it and/or modify
 # it under the terms of the GNU Lesser General Public License as published by
 # the Free Software Foundation, either version 3 of the License, or
 # Description : Simple RSS Reader
 # ============================================================================
 
-from rss import Listing
+from rss_sqlite import Listing
 from config import Config
+from updatedbus import UpdateServerObject
 
-import threading
 import os
+import traceback
+import sys
+import dbus
+
+from jobmanager import JobManager
+import mainthread
+
 import gobject
+gobject.threads_init()
 
-CONFIGDIR="/home/user/.feedingit/"
+import logging
+logger = logging.getLogger(__name__)
+import debugging
+debugging.init(dot_directory=".feedingit", program_name="update_feeds")
+
+from updatedbus import update_server_object
+
+CONFIGDIR = os.environ.get("HOME", "/home/user") + "/.feedingit/"
 #DESKTOP_FILE = "/usr/share/applications/hildon-status-menu/feedingit_status.desktop"
-dbug = False
 
 from socket import setdefaulttimeout
 timeout = 5
 setdefaulttimeout(timeout)
 del timeout
 
-from updatedbus import UpdateServerObject, get_lock
-
-class Download(threading.Thread):
-    def __init__(self, listing, config, dbusHandler):
-        global dbug
-        threading.Thread.__init__(self)
-        self.running = True
-        self.listing = listing
-        self.config = config
-        self.dbusHandler = dbusHandler
-        if dbug:
-            self.dbug = open(CONFIGDIR+"dbug.log", "w")
-        
-    def run(self):
-        global dbug
-        if dbug:
-            self.dbug.write("Starting updates\n")
-        try:
-            self.dbusHandler.UpdateStarted()
-            (use_proxy, proxy) = self.config.getProxy()
-            for key in self.listing.getListOfFeeds():
-                if dbug:
-                    self.dbug.write("updating %s\n" %key)
-                try:
-                    if use_proxy:
-                        from urllib2 import install_opener, build_opener
-                        install_opener(build_opener(proxy))
-                        self.listing.updateFeed(key, self.config.getExpiry(), proxy=proxy, imageCache=self.config.getImageCache() )
-                    else:
-                        self.listing.updateFeed(key, self.config.getExpiry(), imageCache=self.config.getImageCache() )
-                except:
-                    import traceback
-                    file = open("/home/user/.feedingit/feedingit_update.log", "a")
-                    traceback.print_exc(file=file)
-                    file.close()
-                if not self.running:
-                    if dbug:
-                        self.dbug.write("received stopUpdate after %s\n" %key)
-                    break
-            self.dbusHandler.UpdateFinished()
-            self.dbusHandler.ArticleCountUpdated()
-            if dbug:
-                self.dbug.write("Dbus ArticleCountUpdated signal sent\n")
-        except:
-            pass
-        self.listing.saveConfig()
-        if dbug:
-            self.dbug.write("About to main_quit\n")
-        mainloop.quit()
-        if dbug:
-            self.dbug.write("After main_quit\n")
-            self.dbug.close()
-
-class FeedUpdate():
-    def __init__(self):
-        self.listing = Listing(CONFIGDIR)
+class FeedUpdate(UpdateServerObject):
+    def __init__(self, bus_name):
+        UpdateServerObject.__init__(self, bus_name)
+
         self.config = Config(self, CONFIGDIR+"config.ini")
-        self.dbusHandler = UpdateServerObject(self)
-        self.updateThread = False
-        
-    def automaticUpdate(self):
-        #self.listing.updateFeeds()
-        if self.updateThread == False:
-            self.updateThread = Download(self.listing, self.config, self.dbusHandler)
-            self.updateThread.start()
-        
-    def stopUpdate(self):
-        try:
-            self.updateThread.running = False
-        except:
+        self.listing = Listing(self.config, CONFIGDIR)
+
+        jm = JobManager(True)
+        jm.stats_hook_register (self.job_manager_update,
+                                run_in_main_thread=True)
+
+        # Whether or no an update is in progress.
+        self.am_updating = False
+
+        # After an update an finished, we start the inactivity timer.
+        # If this fires before a new job arrives, we quit.
+        self.inactivity_timer = 0
+
+        # Whether we started in daemon mode, or not.
+        self.daemon = '--daemon' in sys.argv
+
+        if self.daemon:
+            logger.debug("Running in daemon mode: waiting for commands.")
+            self.inactivity_timer = gobject.timeout_add(
+                5 * 60 * 1000, self.inactivity_cb)
+        else:
+            # Update all feeds.
+            logger.debug("Not running in daemon mode: updating all feeds.")
+            gobject.idle_add(self.UpdateAll)
+
+        # If the system becomes idle
+        bus = dbus.SystemBus()
+
+        mce_request_proxy = bus.get_object(
+            'com.nokia.mce', '/com/nokia/mce/request')
+        mce_request_iface = dbus.Interface(
+            mce_request_proxy, 'com.nokia.mce.request')
+        system_idle = mce_request_iface.get_inactivity_status()
+        # Force self.system_inactivity_ind to run: ensure that a state
+        # change occurs.
+        self.system_idle = not system_idle
+        self.system_inactivity_ind(system_idle)
+
+        mce_signal_proxy = bus.get_object(
+            'com.nokia.mce', '/com/nokia/mce/signal')
+        mce_signal_iface = dbus.Interface(
+            mce_signal_proxy, 'com.nokia.mce.signal')
+        mce_signal_iface.connect_to_signal(
+            'system_inactivity_ind', self.system_inactivity_ind)
+
+    def increase_download_parallelism(self):
+        # The system has been idle for a while.  Enable parallel
+        # downloads.
+        logger.debug("Increasing parallelism to 4 workers.")
+        JobManager().num_threads = 4
+        gobject.source_remove (self.increase_download_parallelism_id)
+        del self.increase_download_parallelism_id
+        return False
+
+    def system_inactivity_ind(self, idle):
+        # The system's idle state changed.
+        if (self.system_idle and idle) or (not self.system_idle and not idle):
+            # No change.
+            return
+
+        if not idle:
+            if hasattr (self, 'increase_download_parallelism_id'):
+                gobject.source_remove (self.increase_download_parallelism_id)
+                del self.increase_download_parallelism_id
+        else:
+            self.increase_download_parallelism_id = \
+                gobject.timeout_add_seconds(
+                    60, self.increase_download_parallelism)
+
+        if not idle:
+            logger.debug("Reducing parallelism to 1 worker.")
+            JobManager().num_threads = 1
+
+        self.system_idle = idle
+
+    def job_manager_update(self, jm, old_stats, new_stats, updated_feed):
+        queued = new_stats['jobs-queued']
+        in_progress = new_stats['jobs-in-progress']
+
+        if (queued or in_progress) and not self.am_updating:
+            logger.debug("new update started")
+            self.am_updating = True
+            self.UpdateStarted()
+            self.UpdateProgress(0, 0, in_progress, queued, 0, 0, 0, "")
+
+        if not queued and not in_progress:
+            logger.debug("update finished!")
+            self.am_updating = False
+            self.UpdateFinished()
+            self.ArticleCountUpdated()
+
+            if self.daemon:
+                self.inactivity_timer = gobject.timeout_add(
+                    60 * 1000, self.inactivity_cb)
+            else:
+                logger.debug("update finished, not running in daemon mode: "
+                             "quitting")
+                mainloop.quit()
+
+        if (queued or in_progress) and self.inactivity_timer:
+            gobject.source_remove(self.inactivity_timer)
+            self.inactivity_timer = 0
+
+    def inactivity_cb(self):
+        """
+        The updater has been inactive for a while.  Quit.
+        """
+        assert self.inactivity_timer
+        self.inactivity_timer = 0
+
+        if not self.am_updating:
+            logger.info("Nothing to do for a while.  Quitting.")
+
+            # Make any progress bar go away.
+            try:
+                update_server_object().UpdateProgress(
+                    100, 0, 0, 0, 0, 0, 0, "")
+            except Exception:
+                logger.exception("Sending final progress update")
+
             mainloop.quit()
 
+    def StopUpdate(self):
+        """
+        Stop updating.
+        """
+        super(FeedUpdate, self).stopUpdate()
+
+        JobManager().quit()
+
+    def UpdateAll(self):
+        """
+        Update all feeds.
+        """
+        super(FeedUpdate, self).UpdateAll()
+
+        feeds = self.listing.getListOfFeeds()
+        for k in feeds:
+            self.listing.updateFeed(k)
+        logger.debug("Queued all feeds (%d) for update." % len(feeds))
+
+    def Update(self, feed):
+        """
+        Update a particular feed.
+        """
+        super(FeedUpdate, self).Update(feed)
+
+        # We got a request via dbus.  If we weren't in daemon mode
+        # before, enter it now.
+        self.daemon = True
+
+        self.listing.updateFeed(feed)
+
+
 import dbus.mainloop.glib
 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
 
-gobject.threads_init()
 mainloop = gobject.MainLoop()
+mainthread.init()
+
+# Acquire our name on the session bus.  If this doesn't work, most
+# likely another update_feeds instance is already running.  In this
+# case, just quit.
+try:
+    bus_name = dbus.service.BusName('org.marcoz.feedingit',
+                                    bus=dbus.SessionBus(),
+                                    do_not_queue=True)
+except Exception:
+    # We failed to acquire our bus name.  Die.
+    try:
+        dbus_proxy = dbus.SessionBus().get_object(
+            'org.freedesktop.DBus', '/org/freedesktop/DBus')
+        dbus_iface = dbus.Interface(dbus_proxy, 'org.freedesktop.DBus')
+        pid = dbus_iface.GetConnectionUnixProcessID('org.marcoz.feedingit')
+        logger.error("update_feeds already running: pid %d." % pid)
+    except Exception, e:
+        logger.error("Getting pid associated with org.marcoz.feedingit: %s"
+                     % str(e))
+        logger.error("update_feeds already running.")
 
-app_lock = get_lock("app_lock")
+    sys.exit(1)
 
-if app_lock != None:
+# Run the updater.  Note: we run this until feed.am_updating is false.
+# Only is this case have all worker threads exited.  If the main
+# thread exits before all threads have exited and the process gets a
+# signal, the Python interpreter is unable to handle the signal and it
+# runs really slow (rescheduling after ever single instruction instead
+# of every few thousand).
+feed = FeedUpdate(bus_name)
+while True:
     try:
-        feed = FeedUpdate()
         mainloop.run()
-        del app_lock
-    except:
-        import traceback
-        file = open("/home/user/.feedingit/feedingit_update.log", "w")
-        traceback.print_exc(file=file)
-        file.close()
-else:
-    file = open("/home/user/.feedingit/feedingit_update.log", "a")
-    file.write("Update in progress")
-    file.close()
-    
+    except KeyboardInterrupt:
+        logger.error("Interrupted.  Quitting.")
+        JobManager().quit()
+
+    if not feed.am_updating:
+        break
+