Source code for thriftworker.utils.loop

"""Execute some function in loop and wait for answer."""
from __future__ import absolute_import

import sys
from functools import wraps, partial
from thread import get_ident
from threading import Event

import six

from thriftworker.state import current_app

#: Specify default timeout for delegation decorators.
DELEGATION_TIMEOUT = 5.0


def _create_decorator(decorator, *args, **options):
    """Execute given decorator if and only if function executed not in loop.

    """

    def inner_create_decorator(**options):

        def inner_decorator(func):

            @wraps(func)
            def inner_wrapper(*args, **kwargs):
                """Detect current thread and use appropriate method to avoid
                loop blocking.

                """
                try:
                    ident = current_app.loop.ident
                except AttributeError:
                    raise RuntimeError('Loop not started')

                if ident == get_ident():
                    # Don't block main loop.
                    return func(*args, **kwargs)
                else:
                    return decorator(func, options, *args, **kwargs)

            return inner_wrapper

        return inner_decorator

    if len(args) == 1 and callable(args[0]):
        return inner_create_decorator(**options)(*args)
    return inner_create_decorator(**options)


[docs]class Container(object): """Mutable that store result.""" __slots__ = ['_result', '_exception', '_event', 'timeout', 'func'] def __init__(self, func, timeout=None): self.func = func self.timeout = timeout self._result = None self._exception = None self._event = Event() @property def exception(self): return self._exception @exception.setter
[docs] def exception(self, value): assert isinstance(value, (Exception, tuple)) self._exception = value self._event.set()
@property def result(self): return self._result @result.setter
[docs] def result(self, value): self._result = value self._event.set()
[docs] def from_greenlet(self, g): if g.exception is None: self.result = g.value else: print g.exception self.exception = g.exception
[docs] def dispatch(self): try: if not self._event.wait(self.timeout): raise RuntimeError('Timeout happened waiting for {0!r}' .format(self.func)) finally: self._event.clear() exception = self._exception if exception is None: return self._result elif isinstance(exception, tuple): exc_type, exc, tb = exception six.reraise(exc_type, exc, tb) else: raise exception
def _loop_delegate(func, options, *args, **kwargs): """Run given function in loop. :param timeout: how many seconds we should wait before raise exception? """ container = Container(func, timeout=options.get('timeout') or DELEGATION_TIMEOUT) def inner_callback(): try: container.result = func(*args, **kwargs) except: container.exception = sys.exc_info() current_app.hub.callback(inner_callback) return container.dispatch() loop_delegate = partial(_create_decorator, _loop_delegate) def _greenlet_delegate(func, options, *args, **kwargs): """Run given function in greenlet. :param timeout: how many seconds we should wait before raise exception? """ container = Container(func, timeout=options.get('timeout') or DELEGATION_TIMEOUT) def inner_callback(): g = current_app.hub.spawn(func, *args, **kwargs) g.rawlink(container.from_greenlet) current_app.hub.callback(inner_callback) return container.dispatch() greenlet_delegate = partial(_create_decorator, _greenlet_delegate)
[docs]class in_loop(object): """Schedule execution of given function in main event loop. Wait for function execution. """ def __init__(self, func=None, timeout=None): self.__func = func self.__name__ = func.__name__ self.__doc__ = func.__doc__ self.__timeout = timeout def __get__(self, obj, type=None): if obj is None: return self try: return obj.__dict__[self.__name__] except KeyError: method = self.__func.__get__(obj) value = obj.__dict__[self.__name__] = \ loop_delegate(timeout=self.__timeout)(method) return value
Read the Docs v: latest
Versions
latest
Downloads
PDF
HTML
Epub
On Read the Docs
Project Home
Builds

Free document hosting provided by Read the Docs.