return urllib2.build_opener(*openers)
+def transfer_stats(sent, received, **kwargs):
+ """
+    sent is the number of bytes sent so far; received is the number
+    of bytes received so far.
+
+    Returns a continuation that takes the same two arguments and
+    returns a tuple of the number of bytes sent, the number of bytes
+    received and the time elapsed since this function was invoked.
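+
+    A minimal usage sketch (the byte counts are illustrative):
+
+        stats = transfer_stats(0, 0)
+        # ... transfer some data ...
+        sent, received, elapsed = stats(sent=1000, received=5000)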
+ """
+ start_time = time.time()
+ start_sent = sent
+ start_received = received
+
+    def delta(sent, received, **kwargs):
+        return (sent - start_sent,
+                received - start_received,
+                time.time() - start_time)
+
+    return delta
+
# If not None, a subprocess.Popen object corresponding to a
# update_feeds.py process.
update_feed_process = None
jobs_at_start = 0
-class Feed:
+class BaseObject(object):
+ # Columns to cache. Classes that inherit from this and use the
+ # cache mechanism should set this to a list of tuples, each of
+ # which contains two entries: the table and the column. Note that
+ # both are case sensitive.
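+    #
+    # For example, a subclass that caches the 'read' and 'title'
+    # columns of the 'feed' table would use:
+    #
+    #     cached_columns = (('feed', 'read'),
+    #                       ('feed', 'title'))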
+ cached_columns = ()
+
+ def cache_invalidate(self, table=None):
+ """
+ Invalidate the cache.
+
+ If table is not None, invalidate only the specified table.
+ Otherwise, drop the whole cache.
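+
+        For example, cache_invalidate('feed') invalidates any cached
+        columns of the 'feed' table; cache_invalidate() drops the
+        entire cache.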
+ """
+ if not hasattr(self, 'cache'):
+ return
+
+ if table is None:
+ del self.cache
+ else:
+ if table in self.cache:
+ del self.cache[table]
+
+ def lookup(self, table, column, id=None):
+ """
+        Look up a single value (if id is given) or a whole column
+        (if id is None).  Uses a cache for the columns listed in
+        cached_columns.  Note: a whole column is returned unsorted.
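+
+        For example, lookup('feed', 'title', some_id) returns the
+        title of the entry with that id, while lookup('feed',
+        'title') returns the titles of all entries.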
+ """
+ if not hasattr(self, 'cache'):
+ self.cache = {}
+
+ # Cache data for at most 60 seconds.
+ now = time.time()
+ try:
+ cache = self.cache[table]
+
+            if now - cache[None] > 60:
+ # logger.debug("%s: Cache too old: clearing" % (table,))
+ del self.cache[table]
+ cache = None
+ except KeyError:
+ cache = None
+
+ if (cache is None
+ or (table, column) not in self.cached_columns):
+ # The cache is empty or the caller wants a column that we
+ # don't cache.
+ if (table, column) in self.cached_columns:
+ # logger.debug("%s: Rebuilding cache" % (table,))
+
+ do_cache = True
+
+ self.cache[table] = cache = {}
+ columns = []
+ for t, c in self.cached_columns:
+ if table == t:
+ cache[c] = {}
+ columns.append(c)
+
+ columns.append('id')
+ where = ""
+ else:
+ do_cache = False
+
+                columns = (column,)
+                if id is not None:
+                    # Bind the id as a parameter, as elsewhere in
+                    # this file, rather than interpolating it into
+                    # the SQL.
+                    where = "WHERE id = ?"
+                    params = (id,)
+                else:
+                    where = ""
+                    params = ()
+
+            results = self.db.execute(
+                "SELECT %s FROM %s %s" % (','.join(columns), table, where),
+                params)
+
+ if do_cache:
+ for r in results:
+ values = list(r)
+ i = values.pop()
+ for index, value in enumerate(values):
+ cache[columns[index]][i] = value
+
+ cache[None] = now
+            else:
+                # Not caching: read the requested column straight
+                # from the query results.  (When do_cache is true,
+                # fall through to the cache lookup below.)
+                values = []
+                for r in results:
+                    if id is not None:
+                        return r[0]
+
+                    values.append(r[0])
+
+                return values if id is None else None
+ else:
+ cache = self.cache[table]
+
+ try:
+ if id is not None:
+ value = cache[column][id]
+ # logger.debug("%s.%s:%s -> %s" % (table, column, id, value))
+ return value
+ else:
+ return cache[column].values()
+ except KeyError:
+ # logger.debug("%s.%s:%s -> Not found" % (table, column, id))
+ return None
+
+class Feed(BaseObject):
+ # Columns to cache.
+ cached_columns = (('feed', 'read'),
+ ('feed', 'title'))
+
serial_execution_lock = threading.Lock()
def _getdb(self):
except OSError:
pass
- raise exception
+ return None
else:
#open(filename,"a").close() # "Touch" the file
file = open(filename,"a")
time.sleep(1)
def _updateFeed(self, configdir, url, etag, modified, expiryTime=24, proxy=None, imageCache=False, postFeedUpdateFunc=None, *postFeedUpdateFuncArgs):
+ logger.debug("Updating %s" % url)
+
success = False
have_serial_execution_lock = False
try:
- download_start = time.time ()
+ update_start = time.time ()
progress_handler = HTTPProgressHandler(download_callback)
openers.append (proxy)
kwargs = {'handlers':openers}
+ feed_transfer_stats = transfer_stats(0, 0)
+
tmp=feedparser.parse(url, etag=etag, modified=modified, **kwargs)
- download_duration = time.time () - download_start
-
+ download_duration = time.time () - update_start
+
opener = downloader(progress_handler, proxy)
if JobManager().do_quit:
expiry = float(expiryTime) * 3600.
currentTime = 0
-
+
+ updated_objects = 0
+ new_objects = 0
+
def wc_success():
try:
wc().stream_register (self.key, "", 6 * 60 * 60)
|woodchuck.Indicator.StreamWide),
transferred_down=progress_handler.stats['received'],
transferred_up=progress_handler.stats['sent'],
- transfer_time=download_start,
+ transfer_time=update_start,
transfer_duration=download_duration,
- new_objects=len (tmp.entries),
- objects_inline=len (tmp.entries))
+ new_objects=new_objects,
+ updated_objects=updated_objects,
+ objects_inline=new_objects + updated_objects)
except KeyError:
logger.warn(
"Failed to register update of %s with woodchuck!"
# responsive.
time.sleep(0)
+ entry_transfer_stats = transfer_stats(
+ *feed_transfer_stats(**progress_handler.stats)[0:2])
+
if JobManager().do_quit:
raise KeyboardInterrupt
- received_base = progress_handler.stats['received']
- sent_base = progress_handler.stats['sent']
object_size = 0
date = self.extractDate(entry)
entry["id"] = None
content = self.extractContent(entry)
object_size = len (content)
- received_base -= len (content)
tmpEntry = {"title":entry["title"], "content":content,
"date":date, "link":entry["link"], "author":entry["author"], "id":entry["id"]}
id = self.generateUniqueId(tmpEntry)
+            current_version \
+                = self.db.execute('SELECT date FROM feed WHERE id=?',
+                                  (id,)).fetchone()
+ if (current_version is not None
+ and current_version[0] == date):
+ logger.debug("ALREADY DOWNLOADED %s (%s)"
+ % (entry["title"], entry["link"]))
+ continue
+
+ if current_version is not None:
+ # The version was updated. Mark it as unread.
+ logger.debug("UPDATED: %s (%s)"
+ % (entry["title"], entry["link"]))
+ self.setEntryUnread(id)
+ updated_objects += 1
+ else:
+ logger.debug("NEW: %s (%s)"
+ % (entry["title"], entry["link"]))
+ new_objects += 1
+
#articleTime = time.mktime(self.entries[id]["dateTuple"])
soup = BeautifulSoup(self.getArticle(tmpEntry)) #tmpEntry["content"])
images = soup('img')
else:
publication_time = None
- sent = progress_handler.stats['sent'] - sent_base
- received = (progress_handler.stats['received']
- - received_base)
+ sent, received, _ \
+ = entry_transfer_stats(**progress_handler.stats)
+ # sent and received are for objects (in
+ # particular, images) associated with this
+ # item. We also want to attribute the data
+ # transferred for the item's content. This is
+ # a good first approximation.
+ received += len(content)
mainthread.execute(
register_object_transferred(
async=True)
self.db.commit()
+ sent, received, _ \
+ = feed_transfer_stats(**progress_handler.stats)
logger.debug (
"%s: Update successful: transferred: %d/%d; objects: %d)"
- % (self.key,
- progress_handler.stats['sent'],
- progress_handler.stats['received'],
- len (tmp.entries)))
+ % (url, sent, received, len (tmp.entries)))
mainthread.execute (wc_success, async=True)
success = True
postFeedUpdateFunc (self.key, updateTime, etag, modified,
title, *postFeedUpdateFuncArgs)
+ self.cache_invalidate()
+
def setEntryRead(self, id):
self.db.execute("UPDATE feed SET read=1 WHERE id=?;", (id,) )
self.db.commit()
pass
if wc().available():
mainthread.execute(doit, async=True)
+ self.cache_invalidate('feed')
def setEntryUnread(self, id):
self.db.execute("UPDATE feed SET read=0 WHERE id=?;", (id,) )
self.db.commit()
+ self.cache_invalidate('feed')
def markAllAsRead(self):
self.db.execute("UPDATE feed SET read=1 WHERE read=0;")
self.db.commit()
+ self.cache_invalidate('feed')
def isEntryRead(self, id):
- read_status = self.db.execute("SELECT read FROM feed WHERE id=?;", (id,) ).fetchone()[0]
- return read_status==1 # Returns True if read==1, and False if read==0
+ return self.lookup('feed', 'read', id) == 1
def getTitle(self, id):
- return self.db.execute("SELECT title FROM feed WHERE id=?;", (id,) ).fetchone()[0]
+ return self.lookup('feed', 'title', id)
def getContentLink(self, id):
return self.db.execute("SELECT contentLink FROM feed WHERE id=?;", (id,) ).fetchone()[0]
pass
self.removeEntry(id)
-class Listing:
+class Listing(BaseObject):
+ # Columns to cache.
+ cached_columns = (('feeds', 'updateTime'),
+ ('feeds', 'unread'),
+ ('feeds', 'title'),
+ ('categories', 'title'))
+
def _getdb(self):
try:
db = self.tls.db
self.db.execute("UPDATE feeds SET title=(case WHEN title=='' THEN ? ELSE title END) where id=?;",
(title, key))
self.db.commit()
+ self.cache_invalidate('feeds')
self.updateUnread(key)
update_server_object().ArticleCountUpdated()
else:
self.db.execute("UPDATE feeds SET title=?, url=? WHERE id=?;", (title, url, key))
self.db.commit()
+ self.cache_invalidate('feeds')
if wc().available():
try:
logger.debug("Feed %s (%s) unknown." % (key, title))
def getFeedUpdateTime(self, key):
- update_time = self.db.execute(
- "SELECT updateTime FROM feeds WHERE id=?;", (key,)).fetchone()[0]
+ update_time = self.lookup('feeds', 'updateTime', key)
if not update_time:
return "Never"
return time.strftime("%x", time.gmtime(update_time))
def getFeedNumberOfUnreadItems(self, key):
- return self.db.execute("SELECT unread FROM feeds WHERE id=?;", (key,)).fetchone()[0]
+ return self.lookup('feeds', 'unread', key)
def getFeedTitle(self, key):
- (title, url) = self.db.execute("SELECT title, url FROM feeds WHERE id=?;", (key,)).fetchone()
+ title = self.lookup('feeds', 'title', key)
if title:
return title
- return url
+
+ return self.getFeedUrl(key)
def getFeedUrl(self, key):
return self.db.execute("SELECT url FROM feeds WHERE id=?;", (key,)).fetchone()[0]
return keys
def getListOfCategories(self):
- rows = self.db.execute("SELECT id FROM categories ORDER BY rank;" )
- keys = []
- for row in rows:
- if row[0]:
- keys.append(row[0])
- return keys
+ return list(row[0] for row in self.db.execute(
+ "SELECT id FROM categories ORDER BY rank;"))
def getCategoryTitle(self, id):
- row = self.db.execute("SELECT title FROM categories WHERE id=?;", (id, )).fetchone()
- return row[0]
+ return self.lookup('categories', 'title', id)
def getSortedListOfKeys(self, order, onlyUnread=False, category=1):
if order == "Most unread":
feed = self.getFeed(key)
self.db.execute("UPDATE feeds SET unread=? WHERE id=?;", (feed.getNumberOfUnreadItems(), key))
self.db.commit()
+ self.cache_invalidate('feeds')
def addFeed(self, title, url, id=None, category=1):
if not id: