From 228a823e3e7bf2cbf419fe83cad32d135d6896bb Mon Sep 17 00:00:00 2001 From: Ed Page Date: Fri, 26 Mar 2010 18:54:39 -0500 Subject: [PATCH] Improving support for a worker thread and moving more code over to it --- src/channel/call.py | 22 +++++++++----------- src/channel/debug_prompt.py | 45 ++++++++++++++++++++++++++++++++++++---- src/channel/text.py | 11 +++++----- src/gvoice/addressbook.py | 16 +++++++-------- src/gvoice/backend.py | 48 +++++++++++++++++++++++++------------------ src/gvoice/conversations.py | 17 ++++++++------- src/gvoice/session.py | 32 +++++++++++++++-------------- src/util/go_utils.py | 40 +++++++++++++++++++++++++++--------- 8 files changed, 147 insertions(+), 84 deletions(-) diff --git a/src/channel/call.py b/src/channel/call.py index 2d3af62..0fd08ea 100644 --- a/src/channel/call.py +++ b/src/channel/call.py @@ -104,7 +104,7 @@ class CallChannel( self.remove_from_connection() if self.__calledNumber is not None: - le = gobject_utils.LinearExecution(self._cancel) + le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._cancel) le.start() @misc_utils.log_exception(_moduleLogger) @@ -163,7 +163,7 @@ class CallChannel( contact = self._conn.get_handle_by_id(telepathy.constants.HANDLE_TYPE_CONTACT, contactId) assert self.__contactHandle == contact, "%r != %r" % (self.__contactHandle, contact) - le = gobject_utils.LinearExecution(self._call) + le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._call) le.start(contact) streamId = 0 @@ -172,20 +172,19 @@ class CallChannel( pendingSendFlags = telepathy.constants.MEDIA_STREAM_PENDING_REMOTE_SEND return [(streamId, contact, streamTypes[0], streamState, streamDirection, pendingSendFlags)] - def _call(self, contact, on_success, on_error): + @misc_utils.log_exception(_moduleLogger) + def _call(self, contact): contactNumber = contact.phoneNumber self.__calledNumber = contactNumber self.CallStateChanged(self.__contactHandle, telepathy.constants.CHANNEL_CALL_STATE_RINGING) try: - result = yield self._conn.session.pool.add_task, ( + result = yield ( self._conn.session.backend.call, (contactNumber, ), {}, - on_success, - on_error, - ), {} + ) except Exception: _moduleLogger.exception(result) return @@ -193,16 +192,15 @@ class CallChannel( self._delayedClose.start(seconds=0) self.CallStateChanged(self.__contactHandle, telepathy.constants.CHANNEL_CALL_STATE_FORWARDED) - def _cancel(self, on_success, on_error): + @misc_utils.log_exception(_moduleLogger) + def _cancel(self): _moduleLogger.debug("Cancelling call") try: - result = yield self._conn.session.pool.add_task, ( + result = yield ( self._conn.session.backend.cancel, (self.__calledNumber, ), {}, - on_success, - on_error, - ), {} + ) except Exception: _moduleLogger.exception(result) return diff --git a/src/channel/debug_prompt.py b/src/channel/debug_prompt.py index 2104917..47a31dd 100644 --- a/src/channel/debug_prompt.py +++ b/src/channel/debug_prompt.py @@ -12,6 +12,7 @@ import telepathy import constants import tp import util.misc as misc_utils +import util.go_utils as gobject_utils import gvoice @@ -147,12 +148,21 @@ class DebugPromptChannel(tp.ChannelTypeText, cmd.Cmd): self._report_new_message("Prints the current setting for the state machines") def do_is_authed(self, args): + le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._is_authed) + le.start(args) + + @misc_utils.log_exception(_moduleLogger) + def _is_authed(self, args): if args: self._report_new_message("No arguments supported") return try: - isAuthed = self._conn.session.backend.is_authed() + isAuthed = yield ( + self._conn.session.backend.is_authed, + (), + {} + ) self._report_new_message(str(isAuthed)) except Exception, e: self._report_new_message(str(e)) @@ -161,12 +171,21 @@ class DebugPromptChannel(tp.ChannelTypeText, cmd.Cmd): self._report_new_message("Print whether logged in to Google Voice") def do_is_dnd(self, args): + le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._is_dnd) + le.start(args) + + @misc_utils.log_exception(_moduleLogger) + def _is_dnd(self, args): if args: self._report_new_message("No arguments supported") return try: - isDnd = self._conn.session.backend.is_dnd() + isDnd = yield ( + self._conn.session.backend.is_dnd, + (), + {} + ) self._report_new_message(str(isDnd)) except Exception, e: self._report_new_message(str(e)) @@ -235,13 +254,22 @@ class DebugPromptChannel(tp.ChannelTypeText, cmd.Cmd): self._report_new_message("Print the callback number currently enabled") def do_call(self, args): + le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._call) + le.start(args) + + @misc_utils.log_exception(_moduleLogger) + def _call(self, args): if not args: self._report_new_message("Must specify the phone number and only the phone nunber") return try: number = args - self._conn.session.backend.call(number) + yield ( + self._conn.session.backend.call, + (number), + {} + ) except Exception, e: self._report_new_message(str(e)) @@ -249,6 +277,11 @@ class DebugPromptChannel(tp.ChannelTypeText, cmd.Cmd): self._report_new_message("\n".join(["call NUMBER", "Initiate a callback, Google forwarding the call to the callback number"])) def do_send_sms(self, args): + le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._send_sms) + le.start(args) + + @misc_utils.log_exception(_moduleLogger) + def _send_sms(self, args): args = args.split(" ") if 1 < len(args): self._report_new_message("Must specify the phone number and then message") @@ -257,7 +290,11 @@ class DebugPromptChannel(tp.ChannelTypeText, cmd.Cmd): try: number = args[0] message = " ".join(args[1:]) - self._conn.session.backend.send_sms([number], message) + yield ( + self._conn.session.backend.send_sms, + ([number], message), + {}, + ) except Exception, e: self._report_new_message(str(e)) diff --git a/src/channel/text.py b/src/channel/text.py index 1fd7d1e..3b9b159 100644 --- a/src/channel/text.py +++ b/src/channel/text.py @@ -55,22 +55,21 @@ class TextChannel(tp.ChannelTypeText): @misc_utils.log_exception(_moduleLogger) def Send(self, messageType, text): - le = gobject_utils.LinearExecution(self._send) + le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._send) le.start(messageType, text) - def _send(self, messageType, text, on_success, on_error): + @misc_utils.log_exception(_moduleLogger) + def _send(self, messageType, text): if messageType != telepathy.CHANNEL_TEXT_MESSAGE_TYPE_NORMAL: raise telepathy.errors.NotImplemented("Unhandled message type: %r" % messageType) _moduleLogger.info("Sending message to %r" % (self.__otherHandle, )) try: - result = yield self._conn.session.pool.add_task, ( + result = yield ( self._conn.session.backend.send_sms, ([self.__otherHandle.phoneNumber], text), {}, - on_success, - on_error, - ), {} + ) except Exception: _moduleLogger.exception(result) return diff --git a/src/gvoice/addressbook.py b/src/gvoice/addressbook.py index fea9ef6..2a5e389 100644 --- a/src/gvoice/addressbook.py +++ b/src/gvoice/addressbook.py @@ -5,6 +5,7 @@ import logging import util.coroutines as coroutines import util.misc as misc_utils +import util.go_utils as gobject_utils _moduleLogger = logging.getLogger(__name__) @@ -25,15 +26,18 @@ class Addressbook(object): def update(self, force=False): if not force and self._numbers: return - self._asyncPool.add_task( + + le = gobject_utils.AsyncLinearExecution(self._asyncPool, self._update) + le.start() + + @misc_utils.log_exception(_moduleLogger) + def _update(self): + contacts = yield ( self._backend.get_contacts, (), {}, - self._on_get_contacts, - self._on_get_contacts_failed, ) - def _on_get_contacts(self, contacts): oldContacts = self._numbers oldContactNumbers = set(self.get_numbers()) @@ -52,10 +56,6 @@ class Addressbook(object): message = self, addedContacts, removedContacts, changedContacts self.updateSignalHandler.stage.send(message) - @misc_utils.log_exception(_moduleLogger) - def _on_get_contacts_failed(self, error): - _moduleLogger.error(error) - def get_numbers(self): return self._numbers.iterkeys() diff --git a/src/gvoice/backend.py b/src/gvoice/backend.py index 9996e74..72eda38 100755 --- a/src/gvoice/backend.py +++ b/src/gvoice/backend.py @@ -473,18 +473,15 @@ class GVoiceBackend(object): @returns Iterable of (personsName, phoneNumber, exact date, relative date, action) @blocks """ - for action, url in ( - ("Received", self._XML_RECEIVED_URL), - ("Missed", self._XML_MISSED_URL), - ("Placed", self._XML_PLACED_URL), - ): - flatXml = self._get_page(url) - - allRecentHtml = self._grab_html(flatXml) - allRecentData = self._parse_history(allRecentHtml) - for recentCallData in allRecentData: - recentCallData["action"] = action - yield recentCallData + recentPages = [ + (action, self._get_page(url)) + for action, url in ( + ("Received", self._XML_RECEIVED_URL), + ("Missed", self._XML_MISSED_URL), + ("Placed", self._XML_PLACED_URL), + ) + ] + return self._parse_recent(recentPages) def get_contacts(self): """ @@ -492,14 +489,7 @@ class GVoiceBackend(object): @blocks """ page = self._get_page(self._XML_CONTACTS_URL) - contactsBody = self._contactsBodyRe.search(page) - if contactsBody is None: - raise RuntimeError("Could not extract contact information") - accountData = _fake_parse_json(contactsBody.group(1)) - for contactId, contactDetails in accountData["contacts"].iteritems(): - # A zero contact id is the catch all for unknown contacts - if contactId != "0": - yield contactId, contactDetails + return self._process_contacts(page) def get_voicemails(self): """ @@ -584,6 +574,24 @@ class GVoiceBackend(object): raise RuntimeError("Not Authenticated") return number + def _parse_recent(self, recentPages): + for action, flatXml in recentPages: + allRecentHtml = self._grab_html(flatXml) + allRecentData = self._parse_history(allRecentHtml) + for recentCallData in allRecentData: + recentCallData["action"] = action + yield recentCallData + + def _process_contacts(self, page): + contactsBody = self._contactsBodyRe.search(page) + if contactsBody is None: + raise RuntimeError("Could not extract contact information") + accountData = _fake_parse_json(contactsBody.group(1)) + for contactId, contactDetails in accountData["contacts"].iteritems(): + # A zero contact id is the catch all for unknown contacts + if contactId != "0": + yield contactId, contactDetails + def _parse_history(self, historyHtml): splitVoicemail = self._seperateVoicemailsRegex.split(historyHtml) for messageId, messageHtml in itergroup(splitVoicemail[1:], 2): diff --git a/src/gvoice/conversations.py b/src/gvoice/conversations.py index 86697ea..149cef6 100644 --- a/src/gvoice/conversations.py +++ b/src/gvoice/conversations.py @@ -14,6 +14,7 @@ except ImportError: import constants import util.coroutines as coroutines import util.misc as misc_utils +import util.go_utils as gobject_utils _moduleLogger = logging.getLogger(__name__) @@ -74,16 +75,18 @@ class Conversations(object): def update(self, force=False): if not force and self._conversations: return - self._asyncPool.add_task( + + le = gobject_utils.AsyncLinearExecution(self._asyncPool, self._update) + le.start() + + @misc_utils.log_exception(_moduleLogger) + def _update(self): + conversationResult = yield ( self._get_raw_conversations, (), {}, - self._on_get_conversations, - self._on_get_conversations_failed, ) - @misc_utils.log_exception(_moduleLogger) - def _on_get_conversations(self, conversationResult): oldConversationIds = set(self._conversations.iterkeys()) updateConversationIds = set() @@ -117,10 +120,6 @@ class Conversations(object): self.updateSignalHandler.stage.send(message) self._hasDoneUpdate = True - @misc_utils.log_exception(_moduleLogger) - def _on_get_conversations_failed(self, error): - _moduleLogger.error(error) - def get_conversations(self): return self._conversations.iterkeys() diff --git a/src/gvoice/session.py b/src/gvoice/session.py index 5fb5a95..8151c05 100644 --- a/src/gvoice/session.py +++ b/src/gvoice/session.py @@ -150,25 +150,27 @@ class Session(object): self._masterStateMachine.close() def login(self, username, password, on_success, on_error): - self._username = username - self._password = password self._asyncPool.start() - self._asyncPool.add_task( - self._backend.login, - (self._username, self._password), - {}, - self.__on_login_success(on_success), - on_error - ) - def __on_login_success(self, user_success): + le = gobject_utils.AsyncLinearExecution(self._asyncPool, self._login) + le.start(username, password, on_success, on_error) - @misc_utils.log_exception(_moduleLogger) - def _actual_success(*args, **kwds): - self._masterStateMachine.start() - user_success(*args, **kwds) + @misc_utils.log_exception(_moduleLogger) + def _login(self, username, password, on_success, on_error): + self._username = username + self._password = password + try: + isLoggedIn = yield ( + self._backend.login, + (self._username, self._password), + {}, + ) + except Exception, e: + on_error(e) + return - return _actual_success + self._masterStateMachine.start() + on_success(isLoggedIn) def logout(self): self._asyncPool.stop() diff --git a/src/util/go_utils.py b/src/util/go_utils.py index 38e20c2..52ccf92 100644 --- a/src/util/go_utils.py +++ b/src/util/go_utils.py @@ -168,6 +168,7 @@ class AsyncPool(object): try: callback(result) except Exception: + _moduleLogger.exception("Callback errored") pass return False @@ -183,6 +184,7 @@ class AsyncPool(object): result = func(*args, **kwds) isError = False except Exception, e: + _moduleLogger.error("Error, passing it back to the main thread") result = e isError = True self.__workQueue.task_done() @@ -190,38 +192,56 @@ class AsyncPool(object): gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result) -class LinearExecution(object): +class AsyncLinearExecution(object): - def __init__(self, func): + def __init__(self, pool, func): + self._pool = pool self._func = func self._run = None def start(self, *args, **kwds): assert self._run is None - kwds["on_success"] = self.on_success - kwds["on_error"] = self.on_error self._run = self._func(*args, **kwds) trampoline, args, kwds = self._run.send(None) # priming the function - trampoline(*args, **kwds) + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) @misc.log_exception(_moduleLogger) - def on_success(self, *args, **kwds): - assert not kwds + def on_success(self, result): + _moduleLogger.debug("Processing success for: %r", self._func) try: - trampoline, args, kwds = self._run.send(args) + trampoline, args, kwds = self._run.send(result) except StopIteration, e: pass else: - trampoline(*args, **kwds) + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) @misc.log_exception(_moduleLogger) def on_error(self, error): + _moduleLogger.debug("Processing error for: %r", self._func) try: trampoline, args, kwds = self._run.throw(error) except StopIteration, e: pass else: - trampoline(*args, **kwds) + self._pool.add_task( + trampoline, + args, + kwds, + self.on_success, + self.on_error, + ) def throttled(minDelay, queue): -- 1.7.9.5