recipientsChanged = qt_compat.Signal()
- def __init__(self, pool, backend, errorLog):
+ def __init__(self, asyncQueue, backend, errorLog):
QtCore.QObject.__init__(self)
self._errorLog = errorLog
self._contacts = {}
- self._pool = pool
+ self._asyncQueue = asyncQueue
self._backend = backend
self._busyReason = None
self._message = ""
assert 0 < len(self._contacts), "No contacts selected"
assert 0 < len(self._message), "No message to send"
numbers = [misc_utils.make_ugly(contact.selectedNumber) for contact in self._contacts.itervalues()]
- le = concurrent.AsyncLinearExecution(self._pool, self._send)
+ le = self._asyncQueue.add_async(self._send)
le.start(numbers, self._message)
def call(self):
assert len(self._message) == 0, "Cannot send message with call"
(contact, ) = self._contacts.itervalues()
number = misc_utils.make_ugly(contact.selectedNumber)
- le = concurrent.AsyncLinearExecution(self._pool, self._call)
+ le = self._asyncQueue.add_async(self._call)
le.start(number)
def cancel(self):
- le = concurrent.AsyncLinearExecution(self._pool, self._cancel)
+ le = self._asyncQueue.add_async(self._cancel)
le.start()
def _get_message(self):
def __init__(self, errorLog, cachePath = None):
QtCore.QObject.__init__(self)
self._errorLog = errorLog
- self._pool = qore_utils.AsyncPool()
+ self._pool = qore_utils.FutureThread()
+ self._asyncQueue = concurrent.AsyncTaskQueue(self._pool)
self._backend = []
self._loggedInTime = self._LOGGEDOUT_TIME
self._loginOps = []
self._voicemailCachePath = None
self._username = None
self._password = None
- self._draft = Draft(self._pool, self._backend, self._errorLog)
+ self._draft = Draft(self._asyncQueue, self._backend, self._errorLog)
self._delayedRelogin = QtCore.QTimer()
self._delayedRelogin.setInterval(0)
self._delayedRelogin.setSingleShot(True)
self._backend[0:0] = [gv_backend.GVDialer(cookiePath)]
self._pool.start()
- le = concurrent.AsyncLinearExecution(self._pool, self._login)
+ le = self._asyncQueue.add_async(self._login)
le.start(username, password)
def logout(self):
def update_account(self, force = True):
if not force and self._contacts:
return
- le = concurrent.AsyncLinearExecution(self._pool, self._update_account), (), {}
+ le = self._asyncQueue.add_async(self._update_account), (), {}
self._perform_op_while_loggedin(le)
def refresh_connection(self):
- le = concurrent.AsyncLinearExecution(self._pool, self._refresh_authentication)
+ le = self._asyncQueue.add_async(self._refresh_authentication)
le.start()
def get_contacts(self):
def update_messages(self, messageType, force = True):
if not force and self._messages:
return
- le = concurrent.AsyncLinearExecution(self._pool, self._update_messages), (messageType, ), {}
+ le = self._asyncQueue.add_async(self._update_messages), (messageType, ), {}
self._perform_op_while_loggedin(le)
def get_messages(self):
def update_history(self, historyType, force = True):
if not force and self._history:
return
- le = concurrent.AsyncLinearExecution(self._pool, self._update_history), (historyType, ), {}
+ le = self._asyncQueue.add_async(self._update_history), (historyType, ), {}
self._perform_op_while_loggedin(le)
def get_history(self):
return self._historyUpdateTime
def update_dnd(self):
- le = concurrent.AsyncLinearExecution(self._pool, self._update_dnd), (), {}
+ le = self._asyncQueue.add_async(self._update_dnd), (), {}
self._perform_op_while_loggedin(le)
def set_dnd(self, dnd):
- le = concurrent.AsyncLinearExecution(self._pool, self._set_dnd)
+ le = self._asyncQueue.add_async(self._set_dnd)
le.start(dnd)
def is_available(self, messageId):
return actualPath
def download_voicemail(self, messageId):
- le = concurrent.AsyncLinearExecution(self._pool, self._download_voicemail)
+ le = self._asyncQueue.add_async(self._download_voicemail)
le.start(messageId)
def _set_dnd(self, dnd):
return self._callback
def set_callback_number(self, callback):
- le = concurrent.AsyncLinearExecution(self._pool, self._set_callback_number)
+ le = self._asyncQueue.add_async(self._set_callback_number)
le.start(callback)
def _set_callback_number(self, callback):
_moduleLogger = logging.getLogger(__name__)
-class AsyncLinearExecution(object):
+class AsyncTaskQueue(object):
+
+ def __init__(self, taskPool):
+ self._asyncs = []
+ self._taskPool = taskPool
+
+ def add_async(self, func):
+ self.flush()
+ a = _AsyncGeneratorTask(self._taskPool, func)
+ self._asyncs.append(a)
+ return a
+
+ def flush(self):
+ self._asyncs = [a for a in self._asyncs if not a.isDone]
+
+
+class _AsyncGeneratorTask(object):
def __init__(self, pool, func):
self._pool = pool
self._func = func
self._run = None
+ self._isDone = False
+
+ @property
+ def isDone(self):
+ return self._isDone
def start(self, *args, **kwds):
assert self._run is None, "Task already started"
try:
trampoline, args, kwds = self._run.send(result)
except StopIteration, e:
- pass
+ self._isDone = True
else:
self._pool.add_task(
trampoline,
try:
trampoline, args, kwds = self._run.throw(error)
except StopIteration, e:
- pass
+ self._isDone = True
else:
self._pool.add_task(
trampoline,
class _ParentThread(QtCore.QObject):
- def __init__(self, pool):
+ def __init__(self, futureThread):
QtCore.QObject.__init__(self)
- self._pool = pool
+ self._futureThread = futureThread
@qt_compat.Slot(object)
@misc.log_exception(_moduleLogger)
def _on_task_complete(self, taskResult):
on_success, on_error, isError, result = taskResult
- if not self._pool._isRunning:
+ if not self._futureThread._isRunning:
if isError:
_moduleLogger.error("Masking: %s" % (result, ))
isError = True
taskComplete = qt_compat.Signal(object)
- def __init__(self, pool):
+ def __init__(self, futureThread):
QtCore.QObject.__init__(self)
- self._pool = pool
+ self._futureThread = futureThread
@qt_compat.Slot(object)
@misc.log_exception(_moduleLogger)
def _on_task_added(self, task):
- if not self._pool._isRunning:
+ if not self._futureThread._isRunning:
_moduleLogger.error("Dropping task")
func, args, kwds, on_success, on_error = task
@qt_compat.Slot()
@misc.log_exception(_moduleLogger)
def _on_stop_requested(self):
- self._pool._thread.quit()
+ self._futureThread._thread.quit()
-class AsyncPool(QtCore.QObject):
+class FutureThread(QtCore.QObject):
_addTask = qt_compat.Signal(object)
- _stopPool = qt_compat.Signal()
+ _stopFutureThread = qt_compat.Signal()
def __init__(self):
QtCore.QObject.__init__(self)
self._addTask.connect(self._worker._on_task_added)
self._worker.taskComplete.connect(self._parent._on_task_complete)
- self._stopPool.connect(self._worker._on_stop_requested)
+ self._stopFutureThread.connect(self._worker._on_stop_requested)
def start(self):
self._thread.start()
def stop(self):
self._isRunning = False
- self._stopPool.emit()
+ self._stopFutureThread.emit()
def add_task(self, func, args, kwds, on_success, on_error):
assert self._isRunning, "Task queue not started"