#
# Copyright (c) 2007-2008 INdT.
+# Copyright (c) 2011 Neal H. Walfield
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# Description : Simple RSS Reader
# ============================================================================
+from __future__ import with_statement
+
import sqlite3
from os.path import isfile, isdir
from shutil import rmtree
from os import mkdir, remove, utime
+import os
import md5
import feedparser
import time
import urllib2
from BeautifulSoup import BeautifulSoup
from urlparse import urljoin
+from calendar import timegm
+import threading
+import traceback
+from wc import wc, wc_init, woodchuck
+import subprocess
+import dbus
+from updatedbus import update_server_object
+
+from jobmanager import JobManager
+import mainthread
+from httpprogresshandler import HTTPProgressHandler
+import random
+import sys
+import logging
+logger = logging.getLogger(__name__)
def getId(string):
return md5.new(string).hexdigest()
-class Feed:
+def download_callback(connection):
+ if JobManager().do_quit:
+ raise KeyboardInterrupt
+
+def downloader(progress_handler=None, proxy=None):
+ openers = []
+
+ if progress_handler is not None:
+ openers.append(progress_handler)
+ else:
+ openers.append(HTTPProgressHandler(download_callback))
+
+ if proxy:
+ openers.append(proxy)
+
+ return urllib2.build_opener(*openers)
+
+def transfer_stats(sent, received, **kwargs):
+ """
+    Record a baseline for transfer statistics: sent is the number of
+    bytes sent so far, received is the number of bytes received so
+    far.  The function returns a continuation that you can call
+    later.
+
+    The continuation takes the same two arguments and returns a tuple
+    of the number of bytes sent since the original call, the number
+    of bytes received since the original call, and the elapsed time.
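+
+    A minimal usage sketch (the byte counts are illustrative):
+
+        stats = transfer_stats(0, 0)
+        # ... perform some transfers ...
+        sent_delta, received_delta, elapsed = stats(sent=1024, received=65536)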
+ """
+ start_time = time.time()
+ start_sent = sent
+ start_received = received
+
+ def e(sent, received, **kwargs):
+ return (sent - start_sent,
+ received - start_received,
+ time.time() - start_time)
+
+ return e
+
+# If not None, a subprocess.Popen object corresponding to an
+# update_feeds.py process.
+update_feed_process = None
+
+update_feeds_iface = None
+
+jobs_at_start = 0
+
+class BaseObject(object):
+ # Columns to cache. Classes that inherit from this and use the
+ # cache mechanism should set this to a list of tuples, each of
+ # which contains two entries: the table and the column. Note that
+ # both are case sensitive.
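+    #
+    # For example, a subclass caching the read and title columns of a
+    # feed table (as the Feed class below does) would set:
+    #
+    #   cached_columns = (('feed', 'read'), ('feed', 'title'))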
+ cached_columns = ()
+
+ def cache_invalidate(self, table=None):
+ """
+ Invalidate the cache.
+
+ If table is not None, invalidate only the specified table.
+ Otherwise, drop the whole cache.
+ """
+ if not hasattr(self, 'cache'):
+ return
+
+ if table is None:
+ del self.cache
+ else:
+ if table in self.cache:
+ del self.cache[table]
+
+ def lookup(self, table, column, id=None):
+ """
+ Look up a column or value. Uses a cache for columns in
+ cached_columns. Note: the column is returned unsorted.
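+
+        For example, with ('feed', 'read') in cached_columns:
+
+            self.lookup('feed', 'read', id)  # read flag for one entry
+            self.lookup('feed', 'read')      # read flags for all entries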
+ """
+ if not hasattr(self, 'cache'):
+ self.cache = {}
+
+ # Cache data for at most 60 seconds.
+ now = time.time()
+ try:
+ cache = self.cache[table]
+
+ if time.time() - cache[None] > 60:
+ # logger.debug("%s: Cache too old: clearing" % (table,))
+ del self.cache[table]
+ cache = None
+ except KeyError:
+ cache = None
+
+ if (cache is None
+ or (table, column) not in self.cached_columns):
+ # The cache is empty or the caller wants a column that we
+ # don't cache.
+ if (table, column) in self.cached_columns:
+ # logger.debug("%s: Rebuilding cache" % (table,))
+
+ do_cache = True
+
+ self.cache[table] = cache = {}
+ columns = []
+ for t, c in self.cached_columns:
+ if table == t:
+ cache[c] = {}
+ columns.append(c)
+
+ columns.append('id')
+ where = ""
+ else:
+ do_cache = False
+
+                columns = (column,)
+ if id is not None:
+ where = "where id = '%s'" % id
+ else:
+ where = ""
+
+ results = self.db.execute(
+ "SELECT %s FROM %s %s" % (','.join(columns), table, where))
+
+ if do_cache:
+ for r in results:
+ values = list(r)
+ i = values.pop()
+ for index, value in enumerate(values):
+ cache[columns[index]][i] = value
+
+ cache[None] = now
+ else:
+                values = []
+                for r in results:
+                    if id is not None:
+                        return r[0]
+
+                    values.append(r[0])
+
+                return values
+ else:
+ cache = self.cache[table]
+
+ try:
+ if id is not None:
+ value = cache[column][id]
+ # logger.debug("%s.%s:%s -> %s" % (table, column, id, value))
+ return value
+ else:
+ return cache[column].values()
+ except KeyError:
+ # logger.debug("%s.%s:%s -> Not found" % (table, column, id))
+ return None
+
+class Feed(BaseObject):
+ # Columns to cache.
+ cached_columns = (('feed', 'read'),
+ ('feed', 'title'))
+
+ serial_execution_lock = threading.Lock()
+
+ def _getdb(self):
+ try:
+ db = self.tls.db
+ except AttributeError:
+ db = sqlite3.connect("%s/%s.db" % (self.dir, self.key), timeout=120)
+ self.tls.db = db
+ return db
+ db = property(_getdb)
+
def __init__(self, configdir, key):
self.key = key
self.configdir = configdir
self.dir = "%s/%s.d" %(self.configdir, self.key)
+ self.tls = threading.local ()
+
if not isdir(self.dir):
mkdir(self.dir)
if not isfile("%s/%s.db" %(self.dir, self.key)):
- self.db = sqlite3.connect("%s/%s.db" %(self.dir, self.key) )
self.db.execute("CREATE TABLE feed (id text, title text, contentLink text, date float, updated float, link text, read int);")
self.db.execute("CREATE TABLE images (id text, imagePath text);")
self.db.commit()
- else:
- self.db = sqlite3.connect("%s/%s.db" %(self.dir, self.key) )
- def addImage(self, configdir, key, baseurl, url):
+ def addImage(self, configdir, key, baseurl, url, proxy=None, opener=None):
filename = configdir+key+".d/"+getId(url)
if not isfile(filename):
try:
- f = urllib2.urlopen(urljoin(baseurl,url))
- outf = open(filename, "w")
- outf.write(f.read())
- f.close()
- outf.close()
+ if not opener:
+ opener = downloader(proxy=proxy)
+
+ abs_url = urljoin(baseurl,url)
+ f = opener.open(abs_url)
+ try:
+ with open(filename, "w") as outf:
+ for data in f:
+ outf.write(data)
+ finally:
+ f.close()
+ except (urllib2.HTTPError, urllib2.URLError, IOError), exception:
+ logger.info("Could not download image %s: %s"
+ % (abs_url, str (exception)))
+ return None
except:
- print "Could not download " + url
+ exception = sys.exc_info()[0]
+
+ logger.info("Downloading image %s: %s" %
+ (abs_url, traceback.format_exc()))
+ try:
+ remove(filename)
+ except OSError:
+ pass
+
+ return None
else:
#open(filename,"a").close() # "Touch" the file
file = open(filename,"a")
file.close()
return filename
- def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False):
- # Expiry time is in hours
- if proxy == None:
- tmp=feedparser.parse(url, etag = etag, modified = modified)
+ def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, priority=0, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
+ if (os.path.basename(sys.argv[0]) == 'update_feeds.py'):
+ def doit():
+ def it():
+ self._updateFeed(configdir, url, etag, modified, expiryTime, proxy, imageCache, postFeedUpdateFunc, *postFeedUpdateFuncArgs)
+ return it
+ JobManager().execute(doit(), self.key, priority=priority)
else:
- tmp=feedparser.parse(url, etag = etag, modified = modified, handlers = [proxy])
- expiry = float(expiryTime) * 3600.
-
- currentTime = time.time()
- # Check if the parse was succesful (number of entries > 0, else do nothing)
- if len(tmp["entries"])>0:
- # The etag and modified value should only be updated if the content was not null
- try:
- etag = tmp["etag"]
- except KeyError:
- etag = None
- try:
- modified = tmp["modified"]
- except KeyError:
- modified = None
- try:
- f = urllib2.urlopen(urljoin(tmp["feed"]["link"],"/favicon.ico"))
- data = f.read()
- f.close()
- outf = open(self.dir+"/favicon.ico", "w")
- outf.write(data)
- outf.close()
- del data
- except:
- #import traceback
- #traceback.print_exc()
- pass
+ def send_update_request():
+ global update_feeds_iface
+ if update_feeds_iface is None:
+ bus=dbus.SessionBus()
+ remote_object = bus.get_object(
+ "org.marcoz.feedingit", # Connection name
+ "/org/marcoz/feedingit/update" # Object's path
+ )
+ update_feeds_iface = dbus.Interface(
+ remote_object, 'org.marcoz.feedingit')
+
+ try:
+ update_feeds_iface.Update(self.key)
+ except Exception, e:
+ logger.error("Invoking org.marcoz.feedingit.Update: %s"
+ % str(e))
+ update_feeds_iface = None
+ else:
+ return True
+
+ if send_update_request():
+ # Success! It seems we were able to start the update
+ # daemon via dbus (or, it was already running).
+ return
+
+ global update_feed_process
+ if (update_feed_process is None
+ or update_feed_process.poll() is not None):
+ # The update_feeds process is not running. Start it.
+ update_feeds = os.path.join(os.path.dirname(__file__),
+ 'update_feeds.py')
+ argv = ['/usr/bin/env', 'python', update_feeds, '--daemon' ]
+ logger.debug("Starting update_feeds: running %s"
+ % (str(argv),))
+ update_feed_process = subprocess.Popen(argv)
+ # Make sure the dbus calls go to the right process:
+ # rebind.
+ update_feeds_iface = None
+ for _ in xrange(5):
+ if send_update_request():
+ break
+ time.sleep(1)
+
+ def _updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
+ logger.debug("Updating %s" % url)
+
+ success = False
+ have_serial_execution_lock = False
+ try:
+ update_start = time.time ()
+
+ progress_handler = HTTPProgressHandler(download_callback)
+
+ openers = [progress_handler]
+ if proxy:
+ openers.append (proxy)
+ kwargs = {'handlers':openers}
+
+ feed_transfer_stats = transfer_stats(0, 0)
- #reversedEntries = self.getEntries()
- #reversedEntries.reverse()
+ tmp=feedparser.parse(url, etag=etag, modified=modified, **kwargs)
+ download_duration = time.time () - update_start
- ids = self.getIds()
+ opener = downloader(progress_handler, proxy)
- tmp["entries"].reverse()
- for entry in tmp["entries"]:
- date = self.extractDate(entry)
+ if JobManager().do_quit:
+ raise KeyboardInterrupt
+
+ process_start = time.time()
+
+ # Expiry time is in hours
+ expiry = float(expiryTime) * 3600.
+
+ currentTime = 0
+
+ updated_objects = 0
+ new_objects = 0
+
+ def wc_success():
+ try:
+ wc().stream_register (self.key, "", 6 * 60 * 60)
+ except woodchuck.ObjectExistsError:
+ pass
+ try:
+ wc()[self.key].updated (
+ indicator=(woodchuck.Indicator.ApplicationVisual
+ |woodchuck.Indicator.StreamWide),
+ transferred_down=progress_handler.stats['received'],
+ transferred_up=progress_handler.stats['sent'],
+ transfer_time=update_start,
+ transfer_duration=download_duration,
+ new_objects=new_objects,
+ updated_objects=updated_objects,
+ objects_inline=new_objects + updated_objects)
+ except KeyError:
+ logger.warn(
+ "Failed to register update of %s with woodchuck!"
+ % (self.key))
+
+ http_status = tmp.get ('status', 200)
+
+        # Check if the parse was successful.  If the http status code
+        # is 304, then the download was successful, but there is
+        # nothing new.  Indeed, no content is returned.  This makes a
+ # 304 look like an error because there are no entries and the
+ # parse fails. But really, everything went great! Check for
+ # this first.
+ if http_status == 304:
+ logger.debug("%s: No changes to feed." % (self.key,))
+ mainthread.execute(wc_success, async=True)
+ success = True
+ elif len(tmp["entries"])==0 and not tmp.get('version', None):
+            # An error occurred fetching or parsing the feed.  (Version
+            # will be either None if e.g. the connection timed out, or
+            # '' if the data is not a proper feed.)
+ logger.error(
+ "Error fetching %s: version is: %s: error: %s"
+ % (url, str (tmp.get('version', 'unset')),
+ str (tmp.get ('bozo_exception', 'Unknown error'))))
+ logger.debug(tmp)
+ def register_stream_update_failed(http_status):
+ def doit():
+ logger.debug("%s: stream update failed!" % self.key)
+
+ try:
+ # It's not easy to get the feed's title from here.
+ # At the latest, the next time the application is
+ # started, we'll fix up the human readable name.
+ wc().stream_register (self.key, "", 6 * 60 * 60)
+ except woodchuck.ObjectExistsError:
+ pass
+ ec = woodchuck.TransferStatus.TransientOther
+ if 300 <= http_status and http_status < 400:
+ ec = woodchuck.TransferStatus.TransientNetwork
+ if 400 <= http_status and http_status < 500:
+ ec = woodchuck.TransferStatus.FailureGone
+ if 500 <= http_status and http_status < 600:
+ ec = woodchuck.TransferStatus.TransientNetwork
+ wc()[self.key].update_failed(ec)
+ return doit
+ if wc().available:
+ mainthread.execute(
+ register_stream_update_failed(
+ http_status=http_status),
+ async=True)
+ else:
+ currentTime = time.time()
+ # The etag and modified value should only be updated if the content was not null
try:
- entry["title"]
- except:
- entry["title"] = "No Title"
+ etag = tmp["etag"]
+ except KeyError:
+ etag = None
try:
- entry["link"]
- except:
- entry["link"] = ""
- tmpEntry = {"title":entry["title"], "content":self.extractContent(entry),
- "date":date, "link":entry["link"]}
- id = self.generateUniqueId(tmpEntry)
-
- #articleTime = time.mktime(self.entries[id]["dateTuple"])
- if not id in ids:
+ modified = tmp["modified"]
+ except KeyError:
+ modified = None
+ try:
+ abs_url = urljoin(tmp["feed"]["link"],"/favicon.ico")
+ f = opener.open(abs_url)
+ data = f.read()
+ f.close()
+ outf = open(self.dir+"/favicon.ico", "w")
+ outf.write(data)
+ outf.close()
+ del data
+ except (urllib2.HTTPError, urllib2.URLError), exception:
+ logger.debug("Could not download favicon %s: %s"
+ % (abs_url, str (exception)))
+
+ self.serial_execution_lock.acquire ()
+ have_serial_execution_lock = True
+
+ #reversedEntries = self.getEntries()
+ #reversedEntries.reverse()
+
+ tmp["entries"].reverse()
+ for entry in tmp["entries"]:
+ # Yield so as to make the main thread a bit more
+ # responsive.
+ time.sleep(0)
+
+ entry_transfer_stats = transfer_stats(
+ *feed_transfer_stats(**progress_handler.stats)[0:2])
+
+ if JobManager().do_quit:
+ raise KeyboardInterrupt
+
+ object_size = 0
+
+ date = self.extractDate(entry)
+ try:
+ entry["title"]
+ except KeyError:
+ entry["title"] = "No Title"
+ try :
+ entry["link"]
+ except KeyError:
+ entry["link"] = ""
+ try:
+ entry["author"]
+ except KeyError:
+ entry["author"] = None
+                    if not entry.has_key("id"):
+ entry["id"] = None
+ content = self.extractContent(entry)
+ object_size = len (content)
+ tmpEntry = {"title":entry["title"], "content":content,
+ "date":date, "link":entry["link"], "author":entry["author"], "id":entry["id"]}
+ id = self.generateUniqueId(tmpEntry)
+
+ current_version = self.db.execute(
+ 'select date, ROWID from feed where id=?',
+ (id,)).fetchone()
+ if (current_version is not None
+ and current_version[0] == date):
+ logger.debug("ALREADY DOWNLOADED %s (%s)"
+ % (entry["title"], entry["link"]))
+ continue
+
+ if current_version is not None:
+ # The version was updated. Mark it as unread.
+ logger.debug("UPDATED: %s (%s)"
+ % (entry["title"], entry["link"]))
+ updated_objects += 1
+ else:
+ logger.debug("NEW: %s (%s)"
+ % (entry["title"], entry["link"]))
+ new_objects += 1
+
+ #articleTime = time.mktime(self.entries[id]["dateTuple"])
soup = BeautifulSoup(self.getArticle(tmpEntry)) #tmpEntry["content"])
images = soup('img')
baseurl = tmpEntry["link"]
- if imageCache:
- for img in images:
- try:
- filename = self.addImage(configdir, self.key, baseurl, img['src'])
- img['src']=filename
- self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
- except:
- import traceback
- traceback.print_exc()
- print "Error downloading image %s" % img
+ if imageCache and len(images) > 0:
+ self.serial_execution_lock.release ()
+ have_serial_execution_lock = False
+ for img in images:
+ if not img.has_key('src'):
+ continue
+
+ filename = self.addImage(
+ configdir, self.key, baseurl, img['src'],
+ opener=opener)
+ if filename:
+ img['src']="file://%s" %filename
+ count = self.db.execute("SELECT count(1) FROM images where id=? and imagePath=?;", (id, filename )).fetchone()[0]
+ if count == 0:
+ self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
+ self.db.commit()
+
+ try:
+ object_size += os.path.getsize (filename)
+ except os.error, exception:
+ logger.error ("Error getting size of %s: %s"
+ % (filename, exception))
+ self.serial_execution_lock.acquire ()
+ have_serial_execution_lock = True
+
tmpEntry["contentLink"] = configdir+self.key+".d/"+id+".html"
file = open(tmpEntry["contentLink"], "w")
file.write(soup.prettify())
file.close()
- values = (id, tmpEntry["title"], tmpEntry["contentLink"], tmpEntry["date"], currentTime, tmpEntry["link"], 0)
- self.db.execute("INSERT INTO feed (id, title, contentLink, date, updated, link, read) VALUES (?, ?, ?, ?, ?, ?, ?);", values)
- else:
- try:
- filename = configdir+self.key+".d/"+id+".html"
- file = open(filename,"a")
- utime(filename, None)
- file.close()
- images = self.db.execute("SELECT imagePath FROM images where id=?;", (id, )).fetchall()
- for image in images:
- file = open(image[0],"a")
- utime(image[0], None)
- file.close()
- except:
- pass
- self.db.commit()
-
-
- rows = self.db.execute("SELECT id FROM feed WHERE (read=0 AND updated<?) OR (read=1 AND updated<?);", (2*expiry, expiry))
- for row in rows:
+
+ values = {'id': id,
+ 'title': tmpEntry["title"],
+ 'contentLink': tmpEntry["contentLink"],
+ 'date': tmpEntry["date"],
+ 'updated': currentTime,
+ 'link': tmpEntry["link"],
+ 'read': 0}
+
+ if current_version is not None:
+ # This is an update. Ensure that the existing
+ # entry is replaced.
+ values['ROWID'] = current_version[1]
+
+ cols, values = zip(*values.items())
+ self.db.execute(
+ "INSERT OR REPLACE INTO feed (%s) VALUES (%s);"
+ % (','.join(cols), ','.join(('?',) * len(values))),
+ values)
+ self.db.commit()
+
+ # Register the object with Woodchuck and mark it as
+ # downloaded.
+ def register_object_transferred(
+ id, title, publication_time,
+ sent, received, object_size):
+ def doit():
+ logger.debug("Registering transfer of object %s"
+ % title)
+ try:
+ obj = wc()[self.key].object_register(
+ object_identifier=id,
+ human_readable_name=title)
+ except woodchuck.ObjectExistsError:
+ obj = wc()[self.key][id]
+ else:
+ obj.publication_time = publication_time
+ obj.transferred(
+ indicator=(
+ woodchuck.Indicator.ApplicationVisual
+ |woodchuck.Indicator.StreamWide),
+ transferred_down=received,
+ transferred_up=sent,
+ object_size=object_size)
+ return doit
+ if wc().available:
+ # If the entry does not contain a publication
+ # time, the attribute won't exist.
+ pubtime = entry.get('date_parsed', None)
+ if pubtime:
+ publication_time = time.mktime (pubtime)
+ else:
+ publication_time = None
+
+ sent, received, _ \
+ = entry_transfer_stats(**progress_handler.stats)
+ # sent and received are for objects (in
+ # particular, images) associated with this
+ # item. We also want to attribute the data
+ # transferred for the item's content. This is
+ # a good first approximation.
+ received += len(content)
+
+ mainthread.execute(
+ register_object_transferred(
+ id=id,
+ title=tmpEntry["title"],
+ publication_time=publication_time,
+ sent=sent, received=received,
+ object_size=object_size),
+ async=True)
+ self.db.commit()
+
+ sent, received, _ \
+ = feed_transfer_stats(**progress_handler.stats)
+ logger.debug (
+ "%s: Update successful: transferred: %d/%d; objects: %d)"
+ % (url, sent, received, len (tmp.entries)))
+ mainthread.execute (wc_success, async=True)
+ success = True
+
+ rows = self.db.execute("SELECT id FROM feed WHERE (read=0 AND updated<?) OR (read=1 AND updated<?);", (currentTime-2*expiry, currentTime-expiry))
+ for row in rows:
self.removeEntry(row[0])
- from glob import glob
- from os import stat
- for file in glob(configdir+self.key+".d/*"):
+ from glob import glob
+ from os import stat
+ for file in glob(configdir+self.key+".d/*"):
#
stats = stat(file)
#
#
#print 'Removing', file
#
+ # XXX: Tell woodchuck.
remove(file) # commented out for testing
#
- except OSError:
+ except OSError, exception:
#
- print 'Could not remove', file
- return (currentTime, etag, modified)
-
+ logger.error('Could not remove %s: %s'
+ % (file, str (exception)))
+ logger.debug("updated %s: %fs in download, %fs in processing"
+ % (self.key, download_duration,
+ time.time () - process_start))
+ except:
+ logger.error("Updating %s: %s" % (self.key, traceback.format_exc()))
+ finally:
+ self.db.commit ()
+
+ if have_serial_execution_lock:
+ self.serial_execution_lock.release ()
+
+ updateTime = 0
+ try:
+ rows = self.db.execute("SELECT MAX(date) FROM feed;")
+ for row in rows:
+ updateTime=row[0]
+ except Exception, e:
+ logger.error("Fetching update time: %s: %s"
+ % (str(e), traceback.format_exc()))
+ finally:
+ if not success:
+ etag = None
+ modified = None
+ title = None
+ try:
+ title = tmp.feed.title
+ except (AttributeError, UnboundLocalError), exception:
+ pass
+ if postFeedUpdateFunc is not None:
+ postFeedUpdateFunc (self.key, updateTime, etag, modified,
+ title, *postFeedUpdateFuncArgs)
+
+ self.cache_invalidate()
+
def setEntryRead(self, id):
self.db.execute("UPDATE feed SET read=1 WHERE id=?;", (id,) )
self.db.commit()
-
+
+ def doit():
+ try:
+ wc()[self.key][id].used()
+ except KeyError:
+ pass
+ if wc().available():
+ mainthread.execute(doit, async=True)
+ self.cache_invalidate('feed')
+
def setEntryUnread(self, id):
self.db.execute("UPDATE feed SET read=0 WHERE id=?;", (id,) )
self.db.commit()
+ self.cache_invalidate('feed')
def markAllAsRead(self):
self.db.execute("UPDATE feed SET read=1 WHERE read=0;")
self.db.commit()
+ self.cache_invalidate('feed')
def isEntryRead(self, id):
- read_status = self.db.execute("SELECT read FROM feed WHERE id=?;", (id,) ).fetchone()[0]
- return read_status==1 # Returns True if read==1, and False if read==0
+ return self.lookup('feed', 'read', id) == 1
def getTitle(self, id):
- return self.db.execute("SELECT title FROM feed WHERE id=?;", (id,) ).fetchone()[0]
+ return self.lookup('feed', 'title', id)
def getContentLink(self, id):
return self.db.execute("SELECT contentLink FROM feed WHERE id=?;", (id,) ).fetchone()[0]
return self.db.execute("SELECT date FROM feed WHERE id=?;", (id,) ).fetchone()[0]
def generateUniqueId(self, entry):
- return getId(str(entry["date"]) + str(entry["title"]))
+ """
+ Generate a stable identifier for the article. For the same
+ entry, this should result in the same identifier. If
+ possible, the identifier should remain the same even if the
+ article is updated.
+ """
+ # Prefer the entry's id, which is supposed to be globally
+ # unique.
+ key = entry.get('id', None)
+ if not key:
+ # Next, try the link to the content.
+ key = entry.get('link', None)
+ if not key:
+ # Ok, the title and the date concatenated are likely to be
+ # relatively stable.
+            key = entry.get('title', '') + str(entry.get('date', ''))
+ if not key:
+ # Hmm, the article's content will at least guarantee no
+ # false negatives (i.e., missing articles)
+ key = entry.get('content', None)
+ if not key:
+ # If all else fails, just use a random number.
+ key = str (random.random ())
+ return getId (key)
- def getIds(self):
- rows = self.db.execute("SELECT id FROM feed ORDER BY date DESC;").fetchall()
+ def getIds(self, onlyUnread=False):
+ if onlyUnread:
+ rows = self.db.execute("SELECT id FROM feed where read=0 ORDER BY date DESC;").fetchall()
+ else:
+ rows = self.db.execute("SELECT id FROM feed ORDER BY date DESC;").fetchall()
ids = []
for row in rows:
ids.append(row[0])
#ids.reverse()
return ids
- def getNextId(self, id):
+ def getNextId(self, id, forward=True):
+ if forward:
+ delta = 1
+ else:
+ delta = -1
ids = self.getIds()
index = ids.index(id)
- return ids[(index+1)%len(ids)]
+ return ids[(index + delta) % len(ids)]
def getPreviousId(self, id):
- ids = self.getIds()
- index = ids.index(id)
- return ids[(index-1)%len(ids)]
+ return self.getNextId(id, forward=False)
def getNumberOfUnreadItems(self):
return self.db.execute("SELECT count(*) FROM feed WHERE read=0;").fetchone()[0]
content = entry["content"]
link = entry['link']
+ author = entry['author']
date = time.strftime("%a, %d %b %Y %H:%M:%S", time.localtime(entry["date"]) )
#text = '''<div style="color: black; background-color: white;">'''
text += "<html><head><title>" + title + "</title>"
text += '<meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>\n'
#text += '<style> body {-webkit-user-select: none;} </style>'
- text += '</head><body><div><a href=\"' + link + '\">' + title + "</a>"
+ text += '</head><body bgcolor=\"#ffffff\"><div><a href=\"' + link + '\">' + title + "</a>"
+ if author != None:
+ text += "<BR /><small><i>Author: " + author + "</i></small>"
text += "<BR /><small><i>Date: " + date + "</i></small></div>"
text += "<BR /><BR />"
text += content
def extractDate(self, entry):
if entry.has_key("updated_parsed"):
- return time.mktime(entry["updated_parsed"])
+ return timegm(entry["updated_parsed"])
elif entry.has_key("published_parsed"):
- return time.mktime(entry["published_parsed"])
+ return timegm(entry["published_parsed"])
else:
- return 0
+ return time.time()
def extractContent(self, entry):
content = ""
contentLink = self.db.execute("SELECT contentLink FROM feed WHERE id=?;", (id,)).fetchone()[0]
if contentLink:
try:
- os.remove(contentLink)
- except:
- print "File not found for deletion: %s" % contentLink
+ remove(contentLink)
+ except OSError, exception:
+ logger.error("Deleting %s: %s" % (contentLink, str (exception)))
self.db.execute("DELETE FROM feed WHERE id=?;", (id,) )
self.db.execute("DELETE FROM images WHERE id=?;", (id,) )
self.db.commit()
+
+ def doit():
+ try:
+ wc()[self.key][id].files_deleted (
+ woodchuck.DeletionResponse.Deleted)
+ del wc()[self.key][id]
+ except KeyError:
+ pass
+ if wc().available():
+ mainthread.execute (doit, async=True)
class ArchivedArticles(Feed):
def addArchivedArticle(self, title, link, date, configdir):
self.db.commit()
def updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False):
- currentTime = time.time()
+ currentTime = 0
rows = self.db.execute("SELECT id, link FROM feed WHERE updated=0;")
for row in rows:
+ currentTime = time.time()
id = row[0]
link = row[1]
f = urllib2.urlopen(link)
images = soup('img')
baseurl = link
for img in images:
- filename = self.addImage(configdir, self.key, baseurl, img['src'])
+ filename = self.addImage(configdir, self.key, baseurl, img['src'], proxy=proxy)
img['src']=filename
self.db.execute("INSERT INTO images (id, imagePath) VALUES (?, ?);", (id, filename) )
+ self.db.commit()
contentLink = configdir+self.key+".d/"+id+".html"
file = open(contentLink, "w")
file.write(soup.prettify())
#ids = self.getIds()
for row in rows:
self.removeArticle(row[0])
-
+
def removeArticle(self, id):
rows = self.db.execute("SELECT imagePath FROM images WHERE id=?;", (id,) )
for row in rows:
pass
self.removeEntry(id)
-class Listing:
+class Listing(BaseObject):
+ # Columns to cache.
+ cached_columns = (('feeds', 'updateTime'),
+ ('feeds', 'unread'),
+ ('feeds', 'title'),
+ ('categories', 'title'))
+
+ def _getdb(self):
+ try:
+ db = self.tls.db
+ except AttributeError:
+ db = sqlite3.connect("%s/feeds.db" % self.configdir, timeout=120)
+ self.tls.db = db
+ return db
+ db = property(_getdb)
+
# Lists all the feeds in a dictionary, and expose the data
- def __init__(self, configdir):
+ def __init__(self, config, configdir):
+ self.config = config
self.configdir = configdir
-
- self.db = sqlite3.connect("%s/feeds.db" % self.configdir)
+
+ self.tls = threading.local ()
try:
- self.db.execute("create table feeds(id text, url text, title text, unread int, updateTime float, rank int, etag text, modified text);")
- if isfile(self.configdir+"feeds.pickle"):
- self.importOldFormatFeeds()
+ table = self.db.execute("SELECT sql FROM sqlite_master").fetchone()
+ if table == None:
+ self.db.execute("CREATE TABLE feeds(id text, url text, title text, unread int, updateTime float, rank int, etag text, modified text, widget int, category int);")
+ self.db.execute("CREATE TABLE categories(id text, title text, unread int, rank int);")
+ self.addCategory("Default Category")
+ if isfile(self.configdir+"feeds.pickle"):
+ self.importOldFormatFeeds()
+ else:
+ self.addFeed("Maemo News", "http://maemo.org/news/items.xml")
else:
- self.addFeed("Maemo News", "http://maemo.org/news/items.xml")
+ from string import find, upper
+ if find(upper(table[0]), "WIDGET")<0:
+ self.db.execute("ALTER TABLE feeds ADD COLUMN widget int;")
+ self.db.execute("UPDATE feeds SET widget=1;")
+ self.db.commit()
+ if find(upper(table[0]), "CATEGORY")<0:
+ self.db.execute("CREATE TABLE categories(id text, title text, unread int, rank int);")
+ self.addCategory("Default Category")
+ self.db.execute("ALTER TABLE feeds ADD COLUMN category int;")
+ self.db.execute("UPDATE feeds SET category=1;")
+ self.db.commit()
except:
- # Table already created
pass
-
+
+    # Check that Woodchuck's state is up to date with respect to our
+    # state.
+ try:
+ updater = os.path.basename(sys.argv[0]) == 'update_feeds.py'
+ wc_init (self, True if updater else False)
+ if wc().available() and updater:
+ # The list of known streams.
+ streams = wc().streams_list ()
+ stream_ids = [s.identifier for s in streams]
+
+            # Register any unknown streams.  Remove known streams from
+            # stream_ids.
+ for key in self.getListOfFeeds():
+ title = self.getFeedTitle(key)
+ # XXX: We should also check whether the list of
+ # articles/objects in each feed/stream is up to date.
+ if key not in stream_ids:
+ logger.debug(
+ "Registering previously unknown channel: %s (%s)"
+ % (key, title,))
+ # Use a default refresh interval of 6 hours.
+ wc().stream_register (key, title, 6 * 60 * 60)
+ else:
+ # Make sure the human readable name is up to date.
+ if wc()[key].human_readable_name != title:
+ wc()[key].human_readable_name = title
+ stream_ids.remove (key)
+
+
+ # Unregister any streams that are no longer subscribed to.
+ for id in stream_ids:
+ logger.debug("Unregistering %s" % (id,))
+                wc().stream_unregister (id)
+ except Exception:
+ logger.exception("Registering streams with Woodchuck")
+
def importOldFormatFeeds(self):
"""This function loads feeds that are saved in an outdated format, and converts them to sqlite"""
import rss
for id in listing.getListOfFeeds():
try:
rank += 1
- values = (id, listing.getFeedTitle(id) , listing.getFeedUrl(id), 0, time.time(), rank, None, "None")
- self.db.execute("INSERT INTO feeds (id, title, url, unread, updateTime, rank, etag, modified) VALUES (?, ?, ? ,? ,? ,?, ?, ?);", values)
+ values = (id, listing.getFeedTitle(id) , listing.getFeedUrl(id), 0, time.time(), rank, None, "None", 1)
+ self.db.execute("INSERT INTO feeds (id, title, url, unread, updateTime, rank, etag, modified, widget, category) VALUES (?, ?, ? ,? ,? ,?, ?, ?, ?, 1);", values)
self.db.commit()
feed = listing.getFeed(id)
read_status = 1
else:
read_status = 0
- date = time.mktime(feed.getDateTuple(item))
+ date = timegm(feed.getDateTuple(item))
title = feed.getTitle(item)
newId = new_feed.generateUniqueId({"date":date, "title":title})
- values = (newId, title , feed.getContentLink(item), date, time.time(), feed.getExternalLink(item), read_status)
+            values = (newId, title , feed.getContentLink(item), date, time.time(), feed.getExternalLink(item), read_status)
new_feed.db.execute("INSERT INTO feed (id, title, contentLink, date, updated, link, read) VALUES (?, ?, ?, ?, ?, ?, ?);", values)
new_feed.db.commit()
try:
pass
self.updateUnread(id)
except:
- import traceback
- traceback.print_exc()
+ logger.error("importOldFormatFeeds: %s"
+ % (traceback.format_exc(),))
remove(self.configdir+"feeds.pickle")
archFeed = self.getFeed("ArchivedArticles")
archFeed.addArchivedArticle(title, link, date, self.configdir)
self.updateUnread("ArchivedArticles")
-
- def updateFeed(self, key, expiryTime=24, proxy=None, imageCache=False):
+
+ def updateFeed(self, key, expiryTime=None, proxy=None, imageCache=None,
+ priority=0):
+ if expiryTime is None:
+ expiryTime = self.config.getExpiry()
+ if not expiryTime:
+ # Default to 24 hours
+            expiryTime = 24
+ if proxy is None:
+ (use_proxy, proxy) = self.config.getProxy()
+ if not use_proxy:
+ proxy = None
+ if imageCache is None:
+ imageCache = self.config.getImageCache()
+
feed = self.getFeed(key)
- db = sqlite3.connect("%s/feeds.db" % self.configdir)
- (url, etag, modified) = db.execute("SELECT url, etag, modified FROM feeds WHERE id=?;", (key,) ).fetchone()
- (updateTime, etag, modified) = feed.updateFeed(self.configdir, url, etag, eval(modified), expiryTime, proxy, imageCache)
- db.execute("UPDATE feeds SET updateTime=?, etag=?, modified=? WHERE id=?;", (updateTime, etag, str(modified), key) )
- db.commit()
- self.updateUnread(key, db=db)
+ (url, etag, modified) = self.db.execute("SELECT url, etag, modified FROM feeds WHERE id=?;", (key,) ).fetchone()
+ try:
+ modified = time.struct_time(eval(modified))
+ except:
+ modified = None
+ feed.updateFeed(
+ self.configdir, url, etag, modified, expiryTime, proxy, imageCache,
+ priority, postFeedUpdateFunc=self._queuePostFeedUpdate)
+
+ def _queuePostFeedUpdate(self, *args, **kwargs):
+ mainthread.execute (self._postFeedUpdate, async=True, *args, **kwargs)
+
+ def _postFeedUpdate(self, key, updateTime, etag, modified, title):
+ if modified==None:
+ modified="None"
+ else:
+ modified=str(tuple(modified))
+ if updateTime > 0:
+ self.db.execute("UPDATE feeds SET updateTime=?, etag=?, modified=? WHERE id=?;", (updateTime, etag, modified, key) )
+ else:
+ self.db.execute("UPDATE feeds SET etag=?, modified=? WHERE id=?;", (etag, modified, key) )
+
+ if title is not None:
+ self.db.execute("UPDATE feeds SET title=(case WHEN title=='' THEN ? ELSE title END) where id=?;",
+ (title, key))
+ self.db.commit()
+ self.cache_invalidate('feeds')
+ self.updateUnread(key)
+
+ update_server_object().ArticleCountUpdated()
+
+ stats = JobManager().stats()
+ global jobs_at_start
+ completed = stats['jobs-completed'] - jobs_at_start
+ in_progress = stats['jobs-in-progress']
+ queued = stats['jobs-queued']
+
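+        # Estimate overall progress, counting each in-progress job as
+        # half done.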
+ try:
+ percent = (100 * ((completed + in_progress / 2.))
+ / (completed + in_progress + queued))
+ except ZeroDivisionError:
+ percent = 100
+
+ update_server_object().UpdateProgress(
+ percent, completed, in_progress, queued, 0, 0, 0, key)
+
+ if in_progress == 0 and queued == 0:
+ jobs_at_start = stats['jobs-completed']
def getFeed(self, key):
if key == "ArchivedArticles":
return ArchivedArticles(self.configdir, key)
return Feed(self.configdir, key)
- def editFeed(self, key, title, url):
- self.db.execute("UPDATE feeds SET title=?, url=? WHERE id=?;", (title, url, key))
+ def editFeed(self, key, title, url, category=None):
+ if category:
+ self.db.execute("UPDATE feeds SET title=?, url=?, category=? WHERE id=?;", (title, url, category, key))
+ else:
+ self.db.execute("UPDATE feeds SET title=?, url=? WHERE id=?;", (title, url, key))
self.db.commit()
+ self.cache_invalidate('feeds')
+
+ if wc().available():
+ try:
+ wc()[key].human_readable_name = title
+ except KeyError:
+ logger.debug("Feed %s (%s) unknown." % (key, title))
def getFeedUpdateTime(self, key):
- return time.ctime(self.db.execute("SELECT updateTime FROM feeds WHERE id=?;", (key,)).fetchone()[0])
+ update_time = self.lookup('feeds', 'updateTime', key)
+
+ if not update_time:
+ return "Never"
+
+ delta = time.time() - update_time
+
+ delta_hours = delta / (60. * 60.)
+ if delta_hours < .1:
+ return "A few minutes ago"
+ if delta_hours < .75:
+ return "Less than an hour ago"
+ if delta_hours < 1.5:
+ return "About an hour ago"
+ if delta_hours < 18:
+ return "About %d hours ago" % (int(delta_hours + 0.5),)
+
+ delta_days = delta_hours / 24.
+ if delta_days < 1.5:
+ return "About a day ago"
+ if delta_days < 18:
+ return "%d days ago" % (int(delta_days + 0.5),)
+
+ delta_weeks = delta_days / 7.
+ if delta_weeks <= 8:
+ return "%d weeks ago" % int(delta_weeks + 0.5)
+
+ delta_months = delta_days / 30.
+ if delta_months <= 30:
+ return "%d months ago" % int(delta_months + 0.5)
+
+ return time.strftime("%x", time.gmtime(update_time))
def getFeedNumberOfUnreadItems(self, key):
- return self.db.execute("SELECT unread FROM feeds WHERE id=?;", (key,)).fetchone()[0]
+ return self.lookup('feeds', 'unread', key)
def getFeedTitle(self, key):
- return self.db.execute("SELECT title FROM feeds WHERE id=?;", (key,)).fetchone()[0]
+ title = self.lookup('feeds', 'title', key)
+ if title:
+ return title
+
+ return self.getFeedUrl(key)
def getFeedUrl(self, key):
return self.db.execute("SELECT url FROM feeds WHERE id=?;", (key,)).fetchone()[0]
+
+ def getFeedCategory(self, key):
+ return self.db.execute("SELECT category FROM feeds WHERE id=?;", (key,)).fetchone()[0]
- def getListOfFeeds(self):
- rows = self.db.execute("SELECT id FROM feeds ORDER BY rank;" )
+ def getListOfFeeds(self, category=None):
+ if category:
+ rows = self.db.execute("SELECT id FROM feeds WHERE category=? ORDER BY rank;", (category, ) )
+ else:
+ rows = self.db.execute("SELECT id FROM feeds ORDER BY rank;" )
keys = []
for row in rows:
if row[0]:
keys.append(row[0])
return keys
- def getSortedListOfKeys(self, order):
+ def getListOfCategories(self):
+ return list(row[0] for row in self.db.execute(
+ "SELECT id FROM categories ORDER BY rank;"))
+
+ def getCategoryTitle(self, id):
+ return self.lookup('categories', 'title', id)
+
+ def getSortedListOfKeys(self, order, onlyUnread=False, category=1):
if order == "Most unread":
- tmp = "ORDER BY unread"
+ tmp = "ORDER BY unread DESC"
#keyorder = sorted(feedInfo, key = lambda k: feedInfo[k][1], reverse=True)
elif order == "Least unread":
- tmp = "ORDER BY unread DESC"
+ tmp = "ORDER BY unread"
#keyorder = sorted(feedInfo, key = lambda k: feedInfo[k][1])
elif order == "Most recent":
- tmp = "ORDER BY updateTime"
+ tmp = "ORDER BY updateTime DESC"
#keyorder = sorted(feedInfo, key = lambda k: feedInfo[k][2], reverse=True)
elif order == "Least recent":
- tmp = "ORDER BY updateTime DESC"
+ tmp = "ORDER BY updateTime"
#keyorder = sorted(feedInfo, key = lambda k: feedInfo[k][2])
else: # order == "Manual" or invalid value...
tmp = "ORDER BY rank"
#keyorder = sorted(feedInfo, key = lambda k: feedInfo[k][0])
- sql = "SELECT id FROM feeds " + tmp
+ if onlyUnread:
+ sql = "SELECT id FROM feeds WHERE unread>0 AND category=%s " %category + tmp
+ else:
+ sql = "SELECT id FROM feeds WHERE category=%s " %category + tmp
rows = self.db.execute(sql)
keys = []
for row in rows:
else:
return False
- def updateUnread(self, key, db=None):
- if db == None:
- db = self.db
+ def updateUnread(self, key):
feed = self.getFeed(key)
- db.execute("UPDATE feeds SET unread=? WHERE id=?;", (feed.getNumberOfUnreadItems(), key))
- db.commit()
+ self.db.execute("UPDATE feeds SET unread=? WHERE id=?;", (feed.getNumberOfUnreadItems(), key))
+ self.db.commit()
+ self.cache_invalidate('feeds')
- def addFeed(self, title, url, id=None):
+ def addFeed(self, title, url, id=None, category=1):
if not id:
- id = getId(title)
+ id = getId(url)
count = self.db.execute("SELECT count(*) FROM feeds WHERE id=?;", (id,) ).fetchone()[0]
if count == 0:
max_rank = self.db.execute("SELECT MAX(rank) FROM feeds;").fetchone()[0]
if max_rank == None:
max_rank = 0
- values = (id, title, url, 0, 0, max_rank+1, None, "None")
- self.db.execute("INSERT INTO feeds (id, title, url, unread, updateTime, rank, etag, modified) VALUES (?, ?, ? ,? ,? ,?, ?, ?);", values)
+ values = (id, title, url, 0, 0, max_rank+1, None, "None", 1, category)
+ self.db.execute("INSERT INTO feeds (id, title, url, unread, updateTime, rank, etag, modified, widget, category) VALUES (?, ?, ? ,? ,? ,?, ?, ?, ?,?);", values)
self.db.commit()
# Ask for the feed object, it will create the necessary tables
self.getFeed(id)
+
+ if wc().available():
+ # Register the stream with Woodchuck. Update approximately
+ # every 6 hours.
+ wc().stream_register(stream_identifier=id,
+ human_readable_name=title,
+ freshness=6*60*60)
+
return True
else:
return False
+
+ def addCategory(self, title):
+ rank = self.db.execute("SELECT MAX(rank)+1 FROM categories;").fetchone()[0]
+ if rank==None:
+ rank=1
+ id = self.db.execute("SELECT MAX(id)+1 FROM categories;").fetchone()[0]
+ if id==None:
+ id=1
+ self.db.execute("INSERT INTO categories (id, title, unread, rank) VALUES (?, ?, 0, ?)", (id, title, rank))
+ self.db.commit()
def removeFeed(self, key):
+ if wc().available ():
+ try:
+ del wc()[key]
+ except KeyError:
+ logger.debug("Removing unregistered feed %s failed" % (key,))
+
rank = self.db.execute("SELECT rank FROM feeds WHERE id=?;", (key,) ).fetchone()[0]
self.db.execute("DELETE FROM feeds WHERE id=?;", (key, ))
self.db.execute("UPDATE feeds SET rank=rank-1 WHERE rank>?;", (rank,) )
if isdir(self.configdir+key+".d/"):
rmtree(self.configdir+key+".d/")
- #self.saveConfig()
+
+ def removeCategory(self, key):
+ if self.db.execute("SELECT count(*) FROM categories;").fetchone()[0] > 1:
+ rank = self.db.execute("SELECT rank FROM categories WHERE id=?;", (key,) ).fetchone()[0]
+ self.db.execute("DELETE FROM categories WHERE id=?;", (key, ))
+ self.db.execute("UPDATE categories SET rank=rank-1 WHERE rank>?;", (rank,) )
+ self.db.execute("UPDATE feeds SET category=1 WHERE category=?;", (key,) )
+ self.db.commit()
#def saveConfig(self):
# self.listOfFeeds["feedingit-order"] = self.sortedKeys
self.db.execute("UPDATE feeds SET rank=? WHERE rank=?;", (rank, rank-1) )
self.db.execute("UPDATE feeds SET rank=? WHERE id=?;", (rank-1, key) )
self.db.commit()
+
+ def moveCategoryUp(self, key):
+ rank = self.db.execute("SELECT rank FROM categories WHERE id=?;", (key,)).fetchone()[0]
+ if rank>0:
+ self.db.execute("UPDATE categories SET rank=? WHERE rank=?;", (rank, rank-1) )
+ self.db.execute("UPDATE categories SET rank=? WHERE id=?;", (rank-1, key) )
+ self.db.commit()
def moveDown(self, key):
rank = self.db.execute("SELECT rank FROM feeds WHERE id=?;", (key,)).fetchone()[0]
self.db.execute("UPDATE feeds SET rank=? WHERE id=?;", (rank+1, key) )
self.db.commit()
+ def moveCategoryDown(self, key):
+ rank = self.db.execute("SELECT rank FROM categories WHERE id=?;", (key,)).fetchone()[0]
+ max_rank = self.db.execute("SELECT MAX(rank) FROM categories;").fetchone()[0]
+ if rank<max_rank:
+ self.db.execute("UPDATE categories SET rank=? WHERE rank=?;", (rank, rank+1) )
+ self.db.execute("UPDATE categories SET rank=? WHERE id=?;", (rank+1, key) )
+ self.db.commit()