###############################################################################
#
# The MIT License (MIT)
#
# Copyright (c) Tavendo GmbH
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
###############################################################################
from twisted.python.failure import Failure
from twisted.internet.defer import maybeDeferred, Deferred, DeferredList
from twisted.internet.defer import succeed, fail
from twisted.internet.interfaces import IReactorTime
from txaio.interfaces import IFailedFuture
from txaio import _Config
using_twisted = True
using_asyncio = False
config = _Config()
class FailedFuture(IFailedFuture):
pass
FailedFuture.register(Failure)
[docs]def create_future(result=None, error=None):
if result is not None and error is not None:
raise ValueError("Cannot have both result and error.")
f = Deferred()
if result is not None:
resolve(f, result)
elif error is not None:
reject(f, error)
return f
# maybe delete, just use create_future()
def create_future_success(result):
return succeed(result)
# maybe delete, just use create_future()
def create_future_error(error=None):
return fail(create_failure(error))
# maybe rename to call()?
[docs]def as_future(fun, *args, **kwargs):
return maybeDeferred(fun, *args, **kwargs)
[docs]def call_later(delay, fun, *args, **kwargs):
return IReactorTime(_get_loop()).callLater(delay, fun, *args, **kwargs)
[docs]def resolve(future, result=None):
future.callback(result)
[docs]def reject(future, error=None):
if error is None:
error = create_failure()
elif isinstance(error, Exception):
error = Failure(error)
else:
if not isinstance(error, Failure):
raise RuntimeError("reject requires a Failure or Exception")
future.errback(error)
def create_failure(exception=None):
"""
Create a Failure instance.
if ``exception`` is None (the default), we MUST be inside an
"except" block. This encapsulates the exception into an object
that implements IFailedFuture
"""
if exception:
return Failure(exception)
return Failure()
[docs]def add_callbacks(future, callback, errback):
"""
callback or errback may be None, but at least one must be
non-None.
"""
assert future is not None
if callback is None:
assert errback is not None
future.addErrback(errback)
else:
# Twisted allows errback to be None here
future.addCallbacks(callback, errback)
return future
[docs]def gather(futures, consume_exceptions=True):
def completed(res):
rtn = []
for (ok, value) in res:
rtn.append(value)
if not ok and not consume_exceptions:
value.raiseException()
return rtn
# XXX if consume_exceptions is False in asyncio.gather(), it will
# abort on the first raised exception -- should we set
# fireOnOneErrback=True (if consume_exceptions=False?) -- but then
# we'll have to wrap the errback() to extract the "real" failure
# from the FirstError that gets thrown if you set that ...
dl = DeferredList(list(futures), consumeErrors=consume_exceptions)
# we unpack the (ok, value) tuples into just a list of values, so
# that the callback() gets the same value in asyncio and Twisted.
add_callbacks(dl, completed, None)
return dl
# methods internal to this implementation
def _get_loop():
if config.loop is None:
from twisted.internet import reactor
config.loop = reactor
return config.loop