From 8ce31474ed5fabdcc3a47b7e494f40adbab77fcb Mon Sep 17 00:00:00 2001 From: "Neal H. Walfield" Date: Sat, 23 Jul 2011 11:13:47 +0200 Subject: [PATCH] Add support for Woodchuck. --- Makefile | 4 ++ src/jobmanager.py | 14 ++++- src/rss_sqlite.py | 160 +++++++++++++++++++++++++++++++++++++++++++++++++++-- src/wc.py | 80 +++++++++++++++++++++++++++ 4 files changed, 251 insertions(+), 7 deletions(-) create mode 100644 src/wc.py diff --git a/Makefile b/Makefile index 7d2ca79..f322164 100644 --- a/Makefile +++ b/Makefile @@ -31,6 +31,10 @@ install: install src/aboutdialog.py ${DESTDIR}/opt/FeedingIt install src/rss_sqlite.py ${DESTDIR}/opt/FeedingIt install src/style.py ${DESTDIR}/opt/FeedingIt + install src/mainthread.py ${DESTDIR}/opt/FeedingIt + install src/jobmanager.py ${DESTDIR}/opt/FeedingIt + install src/httpprogresshandler.py ${DESTDIR}/opt/FeedingIt + install src/wc.py ${DESTDIR}/opt/FeedingIt install -d ${DESTDIR}/usr/share/applications/hildon install src/FeedingIt.desktop ${DESTDIR}/usr/share/applications/hildon install -d ${DESTDIR}/usr/share/icons/hicolor/48x48/apps/ diff --git a/src/jobmanager.py b/src/jobmanager.py index 8ca4df8..42c7441 100644 --- a/src/jobmanager.py +++ b/src/jobmanager.py @@ -40,7 +40,10 @@ class JobRunner(threading.Thread): have_lock = True self.job_manager.lock.acquire () try: - while self.job_manager.pause == 0 and not self.job_manager.do_quit: + while (self.job_manager.pause == 0 + and not self.job_manager.do_quit + and (len (self.job_manager.threads) + <= self.job_manager.num_threads)): try: _, key, job = heapq.heappop (self.job_manager.queue) except IndexError: @@ -127,7 +130,7 @@ class _JobManager(object): self.started = started # The maximum number of threads to use for executing jobs. - self.num_threads = num_threads + self._num_threads = num_threads # List of jobs (priority, key, job) that are queued for # execution. @@ -159,6 +162,13 @@ class _JobManager(object): self.lock.release() return wrapper + def get_num_threads(self): + return self._num_threads + def set_num_threads(self, value): + self._num_threads = value + self.tickle () + num_threads = property(get_num_threads, set_num_threads) + @_lock def start(self): """ diff --git a/src/rss_sqlite.py b/src/rss_sqlite.py index e8b44a2..b3dc2b0 100644 --- a/src/rss_sqlite.py +++ b/src/rss_sqlite.py @@ -39,9 +39,13 @@ from calendar import timegm from updatedbus import get_lock, release_lock import threading import traceback +from wc import wc, wc_init +import woodchuck from jobmanager import JobManager import mainthread from httpprogresshandler import HTTPProgressHandler +import random +import sys def getId(string): return md5.new(string).hexdigest() @@ -101,7 +105,7 @@ class Feed: outf.write(f.read()) f.close() outf.close() - except (urllib2.HTTPError, urllib2.URLError), exception: + except (urllib2.HTTPError, urllib2.URLError, IOError), exception: print ("Could not download image %s: %s" % (abs_url, str (exception))) return None @@ -109,7 +113,6 @@ class Feed: exception = sys.exc_info()[0] print "Downloading image: %s" % abs_url - import traceback traceback.print_exc() try: @@ -165,6 +168,28 @@ class Feed: expiry = float(expiryTime) * 3600. currentTime = 0 + + have_woodchuck = mainthread.execute (wc().available) + + def wc_success(): + try: + wc().stream_register (self.key, "", 6 * 60 * 60) + except woodchuck.ObjectExistsError: + pass + try: + wc()[self.key].updated ( + indicator=(woodchuck.Indicator.ApplicationVisual + |woodchuck.Indicator.StreamWide), + transferred_down=progress_handler.stats['received'], + transferred_up=progress_handler.stats['sent'], + transfer_time=download_start, + transfer_duration=download_duration, + new_objects=len (tmp.entries), + objects_inline=len (tmp.entries)) + except KeyError: + print "Failed to register update with woodchuck!" + pass + http_status = tmp.get ('status', 200) # Check if the parse was succesful. If the http status code @@ -174,6 +199,8 @@ class Feed: # parse fails. But really, everything went great! Check for # this first. if http_status == 304: + print "%s: No changes to feed." % (self.key,) + mainthread.execute (wc_success, async=True) success = True elif len(tmp["entries"])==0 and not tmp.version: # An error occured fetching or parsing the feed. (Version @@ -183,6 +210,26 @@ class Feed: % (url, str (tmp.version), str (tmp.get ('bozo_exception', 'Unknown error')))) print tmp + if have_woodchuck: + def e(): + print "%s: stream update failed!" % self.key + + try: + # It's not easy to get the feed's title from here. + # At the latest, the next time the application is + # started, we'll fix up the human readable name. + wc().stream_register (self.key, "", 6 * 60 * 60) + except woodchuck.ObjectExistsError: + pass + ec = woodchuck.TransferStatus.TransientOther + if 300 <= http_status and http_status < 400: + ec = woodchuck.TransferStatus.TransientNetwork + if 400 <= http_status and http_status < 500: + ec = woodchuck.TransferStatus.FailureGone + if 500 <= http_status and http_status < 600: + ec = woodchuck.TransferStatus.TransientNetwork + wc()[self.key].update_failed(ec) + mainthread.execute (e, async=True) else: currentTime = time.time() # The etag and modified value should only be updated if the content was not null @@ -303,9 +350,41 @@ class Feed: # except: # pass + # Register the object with Woodchuck and mark it as + # downloaded. + if have_woodchuck: + def e(): + try: + obj = wc()[self.key].object_register( + object_identifier=id, + human_readable_name=tmpEntry["title"]) + except woodchuck.ObjectExistsError: + obj = wc()[self.key][id] + else: + # If the entry does not contain a publication + # time, the attribute won't exist. + pubtime = entry.get ('date_parsed', None) + if pubtime: + obj.publication_time = time.mktime (pubtime) + received = (progress_handler.stats['received'] + - received_base) + sent = progress_handler.stats['sent'] - sent_base + obj.transferred ( + indicator=(woodchuck.Indicator.ApplicationVisual + |woodchuck.Indicator.StreamWide), + transferred_down=received, + transferred_up=sent, + object_size=object_size) + mainthread.execute(e, async=True) self.db.commit() + print ("%s: Update successful: transferred: %d/%d; objects: %d)" + % (self.key, + progress_handler.stats['sent'], + progress_handler.stats['received'], + len (tmp.entries))) + mainthread.execute (wc_success, async=True) success = True rows = self.db.execute("SELECT id FROM feed WHERE (read=0 AND updated?;", (rank,) ) diff --git a/src/wc.py b/src/wc.py new file mode 100644 index 0000000..15afc62 --- /dev/null +++ b/src/wc.py @@ -0,0 +1,80 @@ +# Copyright (c) 2011 Neal H. Walfield +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import traceback + +# Don't fail if the Woodchuck modules are not available. Just disable +# Woodchuck's functionality. + +# Whether we imported the woodchuck modules successfully. +woodchuck_imported = True +try: + import pywoodchuck + from pywoodchuck import PyWoodchuck + from pywoodchuck import woodchuck +except ImportError, exception: + print ("Unable to load Woodchuck modules: disabling Woodchuck support: %s" + % traceback.format_exc ()) + woodchuck_imported = False + class PyWoodchuck (object): + def available(): + return False + woodchuck = None + +# The default channel refresh interval: 6 hours. +refresh_interval = 6 * 60 * 60 + +class mywoodchuck (PyWoodchuck): + def __init__(self, listing, *args): + PyWoodchuck.__init__ (self, *args) + + self.listing = listing + + # Woodchuck upcalls. + def stream_update_cb(self, stream): + print ("stream update called on %s (%s)" + % (stream.human_readable_name, stream.identifier,)) + + # Make sure no one else is concurrently updating this + # feed. + try: + self.listing.updateFeed(stream.identifier) + except: + print ("Updating %s: %s" + % (stream.identifier, traceback.format_exc ())) + + def object_transfer_cb(self, stream, object, + version, filename, quality): + log ("object transfer called on %s (%s) in stream %s (%s)" + % (object.human_readable_name, object.identifier, + stream.human_readable_name, stream.identifier)) + +_w = None +def wc_init(listing): + global _w + assert _w is None + + _w = mywoodchuck (listing, "FeedingIt", "org.maemo.feedingit") + + if not woodchuck_imported or not _w.available (): + print "Unable to contact Woodchuck server." + else: + print "Woodchuck appears to be available." + +def wc(): + """Connect to the woodchuck server and initialize any state.""" + global _w + assert _w is not None + return _w -- 1.7.9.5