from BeautifulSoup import BeautifulSoup
from urlparse import urljoin
from calendar import timegm
-from updatedbus import get_lock, release_lock
import threading
import traceback
+from wc import wc, wc_init, woodchuck
+import subprocess
+import dbus
+from updatedbus import update_server_object
+
from jobmanager import JobManager
import mainthread
from httpprogresshandler import HTTPProgressHandler
+import random
+import sys
+import logging
+logger = logging.getLogger(__name__)
def getId(string):
    """Return a stable hex digest of *string*, used as a database key.

    MD5 is used purely as a fast, stable hash here, not for security.
    """
    # hashlib replaces the long-deprecated `md5` module (removed in
    # Python 3); md5.new(s).hexdigest() == hashlib.md5(s).hexdigest().
    import hashlib
    if not isinstance(string, bytes):
        # Text input: hash its UTF-8 encoding.  On Python 2, str is
        # bytes, so existing callers are unaffected.
        string = string.encode('utf-8')
    return hashlib.md5(string).hexdigest()
return urllib2.build_opener (*openers)
+# If not None, a subprocess.Popen object corresponding to a
+# update_feeds.py process.
+update_feed_process = None
+
+update_feeds_iface = None
+
+jobs_at_start = 0
+
class Feed:
    # Class-level lock shared by every Feed instance: _updateFeed
    # acquires/releases it around its database and processing critical
    # sections so only one feed update runs those sections at a time.
    serial_execution_lock = threading.Lock()
outf.write(f.read())
f.close()
outf.close()
- except (urllib2.HTTPError, urllib2.URLError), exception:
- print ("Could not download image %s: %s"
- % (abs_url, str (exception)))
+ except (urllib2.HTTPError, urllib2.URLError, IOError), exception:
+ logger.info("Could not download image %s: %s"
+ % (abs_url, str (exception)))
return None
except:
exception = sys.exc_info()[0]
- print "Downloading image: %s" % abs_url
- import traceback
- traceback.print_exc()
-
+ logger.info("Downloading image %s: %s" %
+ (abs_url, traceback.format_exc()))
try:
remove(filename)
except OSError:
return filename
def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, priority=0, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
- def doit():
- def it():
- self._updateFeed(configdir, url, etag, modified, expiryTime, proxy, imageCache, postFeedUpdateFunc, *postFeedUpdateFuncArgs)
- return it
- JobManager().execute(doit(), self.key, priority=priority)
+ if (os.path.basename(sys.argv[0]) == 'update_feeds.py'):
+ def doit():
+ def it():
+ self._updateFeed(configdir, url, etag, modified, expiryTime, proxy, imageCache, postFeedUpdateFunc, *postFeedUpdateFuncArgs)
+ return it
+ JobManager().execute(doit(), self.key, priority=priority)
+ else:
+ def send_update_request():
+ global update_feeds_iface
+ if update_feeds_iface is None:
+ bus=dbus.SessionBus()
+ remote_object = bus.get_object(
+ "org.marcoz.feedingit", # Connection name
+ "/org/marcoz/feedingit/update" # Object's path
+ )
+ update_feeds_iface = dbus.Interface(
+ remote_object, 'org.marcoz.feedingit')
+
+ try:
+ update_feeds_iface.Update(self.key)
+ except Exception, e:
+ logger.error("Invoking org.marcoz.feedingit.Update: %s"
+ % str(e))
+ update_feeds_iface = None
+ else:
+ return True
+
+ if send_update_request():
+ # Success! It seems we were able to start the update
+ # daemon via dbus (or, it was already running).
+ return
+
+ global update_feed_process
+ if (update_feed_process is None
+ or update_feed_process.poll() is not None):
+ # The update_feeds process is not running. Start it.
+ update_feeds = os.path.join(os.path.dirname(__file__),
+ 'update_feeds.py')
+ argv = ['/usr/bin/env', 'python', update_feeds, '--daemon' ]
+ logger.debug("Starting update_feeds: running %s"
+ % (str(argv),))
+ update_feed_process = subprocess.Popen(argv)
+ # Make sure the dbus calls go to the right process:
+ # rebind.
+ update_feeds_iface = None
+
+ for _ in xrange(5):
+ if send_update_request():
+ break
+ time.sleep(1)
def _updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
success = False
have_serial_execution_lock = False
try:
- update_lock = None
- update_lock = get_lock("key")
- if not update_lock:
- # Someone else is doing an update.
- return
-
download_start = time.time ()
progress_handler = HTTPProgressHandler(download_callback)
expiry = float(expiryTime) * 3600.
currentTime = 0
+
+ have_woodchuck = mainthread.execute (wc().available)
+
+ def wc_success():
+ try:
+ wc().stream_register (self.key, "", 6 * 60 * 60)
+ except woodchuck.ObjectExistsError:
+ pass
+ try:
+ wc()[self.key].updated (
+ indicator=(woodchuck.Indicator.ApplicationVisual
+ |woodchuck.Indicator.StreamWide),
+ transferred_down=progress_handler.stats['received'],
+ transferred_up=progress_handler.stats['sent'],
+ transfer_time=download_start,
+ transfer_duration=download_duration,
+ new_objects=len (tmp.entries),
+ objects_inline=len (tmp.entries))
+ except KeyError:
+ logger.warn(
+ "Failed to register update of %s with woodchuck!"
+ % (self.key))
+
http_status = tmp.get ('status', 200)
# Check if the parse was succesful. If the http status code
# parse fails. But really, everything went great! Check for
# this first.
if http_status == 304:
+ logger.debug("%s: No changes to feed." % (self.key,))
+ mainthread.execute (wc_success, async=True)
success = True
elif len(tmp["entries"])==0 and not tmp.version:
# An error occured fetching or parsing the feed. (Version
# will be either None if e.g. the connection timed our or
# '' if the data is not a proper feed)
- print ("Error fetching %s: version is: %s: error: %s"
- % (url, str (tmp.version),
- str (tmp.get ('bozo_exception', 'Unknown error'))))
- print tmp
+ logger.error(
+ "Error fetching %s: version is: %s: error: %s"
+ % (url, str (tmp.version),
+ str (tmp.get ('bozo_exception', 'Unknown error'))))
+ logger.debug(tmp)
+ if have_woodchuck:
+ def e():
+ logger.debug("%s: stream update failed!" % self.key)
+
+ try:
+ # It's not easy to get the feed's title from here.
+ # At the latest, the next time the application is
+ # started, we'll fix up the human readable name.
+ wc().stream_register (self.key, "", 6 * 60 * 60)
+ except woodchuck.ObjectExistsError:
+ pass
+ ec = woodchuck.TransferStatus.TransientOther
+ if 300 <= http_status and http_status < 400:
+ ec = woodchuck.TransferStatus.TransientNetwork
+ if 400 <= http_status and http_status < 500:
+ ec = woodchuck.TransferStatus.FailureGone
+ if 500 <= http_status and http_status < 600:
+ ec = woodchuck.TransferStatus.TransientNetwork
+ wc()[self.key].update_failed(ec)
+ mainthread.execute (e, async=True)
else:
currentTime = time.time()
# The etag and modified value should only be updated if the content was not null
outf.close()
del data
except (urllib2.HTTPError, urllib2.URLError), exception:
- print ("Could not download favicon %s: %s"
- % (abs_url, str (exception)))
+ logger.debug("Could not download favicon %s: %s"
+ % (abs_url, str (exception)))
self.serial_execution_lock.acquire ()
have_serial_execution_lock = True
self.serial_execution_lock.release ()
have_serial_execution_lock = False
for img in images:
- filename = self.addImage(configdir, self.key, baseurl, img['src'], proxy=proxy)
- if filename:
+ filename = self.addImage(
+ configdir, self.key, baseurl, img['src'],
+ opener=opener)
+ if filename:
img['src']="file://%s" %filename
count = self.db.execute("SELECT count(1) FROM images where id=? and imagePath=?;", (id, filename )).fetchone()[0]
if count == 0:
try:
object_size += os.path.getsize (filename)
except os.error, exception:
- print ("Error getting size of %s: %s"
- % (filename, exception))
- pass
+ logger.error ("Error getting size of %s: %s"
+ % (filename, exception))
self.serial_execution_lock.acquire ()
have_serial_execution_lock = True
# except:
# pass
+ # Register the object with Woodchuck and mark it as
+ # downloaded.
+ if have_woodchuck:
+ def e():
+ try:
+ obj = wc()[self.key].object_register(
+ object_identifier=id,
+ human_readable_name=tmpEntry["title"])
+ except woodchuck.ObjectExistsError:
+ obj = wc()[self.key][id]
+ else:
+ # If the entry does not contain a publication
+ # time, the attribute won't exist.
+ pubtime = entry.get ('date_parsed', None)
+ if pubtime:
+ obj.publication_time = time.mktime (pubtime)
+ received = (progress_handler.stats['received']
+ - received_base)
+ sent = progress_handler.stats['sent'] - sent_base
+ obj.transferred (
+ indicator=(woodchuck.Indicator.ApplicationVisual
+ |woodchuck.Indicator.StreamWide),
+ transferred_down=received,
+ transferred_up=sent,
+ object_size=object_size)
+ mainthread.execute(e, async=True)
self.db.commit()
+ logger.debug (
+ "%s: Update successful: transferred: %d/%d; objects: %d)"
+ % (self.key,
+ progress_handler.stats['sent'],
+ progress_handler.stats['received'],
+ len (tmp.entries)))
+ mainthread.execute (wc_success, async=True)
success = True
rows = self.db.execute("SELECT id FROM feed WHERE (read=0 AND updated<?) OR (read=1 AND updated<?);", (currentTime-2*expiry, currentTime-expiry))
#
#print 'Removing', file
#
+ # XXX: Tell woodchuck.
remove(file) # commented out for testing
#
except OSError, exception:
#
- print 'Could not remove %s: %s' % (file, str (exception))
- print ("updated %s: %fs in download, %fs in processing"
- % (self.key, download_duration,
- time.time () - process_start))
+ logger.error('Could not remove %s: %s'
+ % (file, str (exception)))
+ logger.debug("updated %s: %fs in download, %fs in processing"
+ % (self.key, download_duration,
+ time.time () - process_start))
+ except:
+ logger.error("Updating %s: %s" % (self.key, traceback.format_exc()))
finally:
self.db.commit ()
if have_serial_execution_lock:
self.serial_execution_lock.release ()
- if update_lock is not None:
- release_lock (update_lock)
-
updateTime = 0
try:
rows = self.db.execute("SELECT MAX(date) FROM feed;")
for row in rows:
updateTime=row[0]
- except:
- print "Fetching update time."
- import traceback
- traceback.print_exc()
+ except Exception, e:
+ logger.error("Fetching update time: %s: %s"
+ % (str(e), traceback.format_exc()))
finally:
if not success:
etag = None
modified = None
+ title = None
+ try:
+ title = tmp.feed.title
+ except (AttributeError, UnboundLocalError), exception:
+ pass
if postFeedUpdateFunc is not None:
postFeedUpdateFunc (self.key, updateTime, etag, modified,
- *postFeedUpdateFuncArgs)
+ title, *postFeedUpdateFuncArgs)
def setEntryRead(self, id):
self.db.execute("UPDATE feed SET read=1 WHERE id=?;", (id,) )
self.db.commit()
-
+
+ def e():
+ if wc().available():
+ try:
+ wc()[self.key][id].used()
+ except KeyError:
+ pass
+
def setEntryUnread(self, id):
self.db.execute("UPDATE feed SET read=0 WHERE id=?;", (id,) )
self.db.commit()
return self.db.execute("SELECT date FROM feed WHERE id=?;", (id,) ).fetchone()[0]
def generateUniqueId(self, entry):
- if(entry["id"] != None):
- return getId(str(entry["id"]))
- else:
- try:
- return getId(str(entry["date"]) + str(entry["title"]))
- except:
- #print entry["title"]
- return getId(str(entry["date"]))
+ """
+ Generate a stable identifier for the article. For the same
+ entry, this should result in the same identifier. If
+ possible, the identifier should remain the same even if the
+ article is updated.
+ """
+ # Prefer the entry's id, which is supposed to be globally
+ # unique.
+ key = entry.get('id', None)
+ if not key:
+ # Next, try the link to the content.
+ key = entry.get('link', None)
+ if not key:
+ # Ok, the title and the date concatenated are likely to be
+ # relatively stable.
+ key = entry.get('title', None) + entry.get('date', None)
+ if not key:
+ # Hmm, the article's content will at least guarantee no
+ # false negatives (i.e., missing articles)
+ key = entry.get('content', None)
+ if not key:
+ # If all else fails, just use a random number.
+ key = str (random.random ())
+ return getId (key)
def getIds(self, onlyUnread=False):
if onlyUnread:
#ids.reverse()
return ids
- def getNextId(self, id):
+ def getNextId(self, id, forward=True):
+ if forward:
+ delta = 1
+ else:
+ delta = -1
ids = self.getIds()
index = ids.index(id)
- return ids[(index+1)%len(ids)]
+ return ids[(index + delta) % len(ids)]
def getPreviousId(self, id):
- ids = self.getIds()
- index = ids.index(id)
- return ids[(index-1)%len(ids)]
+ return self.getNextId(id, forward=False)
def getNumberOfUnreadItems(self):
return self.db.execute("SELECT count(*) FROM feed WHERE read=0;").fetchone()[0]
try:
remove(contentLink)
except OSError, exception:
- print "Deleting %s: %s" % (contentLink, str (exception))
+ logger.error("Deleting %s: %s" % (contentLink, str (exception)))
self.db.execute("DELETE FROM feed WHERE id=?;", (id,) )
self.db.execute("DELETE FROM images WHERE id=?;", (id,) )
self.db.commit()
+
+ def e():
+ if wc().available():
+ try:
+ wc()[self.key][id].files_deleted (
+ woodchuck.DeletionResponse.Deleted)
+ del wc()[self.key][id]
+ except KeyError:
+ pass
+ mainthread.execute (e, async=True)
class ArchivedArticles(Feed):
def addArchivedArticle(self, title, link, date, configdir):
except:
pass
+ # Check that Woodchuck's state is up to date with respect our
+ # state.
+ updater = os.path.basename(sys.argv[0]) == 'update_feeds.py'
+ wc_init (self, True if updater else False)
+ if wc().available() and updater:
+ # The list of known streams.
+ streams = wc().streams_list ()
+ stream_ids = [s.identifier for s in streams]
+
+ # Register any unknown streams. Remove known streams from
+ # STREAMS_IDS.
+ for key in self.getListOfFeeds():
+ title = self.getFeedTitle(key)
+ # XXX: We should also check whether the list of
+ # articles/objects in each feed/stream is up to date.
+ if key not in stream_ids:
+ logger.debug(
+ "Registering previously unknown channel: %s (%s)"
+ % (key, title,))
+ # Use a default refresh interval of 6 hours.
+ wc().stream_register (key, title, 6 * 60 * 60)
+ else:
+ # Make sure the human readable name is up to date.
+ if wc()[key].human_readable_name != title:
+ wc()[key].human_readable_name = title
+ stream_ids.remove (key)
+
+
+ # Unregister any streams that are no longer subscribed to.
+ for id in stream_ids:
+ logger.debug("Unregistering %s" % (id,))
+ w.stream_unregister (id)
+
def importOldFormatFeeds(self):
"""This function loads feeds that are saved in an outdated format, and converts them to sqlite"""
import rss
pass
self.updateUnread(id)
except:
- import traceback
- traceback.print_exc()
+ logger.error("importOldFormatFeeds: %s"
+ % (traceback.format_exc(),))
remove(self.configdir+"feeds.pickle")
    def _queuePostFeedUpdate(self, *args, **kwargs):
        # Marshal _postFeedUpdate onto the main thread (it writes the
        # feeds table and emits DBus notifications); async=True means
        # the worker thread does not block waiting for it to run.
        mainthread.execute (self._postFeedUpdate, async=True, *args, **kwargs)
- def _postFeedUpdate(self, key, updateTime, etag, modified):
+ def _postFeedUpdate(self, key, updateTime, etag, modified, title):
if modified==None:
modified="None"
else:
self.db.execute("UPDATE feeds SET updateTime=?, etag=?, modified=? WHERE id=?;", (updateTime, etag, modified, key) )
else:
self.db.execute("UPDATE feeds SET etag=?, modified=? WHERE id=?;", (etag, modified, key) )
+
+ if title is not None:
+ self.db.execute("UPDATE feeds SET title=(case WHEN title=='' THEN ? ELSE title END) where id=?;",
+ (title, key))
self.db.commit()
self.updateUnread(key)
+
+ update_server_object().ArticleCountUpdated()
+
+ stats = JobManager().stats()
+ global jobs_at_start
+ completed = stats['jobs-completed'] - jobs_at_start
+ in_progress = stats['jobs-in-progress']
+ queued = stats['jobs-queued']
+
+ percent = (100 * ((completed + in_progress / 2.))
+ / (completed + in_progress + queued))
+
+ update_server_object().UpdateProgress(
+ percent, completed, in_progress, queued, 0, 0, 0, key)
+
+ if in_progress == 0 and queued == 0:
+ jobs_at_start = stats['jobs-completed']
def getFeed(self, key):
if key == "ArchivedArticles":
else:
self.db.execute("UPDATE feeds SET title=?, url=? WHERE id=?;", (title, url, key))
self.db.commit()
+
+ if wc().available():
+ try:
+ wc()[key].human_readable_name = title
+ except KeyError:
+ logger.debug("Feed %s (%s) unknown." % (key, title))
def getFeedUpdateTime(self, key):
return time.ctime(self.db.execute("SELECT updateTime FROM feeds WHERE id=?;", (key,)).fetchone()[0])
return self.db.execute("SELECT unread FROM feeds WHERE id=?;", (key,)).fetchone()[0]
def getFeedTitle(self, key):
- return self.db.execute("SELECT title FROM feeds WHERE id=?;", (key,)).fetchone()[0]
+ (title, url) = self.db.execute("SELECT title, url FROM feeds WHERE id=?;", (key,)).fetchone()
+ if title:
+ return title
+ return url
def getFeedUrl(self, key):
return self.db.execute("SELECT url FROM feeds WHERE id=?;", (key,)).fetchone()[0]
def addFeed(self, title, url, id=None, category=1):
if not id:
- id = getId(title)
+ id = getId(url)
count = self.db.execute("SELECT count(*) FROM feeds WHERE id=?;", (id,) ).fetchone()[0]
if count == 0:
max_rank = self.db.execute("SELECT MAX(rank) FROM feeds;").fetchone()[0]
self.db.commit()
# Ask for the feed object, it will create the necessary tables
self.getFeed(id)
+
+ if wc().available():
+ # Register the stream with Woodchuck. Update approximately
+ # every 6 hours.
+ wc().stream_register(stream_identifier=id,
+ human_readable_name=title,
+ freshness=6*60*60)
+
return True
else:
return False
self.db.commit()
def removeFeed(self, key):
+ if wc().available ():
+ try:
+ del wc()[key]
+ except KeyError:
+ logger.debug("Removing unregistered feed %s failed" % (key,))
+
rank = self.db.execute("SELECT rank FROM feeds WHERE id=?;", (key,) ).fetchone()[0]
self.db.execute("DELETE FROM feeds WHERE id=?;", (key, ))
self.db.execute("UPDATE feeds SET rank=rank-1 WHERE rank>?;", (rank,) )