def start_server():
global listing
- listing = Listing(CONFIGDIR)
+ listing = Listing(config, CONFIGDIR)
httpd = BaseHTTPServer.HTTPServer(("127.0.0.1", PORT), Handler)
httpd.serve_forever()
# download = Download(listing, feeds)
# download.start()
-class Download(Thread):
- def __init__(self, listing, keys):
- Thread.__init__(self)
- self.listing = listing
- self.keys = keys
-
- def run (self):
- updateDbusHandler.UpdateStarted()
- for key in self.keys:
- print "Start update: %s" % key
- updatingFeeds.append(key)
- (use_proxy, proxy) = config.getProxy()
- try:
- if use_proxy:
- self.listing.updateFeed(key, proxy=proxy, imageCache=config.getImageCache() )
- else:
- self.listing.updateFeed(key, imageCache=config.getImageCache() )
- except:
- print "Error updating feed: %s" %key
- updatingFeeds.remove(key)
- print "End update: %s" % key
- updateDbusHandler.UpdateFinished()
-
class Handler(BaseHTTPServer.BaseHTTPRequestHandler):
def updateAll(self):
- feeds = []
for cat in listing.getListOfCategories():
for feed in listing.getSortedListOfKeys("Manual", category=cat):
- feeds.append(feed)
- download = Download(listing, feeds)
- download.start()
+ listing.updateFeed(feed)
def openTaskSwitch(self):
import subprocess
-# Start the HTTP server in a new thread
-thread.start_new_thread(start_server, ())
-
# Initialize the glib mainloop, for dbus purposes
from feedingitdbus import ServerObject
from updatedbus import UpdateServerObject, get_lock
gobject.threads_init()
import dbus.mainloop.glib
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
+import mainthread
+mainthread.init()
+from jobmanager import JobManager
+JobManager(True)
global updateDbusHandler, dbusHandler
app = App()
dbusHandler = ServerObject(app)
updateDbusHandler = UpdateServerObject(app)
+# Start the HTTP server in a new thread
+thread.start_new_thread(start_server, ())
+
mainloop = gobject.MainLoop()
mainloop.run()
#
# Copyright (c) 2007-2008 INdT.
+# Copyright (c) 2011 Neal H. Walfield
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
from updatedbus import UpdateServerObject, get_lock
from config import Config
from cgi import escape
+import weakref
from rss_sqlite import Listing
from opml import GetOpmlData, ExportOpmlData
-from urllib2 import install_opener, build_opener
+import mainthread
+from jobmanager import JobManager
from socket import setdefaulttimeout
timeout = 5
def getData(self):
return self.nameEntry.get_text()
-class Download(Thread):
- def __init__(self, listing, key, config):
- Thread.__init__(self)
- self.listing = listing
- self.key = key
- self.config = config
-
- def run (self):
- (use_proxy, proxy) = self.config.getProxy()
- key_lock = get_lock(self.key)
- if key_lock != None:
- if use_proxy:
- self.listing.updateFeed(self.key, self.config.getExpiry(), proxy=proxy, imageCache=self.config.getImageCache() )
- else:
- self.listing.updateFeed(self.key, self.config.getExpiry(), imageCache=self.config.getImageCache() )
- del key_lock
-
-
class DownloadBar(gtk.ProgressBar):
- def __init__(self, parent, listing, listOfKeys, config, single=False):
-
- update_lock = get_lock("update_lock")
- if update_lock != None:
- gtk.ProgressBar.__init__(self)
- self.listOfKeys = listOfKeys[:]
- self.listing = listing
- self.total = len(self.listOfKeys)
- self.config = config
- self.current = 0
- self.single = single
- (use_proxy, proxy) = self.config.getProxy()
- if use_proxy:
- opener = build_opener(proxy)
- else:
- opener = build_opener()
-
- opener.addheaders = [('User-agent', USER_AGENT)]
- install_opener(opener)
-
- if self.total>0:
- # In preparation for i18n/l10n
- def N_(a, b, n):
- return (a if n == 1 else b)
-
- self.set_text(N_('Updating %d feed', 'Updating %d feeds', self.total) % self.total)
-
- self.fraction = 0
- self.set_fraction(self.fraction)
- self.show_all()
- # Create a timeout
- self.timeout_handler_id = gobject.timeout_add(50, self.update_progress_bar)
-
- def update_progress_bar(self):
- #self.progress_bar.pulse()
- if activeCount() < 4:
- x = activeCount() - 1
- k = len(self.listOfKeys)
- fin = self.total - k - x
- fraction = float(fin)/float(self.total) + float(x)/(self.total*2.)
- #print x, k, fin, fraction
- self.set_fraction(fraction)
-
- if len(self.listOfKeys)>0:
- self.current = self.current+1
- key = self.listOfKeys.pop()
- #if self.single == True:
- # Check if the feed is being displayed
- download = Download(self.listing, key, self.config)
- download.start()
- return True
- elif activeCount() > 1:
- return True
+ @classmethod
+ def class_init(cls):
+ if hasattr (cls, 'class_init_done'):
+ return
+
+ jm = JobManager ()
+ jm.stats_hook_register (cls.update_progress,
+ run_in_main_thread=True)
+
+ cls.downloadbars = []
+ # Total number of jobs we are monitoring.
+ cls.total = 0
+ # Number of jobs complete (of those that we are monitoring).
+ cls.done = 0
+ # Percent complete.
+ cls.progress = 0
+
+ cls.class_init_done = True
+
+ def __init__(self, parent):
+ self.class_init ()
+
+ gtk.ProgressBar.__init__(self)
+
+ self.downloadbars.append(weakref.ref (self))
+ self.set_fraction(0)
+ self.__class__.update_bars()
+ self.show_all()
+
+ @classmethod
+ def downloading(cls):
+ return hasattr (cls, 'jobs_at_start')
+
+ @classmethod
+ def update_progress(cls, jm, old_stats, new_stats, updated_feed):
+ if not cls.downloading():
+ cls.jobs_at_start = old_stats['jobs-completed']
+
+ if not cls.downloadbars:
+ return
+
+ if new_stats['jobs-in-progress'] + new_stats['jobs-queued'] == 0:
+ del cls.jobs_at_start
+            for ref in cls.downloadbars[:]:
+ bar = ref ()
+ if bar is None:
+ # The download bar disappeared.
+ cls.downloadbars.remove (ref)
+ else:
+ bar.emit("download-done", None)
+ return
+
+ # This should never be called if new_stats['jobs'] is 0, but
+ # just in case...
+ cls.total = max (1, new_stats['jobs'] - cls.jobs_at_start)
+ cls.done = new_stats['jobs-completed'] - cls.jobs_at_start
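+        # Count each in-progress job as half done and each queued job
+        # as not yet started.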
+ cls.progress = 1 - (new_stats['jobs-in-progress'] / 2.
+ + new_stats['jobs-queued']) / cls.total
+ cls.update_bars()
+
+ if updated_feed:
+            for ref in cls.downloadbars[:]:
+ bar = ref ()
+ if bar is None:
+ # The download bar disappeared.
+ cls.downloadbars.remove (ref)
+ else:
+ bar.emit("download-done", updated_feed)
+
+ @classmethod
+ def update_bars(cls):
+ # In preparation for i18n/l10n
+ def N_(a, b, n):
+ return (a if n == 1 else b)
+
+        text = (N_('Updated %d of %d feed', 'Updated %d of %d feeds',
+ cls.total)
+ % (cls.done, cls.total))
+
+        for ref in cls.downloadbars[:]:
+ bar = ref ()
+ if bar is None:
+ # The download bar disappeared.
+ cls.downloadbars.remove (ref)
else:
- #self.waitingWindow.destroy()
- #self.destroy()
- try:
- del self.update_lock
- except:
- pass
- self.emit("download-done", "success")
- return False
- return True
-
-
+ bar.set_text(text)
+ bar.set_fraction(cls.progress)
+
class SortList(hildon.StackableWindow):
def __init__(self, parent, listing, feedingit, after_closing, category=None):
hildon.StackableWindow.__init__(self)
self.set_app_menu(menu)
menu.show_all()
+ self.main_vbox = gtk.VBox(False, 0)
+ self.add(self.main_vbox)
+
+ self.pannableFeed = None
self.displayFeed()
+
+ if DownloadBar.downloading ():
+ self.show_download_bar ()
self.connect('configure-event', self.on_configure_event)
self.connect("destroy", self.destroyWindow)
return escape(unescape(title).replace("<em>","").replace("</em>","").replace("<nobr>","").replace("</nobr>","").replace("<wbr>",""))
def displayFeed(self):
+ if self.pannableFeed:
+ self.pannableFeed.destroy()
+
self.pannableFeed = hildon.PannableArea()
self.pannableFeed.set_property('hscrollbar-policy', gtk.POLICY_NEVER)
markup = ENTRY_TEMPLATE % (self.config.getFontSize(), "No Articles To Display")
self.feedItems.append((markup, ""))
- self.add(self.pannableFeed)
+ self.main_vbox.pack_start(self.pannableFeed)
self.show_all()
def clear(self):
self.displayFeed()
def button_update_clicked(self, button):
- #bar = DownloadBar(self, self.listing, [self.key,], self.config )
+ self.listing.updateFeed (self.key, priority=-1)
+
+ def show_download_bar(self):
if not type(self.downloadDialog).__name__=="DownloadBar":
- self.pannableFeed.destroy()
- self.vbox = gtk.VBox(False, 10)
- self.downloadDialog = DownloadBar(self.window, self.listing, [self.key,], self.config, single=True )
- self.downloadDialog.connect("download-done", self.onDownloadsDone)
- self.vbox.pack_start(self.downloadDialog, expand=False, fill=False)
- self.add(self.vbox)
+ self.downloadDialog = DownloadBar(self.window)
+ self.downloadDialog.connect("download-done", self.onDownloadDone)
+ self.main_vbox.pack_end(self.downloadDialog,
+ expand=False, fill=False)
self.show_all()
-
- def onDownloadsDone(self, *widget):
- self.vbox.destroy()
- self.feed = self.listing.getFeed(self.key)
- self.displayFeed()
- self.updateDbusHandler.ArticleCountUpdated()
+ def onDownloadDone(self, widget, feed):
+ if feed == self.feed or feed is None:
+ self.downloadDialog.destroy()
+ self.downloadDialog = False
+ self.feed = self.listing.getFeed(self.key)
+ self.displayFeed()
+ self.updateDbusHandler.ArticleCountUpdated()
+
def buttonReadAllClicked(self, button):
#self.clear()
self.feed.markAllAsRead()
self.stopButton.destroy()
except:
pass
- self.listing = Listing(CONFIGDIR)
-
+ self.listing = Listing(self.config, CONFIGDIR)
+
self.downloadDialog = False
try:
self.orientation = FremantleRotation(__appname__, main_window=self.window, app=self)
self.checkAutoUpdate()
hildon.hildon_gtk_window_set_progress_indicator(self.window, 0)
- gobject.idle_add(self.enableDbus)
+ gobject.idle_add(self.late_init)
+ def job_manager_update(self, jm, old_stats, new_stats, updated_feed):
+ if (not self.downloadDialog
+ and new_stats['jobs-in-progress'] + new_stats['jobs-queued'] > 0):
+ self.updateDbusHandler.UpdateStarted()
+
+ self.downloadDialog = DownloadBar(self.window)
+ self.downloadDialog.connect("download-done", self.onDownloadDone)
+ self.mainVbox.pack_end(self.downloadDialog, expand=False, fill=False)
+ self.mainVbox.show_all()
+
+ if self.__dict__.get ('disp', None):
+ self.disp.show_download_bar ()
+
+ def onDownloadDone(self, widget, feed):
+ if feed is None:
+ self.downloadDialog.destroy()
+ self.downloadDialog = False
+ self.displayListing()
+ self.updateDbusHandler.UpdateFinished()
+ self.updateDbusHandler.ArticleCountUpdated()
+
def stop_running_update(self, button):
self.stopButton.set_sensitive(False)
import dbus
iface = dbus.Interface(remote_object, 'org.marcoz.feedingit')
iface.StopUpdate()
- def enableDbus(self):
+ def late_init(self):
self.dbusHandler = ServerObject(self)
self.updateDbusHandler = UpdateServerObject(self)
+ jm = JobManager()
+ jm.stats_hook_register (self.job_manager_update,
+ run_in_main_thread=True)
+ JobManager(True)
+
def button_markAll(self, button):
for key in self.listing.getListOfFeeds():
feed = self.listing.getFeed(key)
SortList(self.window, self.listing, self, after_closing)
def button_update_clicked(self, button, key):
- if not type(self.downloadDialog).__name__=="DownloadBar":
- self.updateDbusHandler.UpdateStarted()
- self.downloadDialog = DownloadBar(self.window, self.listing, self.listing.getListOfFeeds(), self.config )
- self.downloadDialog.connect("download-done", self.onDownloadsDone)
- self.mainVbox.pack_end(self.downloadDialog, expand=False, fill=False)
- self.mainVbox.show_all()
+ for k in self.listing.getListOfFeeds():
+ self.listing.updateFeed (k)
#self.displayListing()
def onDownloadsDone(self, *widget):
def onFeedClosedTimeout(self):
del self.feed_lock
self.updateDbusHandler.ArticleCountUpdated()
-
+
+ def quit(self, *args):
+ self.window.hide()
+
+ if hasattr (self, 'app_lock'):
+ del self.app_lock
+
+ # Wait until all slave threads have properly exited before
+ # terminating the mainloop.
+ jm = JobManager()
+ jm.quit ()
+ stats = jm.stats()
+ if stats['jobs-in-progress'] == 0 and stats['jobs-queued'] == 0:
+ gtk.main_quit ()
+ else:
+ gobject.timeout_add(500, self.quit)
+
+ return False
+
def run(self):
- self.window.connect("destroy", gtk.main_quit)
+ self.window.connect("destroy", self.quit)
gtk.main()
- del self.app_lock
def prefsClosed(self, *widget):
try:
def stopUpdate(self):
# Not implemented in the app (see update_feeds.py)
try:
- self.downloadDialog.listOfKeys = []
+ JobManager().cancel ()
except:
pass
return status
if __name__ == "__main__":
+ mainthread.init ()
+
gobject.signal_new("feed-closed", DisplayFeed, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
gobject.signal_new("article-closed", DisplayArticle, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
gobject.signal_new("article-deleted", DisplayArticle, gobject.SIGNAL_RUN_LAST, gobject.TYPE_NONE, (gobject.TYPE_PYOBJECT,))
from ConfigParser import RawConfigParser
from gconf import client_get_default
from urllib2 import ProxyHandler
+from mainthread import mainthread
VERSION = "52"
return ranges["orientation"].index(self.config["orientation"])
def getImageCache(self):
return self.config["imageCache"]
+ @mainthread
def getProxy(self):
if self.config["proxy"] == False:
return (False, None)
--- /dev/null
+#!/usr/bin/env python2.5
+
+# Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import urllib2
+import httplib
+import time
+
+class ProgressSocket(object):
+ """
+ Monitor what is being sent and received.
+ """
+ def __init__(self, socket, connection):
+ self.socket = socket
+ self.connection = connection
+
+ def __getattribute__(self, attr):
+ # print "%s.__getattribute__(%s)" % (self.__class__.__name__, attr)
+
+ def send(data):
+ # 100k at a time.
+ bs = 100 * 1024
+ sent = 0
+ while sent < len (data):
+ remaining = len (data) - sent
+ if remaining < bs:
+ amount = remaining
+ else:
+ amount = bs
+
+ self.socket.sendall(data[sent:sent+amount])
+ sent += amount
+ self.connection.stats['sent'] += amount
+ self.connection.opener.stats['sent'] += amount
+
+ if self.connection.callback is not None:
+ self.connection.callback ()
+
+ def read(*args, **kwargs):
+ data = self.socket.read (*args, **kwargs)
+ # print "GOT: %s" % (data[0:240],)
+ self.connection.stats['received'] += len (data)
+ self.connection.opener.stats['received'] += len (data)
+ if self.connection.callback is not None:
+ self.connection.callback ()
+ return data
+
+ if attr == 'send' or attr == 'sendall':
+ return send
+ if attr == 'read':
+ return read
+
+ try:
+ return super (ProgressSocket, self).__getattribute__(attr)
+ except AttributeError:
+ socket = super (ProgressSocket, self).__getattribute__('socket')
+ return socket.__getattribute__(attr)
+
+ def makefile(self, mode, bufsize):
+ return ProgressSocket (socket=self.socket.makefile(mode, bufsize),
+ connection=self.connection)
+
+ def close(self):
+ return self.socket.close ()
+
+def HTTPProgressConnectionBuilder(callback, opener):
+ class HTTPProgressConnection(httplib.HTTPConnection):
+ def __init__(self, *args, **kwargs):
+ self.method = None
+ self.url = None
+ return httplib.HTTPConnection.__init__ (self, *args, **kwargs)
+
+ def putrequest(self, method, url, *args, **kwargs):
+ self.method = method
+ self.url = url
+ return httplib.HTTPConnection.putrequest (
+ self, method, url, *args, **kwargs)
+
+ def connect(self):
+ httplib.HTTPConnection.connect(self)
+ # Wrap the socket.
+ self.sock = ProgressSocket(socket=self.sock,
+ connection=self)
+
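+    # Assigning the plain function to a class attribute turns it into a
+    # method, so it is invoked as callback(connection).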
+ HTTPProgressConnection.callback = callback
+ HTTPProgressConnection.opener = opener
+ HTTPProgressConnection.stats \
+ = {'sent': 0, 'received': 0, 'started':time.time()}
+ return HTTPProgressConnection
+
+class HTTPProgressHandler(urllib2.HTTPHandler):
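+    """
+    A urllib2 HTTP handler that tracks the bytes sent and received and
+    invokes callback(connection) as data flows (see the __main__ block
+    below for an example).
+    """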
+ def __init__(self, callback):
+ self.callback = callback
+ self.stats = {'sent': 0, 'received': 0, 'started':time.time()}
+ return urllib2.HTTPHandler.__init__(self)
+
+ def http_open(self, request):
+ return self.do_open(
+ HTTPProgressConnectionBuilder(self.callback, self),
+ request)
+
+if __name__ == '__main__':
+ def callback(connection):
+ req = ""
+ if connection.method:
+ req += connection.method + " "
+ req += connection.host + ':' + str (connection.port)
+ if connection.url:
+ req += connection.url
+
+ cstats = connection.stats
+ ostats = connection.opener.stats
+
+ print (("%s: connection: %d sent, %d received: %d kb/s; "
+ + "opener: %d sent, %d received, %d kb/s")
+ % (req,
+ cstats['sent'], cstats['received'],
+ ((cstats['sent'] + cstats['received'])
+ / (time.time() - cstats['started']) / 1024),
+ ostats['sent'], ostats['received'],
+ ((ostats['sent'] + ostats['received'])
+ / (time.time() - ostats['started']) / 1024)))
+
+ opener = urllib2.build_opener(HTTPProgressHandler(callback))
+
+ data = opener.open ('http://google.com')
+ downloaded = 0
+ for d in data:
+ downloaded += len (d)
+ print "Document is %d bytes in size" % (downloaded,)
--- /dev/null
+#!/usr/bin/env python2.5
+
+# Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import threading
+import thread
+import traceback
+import heapq
+import sys
+import mainthread
+
+def debug(*args):
+ if False:
+ sys.stdout.write(*args)
+ sys.stdout.write("\n")
+
+# The default priority. Like nice(), a smaller numeric priority
+# corresponds to a higher priority class.
+default_priority = 0
+
+class JobRunner(threading.Thread):
+ def __init__(self, job_manager):
+ threading.Thread.__init__(self)
+ self.job_manager = job_manager
+
+ def run (self):
+ have_lock = True
+ self.job_manager.lock.acquire ()
+ try:
+            while self.job_manager.pause_count == 0 and not self.job_manager.do_quit:
+ try:
+ _, key, job = heapq.heappop (self.job_manager.queue)
+ except IndexError:
+ return
+
+ try:
+ self.job_manager.in_progress.append (key)
+ self.job_manager.lock.release ()
+ have_lock = False
+
+ # Execute the job.
+ try:
+ job ()
+ except KeyboardInterrupt:
+ # This is handled below and doesn't require a
+ # traceback.
+ raise
+ except:
+ print ("Executing job %s (%s) from thread %s: %s"
+ % (str (key), str (job),
+ threading.currentThread(),
+ traceback.format_exc ()))
+
+ self.job_manager.lock.acquire ()
+ have_lock = True
+
+ assert key in self.job_manager.in_progress
+ finally:
+ try:
+ self.job_manager.in_progress.remove (key)
+ except ValueError:
+ pass
+
+ debug("Finished executing job %s (%s)" % (key, job,))
+
+ self.job_manager._stats_hooks_run ({'job':job, 'key':key})
+ except KeyboardInterrupt:
+ debug("%s: KeyboardInterrupt" % threading.currentThread())
+ thread.interrupt_main()
+ debug("%s: Forwarded KeyboardInterrupt to main thread"
+ % threading.currentThread())
+ finally:
+ if have_lock:
+ self.job_manager.lock.release ()
+
+ assert self in self.job_manager.threads
+ self.job_manager.threads.remove (self)
+
+ debug ("Job runner %s (%d left) exiting."
+ % (threading.currentThread(),
+ len (self.job_manager.threads)))
+
+_jm = None
+def JobManager(start=False):
+ """
+ Return the job manager instance. The job manager will not start
+ executing jobs until this is called with start set to True. Note:
+ you can still queue jobs.
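+
+    Illustrative use (the job callable and key here are placeholders):
+
+        jm = JobManager(True)
+        jm.execute(lambda: do_update(), key='some-feed', priority=-1)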
+ """
+ global _jm
+ if _jm is None:
+ _jm = _JobManager ()
+ if start and not _jm.started:
+ _jm.started = True
+ if _jm.jobs > 0:
+ _jm._stats_hooks_run ()
+ _jm.tickle ()
+
+ return _jm
+
+class _JobManager(object):
+ def __init__(self, started=False, num_threads=4):
+ """
+ Initialize the job manager.
+
+ If started is false, jobs may be queued, but jobs will not be
+ started until start() is called.
+ """
+ # A reentrant lock so that a job runner can call stat without
+ # dropping the lock.
+ self.lock = threading.RLock()
+
+ # If we can start executing jobs.
+ self.started = started
+
+ # The maximum number of threads to use for executing jobs.
+ self.num_threads = num_threads
+
+ # List of jobs (priority, key, job) that are queued for
+ # execution.
+ self.queue = []
+ # List of keys of the jobs that are being executed.
+ self.in_progress = []
+ # List of threads.
+ self.threads = []
+
+ # If 0, jobs may execute, otherwise, job execution is paused.
+        self.pause_count = 0
+
+ # The total number of jobs that this manager ever executed.
+ self.jobs = 0
+
+ # A list of status hooks to execute when the stats change.
+ self._stats_hooks = []
+ self._current_stats = self.stats ()
+
+ self.do_quit = False
+
+ def _lock(f):
+ def wrapper(*args, **kwargs):
+ self = args[0]
+ self.lock.acquire ()
+ try:
+ return f(*args, **kwargs)
+ finally:
+ self.lock.release()
+ return wrapper
+
+ @_lock
+ def start(self):
+ """
+ Start executing jobs.
+ """
+ if self.started:
+            return
+        self.started = True
+        if self.jobs > 0:
+ self._stats_hooks_run ()
+ self.tickle ()
+
+ @_lock
+ def tickle(self):
+ """
+ Ensure that there are enough job runners for the number of
+ pending jobs.
+ """
+ if self.do_quit:
+ debug("%s.quit called, not creating new threads."
+ % self.__class__.__name__)
+ return
+
+        if self.pause_count > 0:
+ # Job execution is paused. Don't start any new threads.
+ debug("%s.tickle(): Not doing anything: paused"
+ % (self.__class__.__name__))
+ return
+
+ debug("%s.tickle: Have %d threads (can start %d); %d jobs queued"
+ % (self.__class__.__name__,
+ len (self.threads), self.num_threads, len (self.queue)))
+ if len (self.threads) < self.num_threads:
+ for _ in range (min (len (self.queue),
+ self.num_threads - len (self.threads))):
+ thread = JobRunner (self)
+ # Setting threads as daemons means faster shutdown
+                # when the main thread exits, but it results in
+                # exceptions and occasional segfaults.
+ # thread.setDaemon(True)
+ self.threads.append (thread)
+ thread.start ()
+ debug("Now have %d threads" % len (self.threads))
+
+ @_lock
+ def execute(self, job, key=None, priority=default_priority):
+ """
+ Enqueue a job for execution. job is a function to execute.
+ If key is not None, the job is only enqueued if there is no
+        job that is in progress or enqueued with the same key.
+ priority is the job's priority. Like nice(), a smaller
+ numeric priority corresponds to a higher priority class. Jobs
+ are executed highest priority first, in the order that they
+ were added.
+ """
+ if self.do_quit:
+ debug("%s.quit called, not enqueuing new jobs."
+                  % self.__class__.__name__)
+            return
+
+ if key is not None:
+ if key in self.in_progress:
+ return
+ for item in self.queue:
+ if item[1] == key:
+                    if priority < item[0][0]:
+                        # Priority raised (a smaller numeric value means
+                        # a higher priority).
+                        item[0][0] = priority
+                        # heapify works in place and returns None.
+                        heapq.heapify (self.queue)
+ return
+
+ # To ensure that jobs with the same priority are executed
+ # in the order they are added, we set the priority to
+        # [priority, next (monotonic counter)].
+ self.jobs += 1
+ heapq.heappush (self.queue, [[priority, self.jobs], key, job])
+
+ if self.started:
+ self._stats_hooks_run ()
+ self.tickle ()
+ else:
+            debug("%s not started yet; delaying execution of %s (%s)"
+ % (self.__class__.__name__, key, str (job),))
+
+ @_lock
+ def pause(self):
+ """
+        Increment the pause count.  When the pause count is greater
+        than 0, job execution is suspended.
+        """
+        self.pause_count += 1
+
+        if self.pause_count == 1:
+ self._stats_hooks_run ()
+
+ @_lock
+ def resume(self):
+ """
+ Decrement the pause count. If the pause count is greater than
+ 0 and this decrement brings it to 0, enqueued jobs are
+ resumed.
+ """
+        assert self.pause_count > 0
+        self.pause_count -= 1
+ if not self.paused():
+ self._stats_hooks_run ()
+ self.tickle ()
+
+ @_lock
+ def paused(self):
+ """
+ Returns whether job execution is paused.
+ """
+        return self.pause_count > 0
+
+ @_lock
+ def cancel(self):
+ """
+ Cancel any pending jobs.
+ """
+ self.queue = []
+ self._stats_hooks_run ()
+
+ def quit(self):
+ self.cancel ()
+ self.do_quit = True
+
+ @_lock
+ def stats(self):
+ """
+ Return a dictionary consisting of:
+
+ - 'paused': whether execution is paused
+ - 'jobs': the total number of jobs this manager has
+ executed, is executing or are queued
+        - 'jobs-completed': the number of jobs that have completed
+ - 'jobs-in-progress': the number of jobs in progress
+ - 'jobs-queued': the number of jobs currently queued
+ """
+ return {'paused': self.paused(),
+ 'jobs': self.jobs,
+ 'jobs-completed':
+ self.jobs - len (self.in_progress) - len (self.queue),
+ 'jobs-in-progress': len (self.in_progress),
+ 'jobs-queued': len (self.queue)
+ }
+
+ def stats_hook_register(self, func, *args, **kwargs):
+ """
+ Registers a function to be called when the job status changes.
+ Passed the following parameters:
+
+ - the JobManager instance.
+ - the previous stats (as returned by stats)
+ - the current stats
+ - the job that was completed (or None)
+
+ Note: the hook may not be run in the main thread!
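+
+        An illustrative hook (my_hook is a placeholder name):
+
+            def my_hook(jm, old_stats, new_stats, completed_job):
+                print new_stats['jobs-queued'], new_stats['jobs-in-progress']
+
+            JobManager().stats_hook_register (my_hook, run_in_main_thread=True)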
+ """
+        # Use a local name that does not shadow the mainthread module.
+        run_in_main_thread = kwargs.pop ('run_in_main_thread', False)
+        self._stats_hooks.append ([func, run_in_main_thread, args, kwargs])
+
+ def _stats_hooks_run(self, completed_job=None):
+ """
+ Run the stats hooks.
+ """
+ # if not self._stats_hooks:
+ # return
+
+ self.lock.acquire ()
+ try:
+ old_stats = self._current_stats
+ self._current_stats = self.stats ()
+ current_stats = self._current_stats
+ finally:
+ self.lock.release ()
+
+ debug("%s -> %s" % (str (old_stats), str (current_stats)))
+
+ for (f, run_in_main_thread, args, kwargs) in self._stats_hooks:
+ if run_in_main_thread:
+ debug("JobManager._stats_hooks_run: Running %s in main thread"
+ % f)
+ mainthread.execute(
+ f, self, old_stats, current_stats, completed_job,
+ async=True, *args, **kwargs)
+ else:
+ debug("JobManager._stats_hooks_run: Running %s in any thread"
+ % f)
+ f(self, old_stats, current_stats, completed_job,
+ *args, **kwargs)
--- /dev/null
+#!/usr/bin/env python2.5
+
+# Copyright (c) 2011 Neal H. Walfield <neal@walfield.org>
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+import threading
+import traceback
+
+_run_in_main_thread = None
+_main_thread = None
+
+def init(run_in_main_thread=None):
+ """
+    run_in_main_thread is a function that takes a single argument, a
+    callable, and arranges for it to be run in the main thread (the
+    callable returns False so that it is only run once).
+
+ If you are using glib, gobject.idle_add (the default) is
+ sufficient. (gobject.idle_add is thread-safe.)
+ """
+ if run_in_main_thread is None:
+ import gobject
+ run_in_main_thread = gobject.idle_add
+
+ global _run_in_main_thread
+ assert _run_in_main_thread is None
+ _run_in_main_thread = run_in_main_thread
+
+ global _main_thread
+ _main_thread = threading.currentThread ()
+
+def execute(func, *args, **kwargs):
+ """
+ Execute FUNC in the main thread.
+
+ If kwargs['async'] exists and is True, the function is executed
+ asynchronously (i.e., the thread does not wait for the function to
+ return in which case the function's return value is discarded).
+ Otherwise, this function waits until the function is executed and
+ returns its return value.
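+
+    Illustrative use (some_func is a placeholder):
+
+        result = execute (some_func, 1, 2, async=False)
+        execute (some_func, 3, async=True)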
+ """
+ async = False
+ try:
+ async = kwargs['async']
+ del kwargs['async']
+ except KeyError:
+ pass
+
+ if threading.currentThread() == _main_thread:
+ if async:
+ try:
+ func (*args, **kwargs)
+ except:
+ print ("mainthread.execute: Executing %s: %s"
+ % (func, traceback.format_exc ()))
+ return
+ else:
+ return func (*args, **kwargs)
+
+ assert _run_in_main_thread is not None, \
+ "You can't call this function from a non-main thread until you've called init()"
+
+ if not async:
+ cond = threading.Condition()
+
+ result = {}
+ result['done'] = False
+
+ def doit():
+ def it():
+ # Execute the function.
+ assert threading.currentThread() == _main_thread
+
+ try:
+ result['result'] = func (*args, **kwargs)
+ except:
+ print ("mainthread.execute: Executing %s: %s"
+ % (func, traceback.format_exc ()))
+
+ if not async:
+ cond.acquire ()
+ result['done'] = True
+ if not async:
+ cond.notify ()
+ cond.release ()
+
+ return False
+ return it
+
+ if not async:
+ cond.acquire ()
+ _run_in_main_thread (doit())
+
+ if async:
+ # Don't wait for the method to complete execution.
+ return
+
+ # Wait for the result to become available.
+ while not result['done']:
+ cond.wait ()
+
+ return result.get ('result', None)
+
+if __name__ == "__main__":
+ import sys
+ import gobject
+
+ init()
+
+ def in_main_thread(test_num):
+ assert threading.currentThread() == _main_thread, \
+ "Test %d failed" % (test_num,)
+ return test_num
+
+ mainloop = gobject.MainLoop()
+ gobject.threads_init()
+
+ assert execute (in_main_thread, 1) == 1
+ assert (execute (in_main_thread, 2, async=False) == 2)
+ execute (in_main_thread, 3, async=True)
+
+ class T(threading.Thread):
+ def __init__(self):
+ threading.Thread.__init__(self)
+
+ def run(self):
+ assert threading.currentThread() != _main_thread
+
+ assert execute (in_main_thread, 4) == 4
+ assert (execute (in_main_thread, 5, async=False) == 5)
+ execute (in_main_thread, 6, async=True)
+ execute (mainloop.quit, async=False)
+
+ def start_thread():
+ t = T()
+ t.start()
+ return False
+
+ gobject.idle_add (start_thread)
+ mainloop.run()
+
+def mainthread(f):
+ def wrapper(*args, **kwargs):
+ return execute (f, *args, **kwargs)
+ return wrapper
from os.path import isfile, isdir
from shutil import rmtree
from os import mkdir, remove, utime
+import os
import md5
import feedparser
import time
from BeautifulSoup import BeautifulSoup
from urlparse import urljoin
from calendar import timegm
+from updatedbus import get_lock, release_lock
import threading
+import traceback
+from jobmanager import JobManager
+import mainthread
+from httpprogresshandler import HTTPProgressHandler
def getId(string):
return md5.new(string).hexdigest()
+def download_callback(connection):
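+    # Called by the progress handler as data is sent and received;
+    # abort the download if the job manager is shutting down.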
+ if JobManager().do_quit:
+ raise KeyboardInterrupt
+
+def downloader(progress_handler=None, proxy=None):
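+    # Build a urllib2 opener that reports download progress and, if
+    # given, routes requests through the proxy handler.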
+ openers = []
+
+ if progress_handler:
+ openers.append (progress_handler)
+ else:
+ openers.append(HTTPProgressHandler(download_callback))
+
+ if proxy:
+ openers.append (proxy)
+
+ return urllib2.build_opener (*openers)
+
class Feed:
+ serial_execution_lock = threading.Lock()
+
def _getdb(self):
try:
db = self.tls.db
self.db.execute("CREATE TABLE images (id text, imagePath text);")
self.db.commit()
- def addImage(self, configdir, key, baseurl, url):
+ def addImage(self, configdir, key, baseurl, url, proxy=None, opener=None):
filename = configdir+key+".d/"+getId(url)
if not isfile(filename):
try:
- f = urllib2.urlopen(urljoin(baseurl,url))
+ if not opener:
+ opener = downloader(proxy=proxy)
+
+ abs_url = urljoin(baseurl,url)
+ f = opener.open(abs_url)
outf = open(filename, "w")
outf.write(f.read())
f.close()
outf.close()
+ except (urllib2.HTTPError, urllib2.URLError), exception:
+ print ("Could not download image %s: %s"
+ % (abs_url, str (exception)))
+ return None
except:
- print "Could not download " + url
+                exception = sys.exc_info()[1]
+
+                print "Error downloading image %s" % abs_url
+ import traceback
+ traceback.print_exc()
+
+ try:
+ remove(filename)
+ except OSError:
+ pass
+
+ raise exception
else:
#open(filename,"a").close() # "Touch" the file
file = open(filename,"a")
file.close()
return filename
- def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False):
- # Expiry time is in hours
- if proxy == None:
- tmp=feedparser.parse(url, etag = etag, modified = modified)
- else:
- tmp=feedparser.parse(url, etag = etag, modified = modified, handlers = [proxy])
- expiry = float(expiryTime) * 3600.
+ def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, priority=0, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
+ def doit():
+ def it():
+ self._updateFeed(configdir, url, etag, modified, expiryTime, proxy, imageCache, postFeedUpdateFunc, *postFeedUpdateFuncArgs)
+ return it
+ JobManager().execute(doit(), self.key, priority=priority)
- currentTime = 0
- # Check if the parse was succesful (number of entries > 0, else do nothing)
- if len(tmp["entries"])>0:
- currentTime = time.time()
- # The etag and modified value should only be updated if the content was not null
- try:
- etag = tmp["etag"]
- except KeyError:
- etag = None
- try:
- modified = tmp["modified"]
- except KeyError:
- modified = None
- try:
- f = urllib2.urlopen(urljoin(tmp["feed"]["link"],"/favicon.ico"))
- data = f.read()
- f.close()
- outf = open(self.dir+"/favicon.ico", "w")
- outf.write(data)
- outf.close()
- del data
- except:
- #import traceback
- #traceback.print_exc()
- pass
+ def _updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
+ success = False
+ have_serial_execution_lock = False
+ try:
+ update_lock = None
+ update_lock = get_lock("key")
+ if not update_lock:
+ # Someone else is doing an update.
+ return
+
+ download_start = time.time ()
+ progress_handler = HTTPProgressHandler(download_callback)
- #reversedEntries = self.getEntries()
- #reversedEntries.reverse()
+ openers = [progress_handler]
+ if proxy:
+ openers.append (proxy)
+ kwargs = {'handlers':openers}
+
+ tmp=feedparser.parse(url, etag=etag, modified=modified, **kwargs)
+ download_duration = time.time () - download_start
+
+ opener = downloader(progress_handler, proxy)
- ids = self.getIds()
+ if JobManager().do_quit:
+ raise KeyboardInterrupt
- tmp["entries"].reverse()
- for entry in tmp["entries"]:
- date = self.extractDate(entry)
+ process_start = time.time()
+
+ # Expiry time is in hours
+ expiry = float(expiryTime) * 3600.
+
+ currentTime = 0
+ http_status = tmp.get ('status', 200)
+
+            # Check if the parse was successful.  If the http status code
+ # is 304, then the download was successful, but there is
+            # nothing new.  Indeed, no content is returned.  This makes a
+ # 304 look like an error because there are no entries and the
+ # parse fails. But really, everything went great! Check for
+ # this first.
+ if http_status == 304:
+ success = True
+ elif len(tmp["entries"])==0 and not tmp.version:
+                # An error occurred fetching or parsing the feed.  (Version
+                # will be either None if e.g. the connection timed out or
+ # '' if the data is not a proper feed)
+ print ("Error fetching %s: version is: %s: error: %s"
+ % (url, str (tmp.version),
+ str (tmp.get ('bozo_exception', 'Unknown error'))))
+ print tmp
+ else:
+ currentTime = time.time()
+ # The etag and modified value should only be updated if the content was not null
try:
- entry["title"]
- except:
- entry["title"] = "No Title"
+ etag = tmp["etag"]
+ except KeyError:
+ etag = None
try:
- entry["link"]
- except:
- entry["link"] = ""
+ modified = tmp["modified"]
+ except KeyError:
+ modified = None
try:
- entry["author"]
- except:
- entry["author"] = None
- if(not(entry.has_key("id"))):
- entry["id"] = None
- tmpEntry = {"title":entry["title"], "content":self.extractContent(entry),
- "date":date, "link":entry["link"], "author":entry["author"], "id":entry["id"]}
- id = self.generateUniqueId(tmpEntry)
-
- #articleTime = time.mktime(self.entries[id]["dateTuple"])
- soup = BeautifulSoup(self.getArticle(tmpEntry)) #tmpEntry["content"])
- images = soup('img')
- baseurl = tmpEntry["link"]
- #if not id in ids:
- if imageCache:
- for img in images:
- try:
- filename = self.addImage(configdir, self.key, baseurl, img['src'])
- img['src']="file://%s" %filename
- count = self.db.execute("SELECT count(1) FROM images where id=? and imagePath=?;", (id, filename )).fetchone()[0]
- if count == 0:
- self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
- except:
- import traceback
- traceback.print_exc()
- print "Error downloading image %s" % img
- tmpEntry["contentLink"] = configdir+self.key+".d/"+id+".html"
- file = open(tmpEntry["contentLink"], "w")
- file.write(soup.prettify())
- file.close()
- if id in ids:
- self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) )
- self.db.commit()
- else:
- values = (id, tmpEntry["title"], tmpEntry["contentLink"], tmpEntry["date"], currentTime, tmpEntry["link"], 0)
- self.db.execute("INSERT INTO feed (id, title, contentLink, date, updated, link, read) VALUES (?, ?, ?, ?, ?, ?, ?);", values)
- self.db.commit()
-# else:
-# try:
-# self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) )
-# self.db.commit()
-# filename = configdir+self.key+".d/"+id+".html"
-# file = open(filename,"a")
-# utime(filename, None)
-# file.close()
-# images = self.db.execute("SELECT imagePath FROM images where id=?;", (id, )).fetchall()
-# for image in images:
-# file = open(image[0],"a")
-# utime(image[0], None)
-# file.close()
-# except:
-# pass
- self.db.commit()
-
-
- rows = self.db.execute("SELECT id FROM feed WHERE (read=0 AND updated<?) OR (read=1 AND updated<?);", (currentTime-2*expiry, currentTime-expiry))
- for row in rows:
- self.removeEntry(row[0])
+ abs_url = urljoin(tmp["feed"]["link"],"/favicon.ico")
+ f = opener.open(abs_url)
+ data = f.read()
+ f.close()
+ outf = open(self.dir+"/favicon.ico", "w")
+ outf.write(data)
+ outf.close()
+ del data
+ except (urllib2.HTTPError, urllib2.URLError), exception:
+ print ("Could not download favicon %s: %s"
+ % (abs_url, str (exception)))
+
+ self.serial_execution_lock.acquire ()
+ have_serial_execution_lock = True
+
+ #reversedEntries = self.getEntries()
+ #reversedEntries.reverse()
+
+ ids = self.getIds()
+
+ tmp["entries"].reverse()
+ for entry in tmp["entries"]:
+ # Yield so as to make the main thread a bit more
+ # responsive.
+ time.sleep(0)
+
+ if JobManager().do_quit:
+ raise KeyboardInterrupt
+
+ received_base = progress_handler.stats['received']
+ sent_base = progress_handler.stats['sent']
+ object_size = 0
+
+ date = self.extractDate(entry)
+ try:
+ entry["title"]
+ except KeyError:
+ entry["title"] = "No Title"
+ try :
+ entry["link"]
+ except KeyError:
+ entry["link"] = ""
+ try:
+ entry["author"]
+ except KeyError:
+ entry["author"] = None
+ if(not(entry.has_key("id"))):
+ entry["id"] = None
+ content = self.extractContent(entry)
+ object_size = len (content)
+ received_base -= len (content)
+ tmpEntry = {"title":entry["title"], "content":content,
+ "date":date, "link":entry["link"], "author":entry["author"], "id":entry["id"]}
+ id = self.generateUniqueId(tmpEntry)
+
+ #articleTime = time.mktime(self.entries[id]["dateTuple"])
+ soup = BeautifulSoup(self.getArticle(tmpEntry)) #tmpEntry["content"])
+ images = soup('img')
+ baseurl = tmpEntry["link"]
+ #if not id in ids:
+ if imageCache and len(images) > 0:
+ self.serial_execution_lock.release ()
+ have_serial_execution_lock = False
+ for img in images:
+ filename = self.addImage(configdir, self.key, baseurl, img['src'], proxy=proxy)
+ if filename:
+ img['src']="file://%s" %filename
+ count = self.db.execute("SELECT count(1) FROM images where id=? and imagePath=?;", (id, filename )).fetchone()[0]
+ if count == 0:
+ self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
+ self.db.commit()
+
+ try:
+ object_size += os.path.getsize (filename)
+ except os.error, exception:
+ print ("Error getting size of %s: %s"
+ % (filename, exception))
+ pass
+ self.serial_execution_lock.acquire ()
+ have_serial_execution_lock = True
+
+ tmpEntry["contentLink"] = configdir+self.key+".d/"+id+".html"
+ file = open(tmpEntry["contentLink"], "w")
+ file.write(soup.prettify())
+ file.close()
+ if id in ids:
+ self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) )
+ self.db.commit()
+ else:
+ values = (id, tmpEntry["title"], tmpEntry["contentLink"], tmpEntry["date"], currentTime, tmpEntry["link"], 0)
+ self.db.execute("INSERT INTO feed (id, title, contentLink, date, updated, link, read) VALUES (?, ?, ?, ?, ?, ?, ?);", values)
+ self.db.commit()
+# else:
+# try:
+# self.db.execute("UPDATE feed SET updated=? WHERE id=?;", (currentTime, id) )
+# self.db.commit()
+# filename = configdir+self.key+".d/"+id+".html"
+# file = open(filename,"a")
+# utime(filename, None)
+# file.close()
+# images = self.db.execute("SELECT imagePath FROM images where id=?;", (id, )).fetchall()
+# for image in images:
+# file = open(image[0],"a")
+# utime(image[0], None)
+# file.close()
+# except:
+# pass
+
- from glob import glob
- from os import stat
- for file in glob(configdir+self.key+".d/*"):
- #
- stats = stat(file)
- #
- # put the two dates into matching format
- #
- lastmodDate = stats[8]
- #
- expDate = time.time()-expiry*3
- # check if image-last-modified-date is outdated
- #
- if expDate > lastmodDate:
+ self.db.commit()
+
+ success = True
+
+ rows = self.db.execute("SELECT id FROM feed WHERE (read=0 AND updated<?) OR (read=1 AND updated<?);", (currentTime-2*expiry, currentTime-expiry))
+ for row in rows:
+ self.removeEntry(row[0])
+
+ from glob import glob
+ from os import stat
+ for file in glob(configdir+self.key+".d/*"):
#
- try:
- #
- #print 'Removing', file
- #
- remove(file) # commented out for testing
- #
- except OSError:
+ stats = stat(file)
+ #
+ # put the two dates into matching format
+ #
+ lastmodDate = stats[8]
+ #
+ expDate = time.time()-expiry*3
+ # check if image-last-modified-date is outdated
+ #
+ if expDate > lastmodDate:
#
- print 'Could not remove', file
- updateTime = 0
- rows = self.db.execute("SELECT MAX(date) FROM feed;")
- for row in rows:
- updateTime=row[0]
- return (updateTime, etag, modified)
-
+ try:
+ #
+ #print 'Removing', file
+ #
+ remove(file) # commented out for testing
+ #
+ except OSError, exception:
+ #
+ print 'Could not remove %s: %s' % (file, str (exception))
+ print ("updated %s: %fs in download, %fs in processing"
+ % (self.key, download_duration,
+ time.time () - process_start))
+ finally:
+ self.db.commit ()
+
+ if have_serial_execution_lock:
+ self.serial_execution_lock.release ()
+
+ if update_lock is not None:
+ release_lock (update_lock)
+
+ updateTime = 0
+ try:
+ rows = self.db.execute("SELECT MAX(date) FROM feed;")
+ for row in rows:
+ updateTime=row[0]
+ except:
+                print "Error fetching update time."
+ import traceback
+ traceback.print_exc()
+ finally:
+ if not success:
+ etag = None
+ modified = None
+ if postFeedUpdateFunc is not None:
+ postFeedUpdateFunc (self.key, updateTime, etag, modified,
+ *postFeedUpdateFuncArgs)
+
def setEntryRead(self, id):
self.db.execute("UPDATE feed SET read=1 WHERE id=?;", (id,) )
self.db.commit()
images = soup('img')
baseurl = link
for img in images:
- filename = self.addImage(configdir, self.key, baseurl, img['src'])
+ filename = self.addImage(configdir, self.key, baseurl, img['src'], proxy=proxy)
img['src']=filename
self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
+ self.db.commit()
contentLink = configdir+self.key+".d/"+id+".html"
file = open(contentLink, "w")
file.write(soup.prettify())
db = property(_getdb)
# Lists all the feeds in a dictionary, and expose the data
- def __init__(self, configdir):
+ def __init__(self, config, configdir):
+ self.config = config
self.configdir = configdir
self.tls = threading.local ()
self.addCategory("Default Category")
self.db.execute("ALTER TABLE feeds ADD COLUMN category int;")
self.db.execute("UPDATE feeds SET category=1;")
- self.db.commit()
+ self.db.commit()
except:
pass
archFeed.addArchivedArticle(title, link, date, self.configdir)
self.updateUnread("ArchivedArticles")
- def updateFeed(self, key, expiryTime=24, proxy=None, imageCache=False):
+ def updateFeed(self, key, expiryTime=None, proxy=None, imageCache=None,
+ priority=0):
+ if expiryTime is None:
+ expiryTime = self.config.getExpiry()
+ if not expiryTime:
+ # Default to 24 hours
+            expiryTime = 24
+ if proxy is None:
+ (use_proxy, proxy) = self.config.getProxy()
+ if not use_proxy:
+ proxy = None
+ if imageCache is None:
+ imageCache = self.config.getImageCache()
+
feed = self.getFeed(key)
(url, etag, modified) = self.db.execute("SELECT url, etag, modified FROM feeds WHERE id=?;", (key,) ).fetchone()
try:
modified = time.struct_time(eval(modified))
except:
modified = None
- (updateTime, etag, modified) = feed.updateFeed(self.configdir, url, etag, modified, expiryTime, proxy, imageCache)
+ feed.updateFeed(
+ self.configdir, url, etag, modified, expiryTime, proxy, imageCache,
+ priority, postFeedUpdateFunc=self._queuePostFeedUpdate)
+
+ def _queuePostFeedUpdate(self, *args, **kwargs):
+ mainthread.execute (self._postFeedUpdate, async=True, *args, **kwargs)
+
+ def _postFeedUpdate(self, key, updateTime, etag, modified):
if modified==None:
modified="None"
else:
#
# Copyright (c) 2007-2008 INdT.
+# Copyright (c) 2011 Neal H. Walfield
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
from rss_sqlite import Listing
from config import Config
+from updatedbus import get_lock, UpdateServerObject
-import threading
import os
import gobject
+import traceback
+
+from jobmanager import JobManager
+import mainthread
CONFIGDIR="/home/user/.feedingit/"
#DESKTOP_FILE = "/usr/share/applications/hildon-status-menu/feedingit_status.desktop"
setdefaulttimeout(timeout)
del timeout
-from updatedbus import UpdateServerObject, get_lock
-
-class Download(threading.Thread):
- def __init__(self, config, dbusHandler):
- global dbug
- threading.Thread.__init__(self)
- self.running = True
- self.config = config
- self.dbusHandler = dbusHandler
- if dbug:
- self.dbug = open(CONFIGDIR+"dbug.log", "w")
-
- def run(self):
- global dbug
- if dbug:
- self.dbug.write("Starting updates\n")
- try:
- self.dbusHandler.UpdateStarted()
- (use_proxy, proxy) = self.config.getProxy()
- listing = Listing(CONFIGDIR)
- for key in listing.getListOfFeeds():
- if dbug:
- self.dbug.write("updating %s\n" %key)
- try:
- if use_proxy:
- from urllib2 import install_opener, build_opener
- install_opener(build_opener(proxy))
- listing.updateFeed(key, self.config.getExpiry(), proxy=proxy, imageCache=self.config.getImageCache() )
- else:
- listing.updateFeed(key, self.config.getExpiry(), imageCache=self.config.getImageCache() )
- except:
- import traceback
- file = open("/home/user/.feedingit/feedingit_update.log", "a")
- traceback.print_exc(file=file)
- file.close()
- if not self.running:
- if dbug:
- self.dbug.write("received stopUpdate after %s\n" %key)
- break
- self.dbusHandler.UpdateFinished()
- self.dbusHandler.ArticleCountUpdated()
- if dbug:
- self.dbug.write("Dbus ArticleCountUpdated signal sent\n")
- except:
- import traceback
- file = open("/home/user/.feedingit/feedingit_update.log", "a")
- traceback.print_exc(file=file)
- file.close()
- #pass
- if dbug:
- self.dbug.write("About to main_quit\n")
- mainloop.quit()
- if dbug:
- self.dbug.write("After main_quit\n")
- self.dbug.close()
+from updatedbus import UpdateServerObject
+
+debug_file = None
+def debug(*args):
+ global debug_file
+ if not debug_file:
+ debug_file = open("/home/user/.feedingit/feedingit_update.log", "a")
+
+ debug_file.write (*args)
class FeedUpdate():
def __init__(self):
self.config = Config(self, CONFIGDIR+"config.ini")
self.dbusHandler = UpdateServerObject(self)
- self.updateThread = False
-
- def automaticUpdate(self):
- #self.listing.updateFeeds()
- if self.updateThread == False:
- self.updateThread = Download(self.config, self.dbusHandler)
- self.updateThread.start()
+ self.listing = Listing(self.config, CONFIGDIR)
+ self.done = False
+
+ jm = JobManager(True)
+ jm.stats_hook_register (self.job_manager_update,
+ run_in_main_thread=True)
+
+ self.dbusHandler.UpdateStarted()
+ for k in self.listing.getListOfFeeds():
+ self.listing.updateFeed (k)
- def stopUpdate(self):
- try:
- self.updateThread.running = False
- except:
+ def job_manager_update(self, jm, old_stats, new_stats, updated_feed):
+ if not new_stats['jobs-queued'] and not new_stats['jobs-in-progress']:
+ self.dbusHandler.UpdateFinished()
+ self.dbusHandler.ArticleCountUpdated()
+ self.done = True
mainloop.quit()
+ def stopUpdate(self):
+ print "Stop update called."
+ JobManager().quit()
+
import dbus.mainloop.glib
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
gobject.threads_init()
+mainthread.init()
mainloop = gobject.MainLoop()
app_lock = get_lock("app_lock")
if app_lock != None:
- try:
- feed = FeedUpdate()
- mainloop.run()
- del app_lock
- except:
- import traceback
- file = open("/home/user/.feedingit/feedingit_update.log", "w")
- traceback.print_exc(file=file)
- file.close()
+ feed = FeedUpdate()
+ while not feed.done:
+ try:
+ mainloop.run()
+ except KeyboardInterrupt:
+ print "Interrupted. Quitting."
+ JobManager().quit()
+ del app_lock
else:
- file = open("/home/user/.feedingit/feedingit_update.log", "a")
- file.write("Update in progress")
- file.close()
-
+ debug("Update in progress")
import dbus
import dbus.service
+import mainthread
+@mainthread.mainthread
def get_lock(key):
try:
bus_name = dbus.service.BusName('org.marcoz.feedingit.lock_%s' %key,bus=dbus.SessionBus(), do_not_queue=True)
except:
bus_name = None
return bus_name
-
+
+def release_lock(lock):
+ assert lock is not None
+ mainthread.execute(lock.__del__, async=True)
class UpdateServerObject(dbus.service.Object):
def __init__(self, app):