#
# Copyright (c) 2007-2008 INdT.
+# Copyright (c) 2011 Neal H. Walfield
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# Description : Simple RSS Reader
# ============================================================================
-from rss import Listing
+from rss_sqlite import Listing
from config import Config
+from updatedbus import UpdateServerObject
-import threading
import os
+import traceback
+import sys
+import dbus
+
+from jobmanager import JobManager
+import mainthread
+
import gobject
+gobject.threads_init()
-CONFIGDIR="/home/user/.feedingit/"
+import logging
+logger = logging.getLogger(__name__)
+import debugging
+debugging.init(dot_directory=".feedingit", program_name="update_feeds")
+
+from updatedbus import update_server_object
+
+CONFIGDIR = os.environ.get("HOME", "/home/user") + "/.feedingit/"
#DESKTOP_FILE = "/usr/share/applications/hildon-status-menu/feedingit_status.desktop"
-dbug = False
from socket import setdefaulttimeout
timeout = 5
setdefaulttimeout(timeout)
del timeout
-from updatedbus import UpdateServerObject, get_lock
-
-class Download(threading.Thread):
- def __init__(self, listing, config, dbusHandler):
- global dbug
- threading.Thread.__init__(self)
- self.running = True
- self.listing = listing
- self.config = config
- self.dbusHandler = dbusHandler
- if dbug:
- self.dbug = open(CONFIGDIR+"dbug.log", "w")
-
- def run(self):
- global dbug
- if dbug:
- self.dbug.write("Starting updates\n")
- try:
- self.dbusHandler.UpdateStarted()
- (use_proxy, proxy) = self.config.getProxy()
- for key in self.listing.getListOfFeeds():
- if dbug:
- self.dbug.write("updating %s\n" %key)
- try:
- if use_proxy:
- from urllib2 import install_opener, build_opener
- install_opener(build_opener(proxy))
- self.listing.updateFeed(key, self.config.getExpiry(), proxy=proxy, imageCache=self.config.getImageCache() )
- else:
- self.listing.updateFeed(key, self.config.getExpiry(), imageCache=self.config.getImageCache() )
- except:
- import traceback
- file = open("/home/user/.feedingit/feedingit_update.log", "a")
- traceback.print_exc(file=file)
- file.close()
- if not self.running:
- if dbug:
- self.dbug.write("received stopUpdate after %s\n" %key)
- break
- self.dbusHandler.UpdateFinished()
- self.dbusHandler.ArticleCountUpdated()
- if dbug:
- self.dbug.write("Dbus ArticleCountUpdated signal sent\n")
- except:
- pass
- self.listing.saveConfig()
- if dbug:
- self.dbug.write("About to main_quit\n")
- mainloop.quit()
- if dbug:
- self.dbug.write("After main_quit\n")
- self.dbug.close()
-
-class FeedUpdate():
- def __init__(self):
- self.listing = Listing(CONFIGDIR)
+class FeedUpdate(UpdateServerObject):
+ def __init__(self, bus_name):
+ UpdateServerObject.__init__(self, bus_name)
+
self.config = Config(self, CONFIGDIR+"config.ini")
- self.dbusHandler = UpdateServerObject(self)
- self.updateThread = False
-
- def automaticUpdate(self):
- #self.listing.updateFeeds()
- if self.updateThread == False:
- self.updateThread = Download(self.listing, self.config, self.dbusHandler)
- self.updateThread.start()
-
- def stopUpdate(self):
- try:
- self.updateThread.running = False
- except:
+ self.listing = Listing(self.config, CONFIGDIR)
+
+ jm = JobManager(True)
+ jm.stats_hook_register (self.job_manager_update,
+ run_in_main_thread=True)
+
+ # Whether or no an update is in progress.
+ self.am_updating = False
+
+ # After an update an finished, we start the inactivity timer.
+ # If this fires before a new job arrives, we quit.
+ self.inactivity_timer = 0
+
+ # Whether we started in daemon mode, or not.
+ self.daemon = '--daemon' in sys.argv
+
+ if self.daemon:
+ logger.debug("Running in daemon mode: waiting for commands.")
+ self.inactivity_timer = gobject.timeout_add(
+ 5 * 60 * 1000, self.inactivity_cb)
+ else:
+ # Update all feeds.
+ logger.debug("Not running in daemon mode: updating all feeds.")
+ gobject.idle_add(self.UpdateAll)
+
+ # If the system becomes idle
+ bus = dbus.SystemBus()
+
+ mce_request_proxy = bus.get_object(
+ 'com.nokia.mce', '/com/nokia/mce/request')
+ mce_request_iface = dbus.Interface(
+ mce_request_proxy, 'com.nokia.mce.request')
+ system_idle = mce_request_iface.get_inactivity_status()
+ # Force self.system_inactivity_ind to run: ensure that a state
+ # change occurs.
+ self.system_idle = not system_idle
+ self.system_inactivity_ind(system_idle)
+
+ mce_signal_proxy = bus.get_object(
+ 'com.nokia.mce', '/com/nokia/mce/signal')
+ mce_signal_iface = dbus.Interface(
+ mce_signal_proxy, 'com.nokia.mce.signal')
+ mce_signal_iface.connect_to_signal(
+ 'system_inactivity_ind', self.system_inactivity_ind)
+
+ def increase_download_parallelism(self):
+ # The system has been idle for a while. Enable parallel
+ # downloads.
+ logger.debug("Increasing parallelism to 4 workers.")
+ JobManager().num_threads = 4
+ gobject.source_remove (self.increase_download_parallelism_id)
+ del self.increase_download_parallelism_id
+ return False
+
+ def system_inactivity_ind(self, idle):
+ # The system's idle state changed.
+ if (self.system_idle and idle) or (not self.system_idle and not idle):
+ # No change.
+ return
+
+ if not idle:
+ if hasattr (self, 'increase_download_parallelism_id'):
+ gobject.source_remove (self.increase_download_parallelism_id)
+ del self.increase_download_parallelism_id
+ else:
+ self.increase_download_parallelism_id = \
+ gobject.timeout_add_seconds(
+ 60, self.increase_download_parallelism)
+
+ if not idle:
+ logger.debug("Reducing parallelism to 1 worker.")
+ JobManager().num_threads = 1
+
+ self.system_idle = idle
+
+ def job_manager_update(self, jm, old_stats, new_stats, updated_feed):
+ queued = new_stats['jobs-queued']
+ in_progress = new_stats['jobs-in-progress']
+
+ if (queued or in_progress) and not self.am_updating:
+ logger.debug("new update started")
+ self.am_updating = True
+ self.UpdateStarted()
+ self.UpdateProgress(0, 0, in_progress, queued, 0, 0, 0, "")
+
+ if not queued and not in_progress:
+ logger.debug("update finished!")
+ self.am_updating = False
+ self.UpdateFinished()
+ self.ArticleCountUpdated()
+
+ if self.daemon:
+ self.inactivity_timer = gobject.timeout_add(
+ 60 * 1000, self.inactivity_cb)
+ else:
+ logger.debug("update finished, not running in daemon mode: "
+ "quitting")
+ mainloop.quit()
+
+ if (queued or in_progress) and self.inactivity_timer:
+ gobject.source_remove(self.inactivity_timer)
+ self.inactivity_timer = 0
+
+ def inactivity_cb(self):
+ """
+ The updater has been inactive for a while. Quit.
+ """
+ assert self.inactivity_timer
+ self.inactivity_timer = 0
+
+ if not self.am_updating:
+ logger.info("Nothing to do for a while. Quitting.")
+
+ # Make any progress bar go away.
+ try:
+ update_server_object().UpdateProgress(
+ 100, 0, 0, 0, 0, 0, 0, "")
+ except Exception:
+ logger.exception("Sending final progress update")
+
mainloop.quit()
+ def StopUpdate(self):
+ """
+ Stop updating.
+ """
+ super(FeedUpdate, self).stopUpdate()
+
+ JobManager().quit()
+
+ def UpdateAll(self):
+ """
+ Update all feeds.
+ """
+ super(FeedUpdate, self).UpdateAll()
+
+ feeds = self.listing.getListOfFeeds()
+ for k in feeds:
+ self.listing.updateFeed(k)
+ logger.debug("Queued all feeds (%d) for update." % len(feeds))
+
+ def Update(self, feed):
+ """
+ Update a particular feed.
+ """
+ super(FeedUpdate, self).Update(feed)
+
+ # We got a request via dbus. If we weren't in daemon mode
+ # before, enter it now.
+ self.daemon = True
+
+ self.listing.updateFeed(feed)
+
+
import dbus.mainloop.glib
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
-gobject.threads_init()
mainloop = gobject.MainLoop()
+mainthread.init()
+
+# Acquire our name on the session bus. If this doesn't work, most
+# likely another update_feeds instance is already running. In this
+# case, just quit.
+try:
+ bus_name = dbus.service.BusName('org.marcoz.feedingit',
+ bus=dbus.SessionBus(),
+ do_not_queue=True)
+except Exception:
+ # We failed to acquire our bus name. Die.
+ try:
+ dbus_proxy = dbus.SessionBus().get_object(
+ 'org.freedesktop.DBus', '/org/freedesktop/DBus')
+ dbus_iface = dbus.Interface(dbus_proxy, 'org.freedesktop.DBus')
+ pid = dbus_iface.GetConnectionUnixProcessID('org.marcoz.feedingit')
+ logger.error("update_feeds already running: pid %d." % pid)
+ except Exception, e:
+ logger.error("Getting pid associated with org.marcoz.feedingit: %s"
+ % str(e))
+ logger.error("update_feeds already running.")
-app_lock = get_lock("app_lock")
+ sys.exit(1)
-if app_lock != None:
+# Run the updater. Note: we run this until feed.am_updating is false.
+# Only is this case have all worker threads exited. If the main
+# thread exits before all threads have exited and the process gets a
+# signal, the Python interpreter is unable to handle the signal and it
+# runs really slow (rescheduling after ever single instruction instead
+# of every few thousand).
+feed = FeedUpdate(bus_name)
+while True:
try:
- feed = FeedUpdate()
mainloop.run()
- del app_lock
- except:
- import traceback
- file = open("/home/user/.feedingit/feedingit_update.log", "w")
- traceback.print_exc(file=file)
- file.close()
-else:
- file = open("/home/user/.feedingit/feedingit_update.log", "a")
- file.write("Update in progress")
- file.close()
-
+ except KeyboardInterrupt:
+ logger.error("Interrupted. Quitting.")
+ JobManager().quit()
+
+ if not feed.am_updating:
+ break
+