Source code for thriftworker.transports.base
from __future__ import absolute_import
import os
import socket
import errno
import logging
from contextlib import contextmanager
from abc import ABCMeta, abstractproperty
from pyuv import TCP, Poll, UV_READABLE
from pyuv.errno import strerror
from six import with_metaclass
from thriftworker.constants import BACKLOG_SIZE
from thriftworker.utils.mixin import LoopMixin, StartStopMixin
from thriftworker.utils.loop import in_loop
from thriftworker.utils.decorators import cached_property
from thriftworker.utils.waiter import Waiter
from . import utils
logger = logging.getLogger(__name__)
NOTBLOCK = (errno.EAGAIN, errno.EWOULDBLOCK, errno.EINVAL, errno.EBADF)
@contextmanager
[docs]def ignore_eagain():
"""Ignore all *EAGAIN* errors in context."""
try:
yield
except OSError as exc:
if exc.errno not in (errno.EAGAIN, errno.EWOULDBLOCK,
errno.EINVAL, errno.EBADF):
raise
[docs]class Connections(object):
"""Store existed connections."""
def __init__(self):
self.connections = set()
self._callback = None
def __len__(self):
return len(self.connections)
def __iter__(self):
return iter(self.connections)
def __repr__(self):
return repr(self.connections)
def _execute_callback(self):
"""Execute callback if needed."""
if self._callback is not None:
self._callback()
self._callback = None
@property
def callback(self):
"""Return callback if existed."""
return self._callback
@callback.setter
[docs] def callback(self, cb):
"""Setup callback and execute it if needed."""
self._callback = cb
if not self.connections:
self._execute_callback()
[docs] def register(self, connection):
"""Register new connection."""
self.connections.add(connection)
[docs] def remove(self, connection):
"""Remove registered connection."""
try:
self.connections.remove(connection)
except KeyError:
pass
if not self.connections:
self._execute_callback()
[docs] def close(self):
connections = self.connections
while connections:
connection = connections.pop()
if not connection.is_closed():
logger.warn('Connection %r closed prematurely', connection)
connection.close()
self._execute_callback()
[docs]class BaseAcceptor(with_metaclass(ABCMeta, LoopMixin)):
"""Accept incoming connections."""
Connections = Connections
def __init__(self, name, descriptor, backlog=None):
self.name = name
self.descriptor = descriptor
self.backlog = backlog or BACKLOG_SIZE
self._connections = self.Connections()
super(BaseAcceptor, self).__init__()
@cached_property
def _poller(self):
return Poll(self.loop, self.descriptor)
@cached_property
def _socket(self):
"""Create socket from given descriptor."""
sock = socket.fromfd(self.descriptor, socket.AF_INET, socket.SOCK_STREAM)
sock.setblocking(0)
return sock
@abstractproperty
[docs] def Connection(self):
"""Return connection class. Depends on current
implementation of transport.
"""
raise NotImplementedError()
@property
[docs] def connections_number(self):
"""Return number of active connections."""
return len(self._connections)
@property
[docs] def empty(self):
"""Is acceptor empty or not."""
return self.connections_number == 0
@property
[docs] def active(self):
"""Is current acceptor active."""
return self._poller.active
def __iter__(self):
"""Return all registered connections."""
return iter(self._connections)
@cached_property
[docs] def acceptor(self):
"""Return function that should accept new connections."""
loop = self.loop
service = self.name
connections = self._connections
listen_fd = self._socket.fileno()
worker = self.app.worker
producer = worker.create_producer(service)
def on_close(connection):
"""Callback called when connection closed."""
connections.remove(connection)
def inner_acceptor(handle, events, error):
"""Function that try to accept new connection."""
if error: # pragma: no cover
logger.error('Error handling new connection for'
' service %r: %s', service, strerror(error))
return
try:
fd, addr = utils.accept_connection(listen_fd)
except OSError as exc:
if exc.errno not in NOTBLOCK:
raise
return
try:
# Setup socket.
utils.set_nonblocking(fd)
utils.set_sockopt(fd, socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
except:
os.close(fd)
raise
handle = TCP(loop)
handle.open(fd)
connection = self.Connection(producer, loop, handle, addr, on_close)
connections.register(connection)
return inner_acceptor
@in_loop
[docs] def start(self):
"""Start acceptor if active."""
poller = self._poller
if not poller.active and not poller.closed:
poller.start(UV_READABLE, self.acceptor)
@in_loop
[docs] def stop(self, callback=None):
"""Stop acceptor if active."""
poller = self._poller
if poller.active and not poller.closed:
poller.stop()
self._connections.callback = callback
@in_loop
[docs] def close(self):
"""Close all resources."""
if not self._poller.closed:
self._poller.close()
self._connections.close()
self._socket.close()
[docs]class Acceptors(StartStopMixin, LoopMixin):
"""Maintain pool of acceptors. Start them when needed."""
def __init__(self):
self._acceptors = {}
self._stop_waiter = Waiter(
timeout=self.app.shutdown_timeout)
super(Acceptors, self).__init__()
def __iter__(self):
"""Iterate over registered acceptors."""
return iter(self._acceptors.values())
@cached_property
[docs] def Acceptor(self):
"""Shortcut to :class:`thriftworker.acceptor.Acceptor` class."""
return self.app.Acceptor
[docs] def register(self, fd, name, backlog=None):
"""Register new acceptor in pool."""
self._acceptors[name] = self.Acceptor(name, fd, backlog=backlog)
[docs] def start_by_name(self, name):
"""Start acceptor by name."""
acceptor = self._acceptors[name]
self.app.hub.callback(acceptor.start)
[docs] def stop_by_name(self, name):
"""Stop acceptor by name."""
acceptor = self._acceptors[name]
self.app.hub.callback(acceptor.stop)
[docs] def start_accepting(self):
"""Start all registered acceptors if needed."""
for acceptor in self._acceptors.values():
acceptor.start()
[docs] def stop_accepting(self, callback=None):
"""Stop all registered acceptors if needed."""
for acceptor in self._acceptors.values():
acceptor.stop(callback)
@property
[docs] def connections_number(self):
"""Return current connection number across all acceptors."""
return sum(acceptor.connections_number for acceptor in self)
@property
[docs] def empty(self):
"""Are all acceptors empty or not."""
return self.connections_number == 0
[docs] def stop(self):
"""Close all registered acceptors."""
# wait for unclosed connections
def on_close():
if self.empty:
self._stop_waiter.done()
# stop accepting new connection
self.stop_accepting(callback=on_close)
# wait for unclosed connections
if not self.empty:
logger.info('Waiting for unclosed connections...')
self._stop_waiter.wait()
if not self.empty:
logger.warning('Not all connection closed!')
# close existed connection
for acceptor in self:
acceptor.close()