Improving support for a worker thread and moving more code over to it
authorEd Page <eopage@byu.net>
Fri, 26 Mar 2010 23:54:39 +0000 (18:54 -0500)
committerEd Page <eopage@byu.net>
Fri, 26 Mar 2010 23:54:39 +0000 (18:54 -0500)
src/channel/call.py
src/channel/debug_prompt.py
src/channel/text.py
src/gvoice/addressbook.py
src/gvoice/backend.py
src/gvoice/conversations.py
src/gvoice/session.py
src/util/go_utils.py

index 2d3af62..0fd08ea 100644 (file)
@@ -104,7 +104,7 @@ class CallChannel(
                self.remove_from_connection()
 
                if self.__calledNumber is not None:
-                       le = gobject_utils.LinearExecution(self._cancel)
+                       le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._cancel)
                        le.start()
 
        @misc_utils.log_exception(_moduleLogger)
@@ -163,7 +163,7 @@ class CallChannel(
                contact = self._conn.get_handle_by_id(telepathy.constants.HANDLE_TYPE_CONTACT, contactId)
                assert self.__contactHandle == contact, "%r != %r" % (self.__contactHandle, contact)
 
-               le = gobject_utils.LinearExecution(self._call)
+               le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._call)
                le.start(contact)
 
                streamId = 0
@@ -172,20 +172,19 @@ class CallChannel(
                pendingSendFlags = telepathy.constants.MEDIA_STREAM_PENDING_REMOTE_SEND
                return [(streamId, contact, streamTypes[0], streamState, streamDirection, pendingSendFlags)]
 
-       def _call(self, contact, on_success, on_error):
+       @misc_utils.log_exception(_moduleLogger)
+       def _call(self, contact):
                contactNumber = contact.phoneNumber
 
                self.__calledNumber = contactNumber
                self.CallStateChanged(self.__contactHandle, telepathy.constants.CHANNEL_CALL_STATE_RINGING)
 
                try:
-                       result = yield self._conn.session.pool.add_task, (
+                       result = yield (
                                self._conn.session.backend.call,
                                (contactNumber, ),
                                {},
-                               on_success,
-                               on_error,
-                       ), {}
+                       )
                except Exception:
                        _moduleLogger.exception(result)
                        return
@@ -193,16 +192,15 @@ class CallChannel(
                self._delayedClose.start(seconds=0)
                self.CallStateChanged(self.__contactHandle, telepathy.constants.CHANNEL_CALL_STATE_FORWARDED)
 
-       def _cancel(self, on_success, on_error):
+       @misc_utils.log_exception(_moduleLogger)
+       def _cancel(self):
                _moduleLogger.debug("Cancelling call")
                try:
-                       result = yield self._conn.session.pool.add_task, (
+                       result = yield (
                                self._conn.session.backend.cancel,
                                (self.__calledNumber, ),
                                {},
-                               on_success,
-                               on_error,
-                       ), {}
+                       )
                except Exception:
                        _moduleLogger.exception(result)
                        return
index 2104917..47a31dd 100644 (file)
@@ -12,6 +12,7 @@ import telepathy
 import constants
 import tp
 import util.misc as misc_utils
+import util.go_utils as gobject_utils
 import gvoice
 
 
@@ -147,12 +148,21 @@ class DebugPromptChannel(tp.ChannelTypeText, cmd.Cmd):
                self._report_new_message("Prints the current setting for the state machines")
 
        def do_is_authed(self, args):
+               le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._is_authed)
+               le.start(args)
+
+       @misc_utils.log_exception(_moduleLogger)
+       def _is_authed(self, args):
                if args:
                        self._report_new_message("No arguments supported")
                        return
 
                try:
-                       isAuthed = self._conn.session.backend.is_authed()
+                       isAuthed = yield (
+                               self._conn.session.backend.is_authed,
+                               (),
+                               {}
+                       )
                        self._report_new_message(str(isAuthed))
                except Exception, e:
                        self._report_new_message(str(e))
@@ -161,12 +171,21 @@ class DebugPromptChannel(tp.ChannelTypeText, cmd.Cmd):
                self._report_new_message("Print whether logged in to Google Voice")
 
        def do_is_dnd(self, args):
+               le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._is_dnd)
+               le.start(args)
+
+       @misc_utils.log_exception(_moduleLogger)
+       def _is_dnd(self, args):
                if args:
                        self._report_new_message("No arguments supported")
                        return
 
                try:
-                       isDnd = self._conn.session.backend.is_dnd()
+                       isDnd = yield (
+                               self._conn.session.backend.is_dnd,
+                               (),
+                               {}
+                       )
                        self._report_new_message(str(isDnd))
                except Exception, e:
                        self._report_new_message(str(e))
@@ -235,13 +254,22 @@ class DebugPromptChannel(tp.ChannelTypeText, cmd.Cmd):
                self._report_new_message("Print the callback number currently enabled")
 
        def do_call(self, args):
+               le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._call)
+               le.start(args)
+
+       @misc_utils.log_exception(_moduleLogger)
+       def _call(self, args):
                if not args:
                        self._report_new_message("Must specify the phone number and only the phone nunber")
                        return
 
                try:
                        number = args
-                       self._conn.session.backend.call(number)
+                       yield (
+                               self._conn.session.backend.call,
+                               (number),
+                               {}
+                       )
                except Exception, e:
                        self._report_new_message(str(e))
 
@@ -249,6 +277,11 @@ class DebugPromptChannel(tp.ChannelTypeText, cmd.Cmd):
                self._report_new_message("\n".join(["call NUMBER", "Initiate a callback, Google forwarding the call to the callback number"]))
 
        def do_send_sms(self, args):
+               le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._send_sms)
+               le.start(args)
+
+       @misc_utils.log_exception(_moduleLogger)
+       def _send_sms(self, args):
                args = args.split(" ")
                if 1 < len(args):
                        self._report_new_message("Must specify the phone number and then message")
@@ -257,7 +290,11 @@ class DebugPromptChannel(tp.ChannelTypeText, cmd.Cmd):
                try:
                        number = args[0]
                        message = " ".join(args[1:])
-                       self._conn.session.backend.send_sms([number], message)
+                       yield (
+                               self._conn.session.backend.send_sms,
+                               ([number], message),
+                               {},
+                       )
                except Exception, e:
                        self._report_new_message(str(e))
 
index 1fd7d1e..3b9b159 100644 (file)
@@ -55,22 +55,21 @@ class TextChannel(tp.ChannelTypeText):
 
        @misc_utils.log_exception(_moduleLogger)
        def Send(self, messageType, text):
-               le = gobject_utils.LinearExecution(self._send)
+               le = gobject_utils.AsyncLinearExecution(self._conn.session.pool, self._send)
                le.start(messageType, text)
 
-       def _send(self, messageType, text, on_success, on_error):
+       @misc_utils.log_exception(_moduleLogger)
+       def _send(self, messageType, text):
                if messageType != telepathy.CHANNEL_TEXT_MESSAGE_TYPE_NORMAL:
                        raise telepathy.errors.NotImplemented("Unhandled message type: %r" % messageType)
 
                _moduleLogger.info("Sending message to %r" % (self.__otherHandle, ))
                try:
-                       result = yield self._conn.session.pool.add_task, (
+                       result = yield (
                                self._conn.session.backend.send_sms,
                                ([self.__otherHandle.phoneNumber], text),
                                {},
-                               on_success,
-                               on_error,
-                       ), {}
+                       )
                except Exception:
                        _moduleLogger.exception(result)
                        return
index fea9ef6..2a5e389 100644 (file)
@@ -5,6 +5,7 @@ import logging
 
 import util.coroutines as coroutines
 import util.misc as misc_utils
+import util.go_utils as gobject_utils
 
 
 _moduleLogger = logging.getLogger(__name__)
@@ -25,15 +26,18 @@ class Addressbook(object):
        def update(self, force=False):
                if not force and self._numbers:
                        return
-               self._asyncPool.add_task(
+
+               le = gobject_utils.AsyncLinearExecution(self._asyncPool, self._update)
+               le.start()
+
+       @misc_utils.log_exception(_moduleLogger)
+       def _update(self):
+               contacts = yield (
                        self._backend.get_contacts,
                        (),
                        {},
-                       self._on_get_contacts,
-                       self._on_get_contacts_failed,
                )
 
-       def _on_get_contacts(self, contacts):
                oldContacts = self._numbers
                oldContactNumbers = set(self.get_numbers())
 
@@ -52,10 +56,6 @@ class Addressbook(object):
                        message = self, addedContacts, removedContacts, changedContacts
                        self.updateSignalHandler.stage.send(message)
 
-       @misc_utils.log_exception(_moduleLogger)
-       def _on_get_contacts_failed(self, error):
-               _moduleLogger.error(error)
-
        def get_numbers(self):
                return self._numbers.iterkeys()
 
index 9996e74..72eda38 100755 (executable)
@@ -473,18 +473,15 @@ class GVoiceBackend(object):
                @returns Iterable of (personsName, phoneNumber, exact date, relative date, action)
                @blocks
                """
-               for action, url in (
-                       ("Received", self._XML_RECEIVED_URL),
-                       ("Missed", self._XML_MISSED_URL),
-                       ("Placed", self._XML_PLACED_URL),
-               ):
-                       flatXml = self._get_page(url)
-
-                       allRecentHtml = self._grab_html(flatXml)
-                       allRecentData = self._parse_history(allRecentHtml)
-                       for recentCallData in allRecentData:
-                               recentCallData["action"] = action
-                               yield recentCallData
+               recentPages = [
+                       (action, self._get_page(url))
+                       for action, url in (
+                               ("Received", self._XML_RECEIVED_URL),
+                               ("Missed", self._XML_MISSED_URL),
+                               ("Placed", self._XML_PLACED_URL),
+                       )
+               ]
+               return self._parse_recent(recentPages)
 
        def get_contacts(self):
                """
@@ -492,14 +489,7 @@ class GVoiceBackend(object):
                @blocks
                """
                page = self._get_page(self._XML_CONTACTS_URL)
-               contactsBody = self._contactsBodyRe.search(page)
-               if contactsBody is None:
-                       raise RuntimeError("Could not extract contact information")
-               accountData = _fake_parse_json(contactsBody.group(1))
-               for contactId, contactDetails in accountData["contacts"].iteritems():
-                       # A zero contact id is the catch all for unknown contacts
-                       if contactId != "0":
-                               yield contactId, contactDetails
+               return self._process_contacts(page)
 
        def get_voicemails(self):
                """
@@ -584,6 +574,24 @@ class GVoiceBackend(object):
                        raise RuntimeError("Not Authenticated")
                return number
 
+       def _parse_recent(self, recentPages):
+               for action, flatXml in recentPages:
+                       allRecentHtml = self._grab_html(flatXml)
+                       allRecentData = self._parse_history(allRecentHtml)
+                       for recentCallData in allRecentData:
+                               recentCallData["action"] = action
+                               yield recentCallData
+
+       def _process_contacts(self, page):
+               contactsBody = self._contactsBodyRe.search(page)
+               if contactsBody is None:
+                       raise RuntimeError("Could not extract contact information")
+               accountData = _fake_parse_json(contactsBody.group(1))
+               for contactId, contactDetails in accountData["contacts"].iteritems():
+                       # A zero contact id is the catch all for unknown contacts
+                       if contactId != "0":
+                               yield contactId, contactDetails
+
        def _parse_history(self, historyHtml):
                splitVoicemail = self._seperateVoicemailsRegex.split(historyHtml)
                for messageId, messageHtml in itergroup(splitVoicemail[1:], 2):
index 86697ea..149cef6 100644 (file)
@@ -14,6 +14,7 @@ except ImportError:
 import constants
 import util.coroutines as coroutines
 import util.misc as misc_utils
+import util.go_utils as gobject_utils
 
 
 _moduleLogger = logging.getLogger(__name__)
@@ -74,16 +75,18 @@ class Conversations(object):
        def update(self, force=False):
                if not force and self._conversations:
                        return
-               self._asyncPool.add_task(
+
+               le = gobject_utils.AsyncLinearExecution(self._asyncPool, self._update)
+               le.start()
+
+       @misc_utils.log_exception(_moduleLogger)
+       def _update(self):
+               conversationResult = yield (
                        self._get_raw_conversations,
                        (),
                        {},
-                       self._on_get_conversations,
-                       self._on_get_conversations_failed,
                )
 
-       @misc_utils.log_exception(_moduleLogger)
-       def _on_get_conversations(self, conversationResult):
                oldConversationIds = set(self._conversations.iterkeys())
 
                updateConversationIds = set()
@@ -117,10 +120,6 @@ class Conversations(object):
                        self.updateSignalHandler.stage.send(message)
                self._hasDoneUpdate = True
 
-       @misc_utils.log_exception(_moduleLogger)
-       def _on_get_conversations_failed(self, error):
-               _moduleLogger.error(error)
-
        def get_conversations(self):
                return self._conversations.iterkeys()
 
index 5fb5a95..8151c05 100644 (file)
@@ -150,25 +150,27 @@ class Session(object):
                self._masterStateMachine.close()
 
        def login(self, username, password, on_success, on_error):
-               self._username = username
-               self._password = password
                self._asyncPool.start()
-               self._asyncPool.add_task(
-                       self._backend.login,
-                       (self._username, self._password),
-                       {},
-                       self.__on_login_success(on_success),
-                       on_error
-               )
 
-       def __on_login_success(self, user_success):
+               le = gobject_utils.AsyncLinearExecution(self._asyncPool, self._login)
+               le.start(username, password, on_success, on_error)
 
-               @misc_utils.log_exception(_moduleLogger)
-               def _actual_success(*args, **kwds):
-                       self._masterStateMachine.start()
-                       user_success(*args, **kwds)
+       @misc_utils.log_exception(_moduleLogger)
+       def _login(self, username, password, on_success, on_error):
+               self._username = username
+               self._password = password
+               try:
+                       isLoggedIn = yield (
+                               self._backend.login,
+                               (self._username, self._password),
+                               {},
+                       )
+               except Exception, e:
+                       on_error(e)
+                       return
 
-               return _actual_success
+               self._masterStateMachine.start()
+               on_success(isLoggedIn)
 
        def logout(self):
                self._asyncPool.stop()
index 38e20c2..52ccf92 100644 (file)
@@ -168,6 +168,7 @@ class AsyncPool(object):
                try:
                        callback(result)
                except Exception:
+                       _moduleLogger.exception("Callback errored")
                        pass
                return False
 
@@ -183,6 +184,7 @@ class AsyncPool(object):
                                result = func(*args, **kwds)
                                isError = False
                        except Exception, e:
+                               _moduleLogger.error("Error, passing it back to the main thread")
                                result = e
                                isError = True
                        self.__workQueue.task_done()
@@ -190,38 +192,56 @@ class AsyncPool(object):
                        gobject.idle_add(self.__trampoline_callback, on_success, on_error, isError, result)
 
 
-class LinearExecution(object):
+class AsyncLinearExecution(object):
 
-       def __init__(self, func):
+       def __init__(self, pool, func):
+               self._pool = pool
                self._func = func
                self._run = None
 
        def start(self, *args, **kwds):
                assert self._run is None
-               kwds["on_success"] = self.on_success
-               kwds["on_error"] = self.on_error
                self._run = self._func(*args, **kwds)
                trampoline, args, kwds = self._run.send(None) # priming the function
-               trampoline(*args, **kwds)
+               self._pool.add_task(
+                       trampoline,
+                       args,
+                       kwds,
+                       self.on_success,
+                       self.on_error,
+               )
 
        @misc.log_exception(_moduleLogger)
-       def on_success(self, *args, **kwds):
-               assert not kwds
+       def on_success(self, result):
+               _moduleLogger.debug("Processing success for: %r", self._func)
                try:
-                       trampoline, args, kwds = self._run.send(args)
+                       trampoline, args, kwds = self._run.send(result)
                except StopIteration, e:
                        pass
                else:
-                       trampoline(*args, **kwds)
+                       self._pool.add_task(
+                               trampoline,
+                               args,
+                               kwds,
+                               self.on_success,
+                               self.on_error,
+                       )
 
        @misc.log_exception(_moduleLogger)
        def on_error(self, error):
+               _moduleLogger.debug("Processing error for: %r", self._func)
                try:
                        trampoline, args, kwds = self._run.throw(error)
                except StopIteration, e:
                        pass
                else:
-                       trampoline(*args, **kwds)
+                       self._pool.add_task(
+                               trampoline,
+                               args,
+                               kwds,
+                               self.on_success,
+                               self.on_error,
+                       )
 
 
 def throttled(minDelay, queue):