Source code for blackhole.worker

# -*- coding: utf-8 -*-

# (The MIT License)
#
# Copyright (c) 2013-2021 Kura
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the 'Software'), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Provides functionality to manage child processes from the supervisor."""


import asyncio
import logging
import os
import signal
import time

from . import protocols
from .child import Child
from .control import setgid, setuid
from .streams import StreamProtocol


# setproctitle is an optional dependency: when it cannot be imported the name
# is bound to None so later code can feature-test it with a truthiness check.
try:
    import setproctitle
except ImportError:
    setproctitle = None


# Names exported by ``from blackhole.worker import *``.
__all__ = ("Worker",)


# Module-level logger used for the worker's debug output.
logger = logging.getLogger("blackhole.worker")


class Worker:
    """
    A worker.

    Provides functionality to manage a single child process.

    The worker is responsible for communicating heartbeat information with its
    child process and starting, stopping and restarting a child as required or
    instructed.
    """

    # Class-level defaults; instances overwrite them once started / pinged.
    _started = False
    ping_count = 0

    def __init__(self, idx, socks, loop=None):
        """
        Initialise the worker and immediately start it.

        :param str idx: The number reference of the worker and child.
        :param list socks: Sockets to listen for connections on.
        :param loop: The event loop to use.
        :type loop: :py:class:`asyncio.unix_events._UnixSelectorEventLoop` or
                    :py:obj:`None` to get the current event loop using
                    :py:func:`asyncio.get_event_loop`.
        """
        self.loop = loop if loop is not None else asyncio.get_event_loop()
        self.socks = socks
        self.idx = idx
        self.start()

    def start(self):
        """
        Create and fork off a child process for the current worker.

        Two pipes are created before forking: the ``up`` pipe and the ``down``
        pipe. The parent keeps ``up_write`` and ``down_read`` (see
        :meth:`connect`); the child is handed ``up_read`` and ``down_write``
        (see :meth:`setup_child`).

        :raises AssertionError: If the worker has already been started.
        """
        if self._started is True:
            raise AssertionError("Cannot start an already started worker")
        self._started = True
        self.up_read, self.up_write = os.pipe()
        self.down_read, self.down_write = os.pipe()
        self.pid = os.fork()
        if self.pid > 0:  # Parent
            # Schedule on the loop this worker was given; connect() attaches
            # the pipes to self.loop, so the task must run on that same loop.
            asyncio.ensure_future(self.connect(), loop=self.loop)
        else:  # Child
            self.setup_child()

    def setup_child(self):
        """Set the gid, uid and start the child process."""
        setgid()
        setuid()
        # Unset the event loop reference in the forked process before the
        # Child is created.
        asyncio.set_event_loop(None)
        if setproctitle:
            setproctitle.setproctitle("blackhole: worker")
        process = Child(self.up_read, self.down_write, self.socks, self.idx)
        process.start()

    def restart_child(self):
        """
        Restart the child process.

        The current child is killed and a new one is forked. In the parent the
        heartbeat bookkeeping (``ping_count`` and ``ping``) is reset.
        """
        self.kill_child()
        self.pid = os.fork()
        if self.pid > 0:
            self.ping_count = 0
            self.ping = time.monotonic()
        else:
            self.setup_child()

    def kill_child(self):
        """
        Kill the child process and reap it.

        Sends ``SIGTERM`` to the child and waits for that specific process to
        exit. A child that no longer exists, or that has already been reaped,
        is silently ignored.
        """
        try:
            os.kill(self.pid, signal.SIGTERM)
            # Wait for *this* child only; a bare os.wait() would reap
            # whichever child of the process happens to exit first.
            os.waitpid(self.pid, 0)
        except (ProcessLookupError, ChildProcessError):
            pass

    async def heartbeat(self, writer):
        """
        Handle heartbeat between a worker and child.

        While the worker is started, sleep for 15 seconds and then either
        send :const:`blackhole.protocols.PING` to the child or, if 30 seconds
        or more have passed since the last recorded ping time, restart the
        child.

        :param asyncio.StreamWriter writer: An object for writing data to the
                                            pipe.

        .. note::

           The message values are defined in the :mod:`blackhole.protocols`
           schema. Documentation is available at --
           https://kura.gg/blackhole/api-protocols.html
        """
        while self._started:
            await asyncio.sleep(15)
            if (time.monotonic() - self.ping) < 30:
                writer.write(protocols.PING)
            else:
                # Re-check: the worker may have been stopped while sleeping.
                if self._started:
                    logger.debug(
                        f"worker.{self.idx}.heartbeat: Communication failed. "
                        "Restarting worker",
                    )
                    self.restart_child()

    async def chat(self, reader):
        """
        Communicate between a worker and child.

        Reads up to 3 bytes at a time from the child. When the data equals
        :const:`blackhole.protocols.PONG` the last ping time is refreshed and
        ``ping_count`` is incremented. If reading fails with an error the
        worker is stopped, which also kills the child.

        :param asyncio.StreamReader reader: An object for reading data from
                                            the pipe.
        :raises asyncio.CancelledError: Propagated untouched when the task is
                                        cancelled (e.g. by :meth:`stop`).

        .. note::

           The message values are defined in the :mod:`blackhole.protocols`
           schema. Documentation is available at --
           https://kura.gg/blackhole/api-protocols.html
        """
        while self._started:
            try:
                msg = await reader.read(3)
                if msg == protocols.PONG:
                    logger.debug(
                        f"worker.{self.idx}.chat: Pong received from child",
                    )
                    self.ping = time.monotonic()
                    self.ping_count += 1
            except asyncio.CancelledError:
                # Cancellation comes from stop(); calling stop() again here
                # would signal the (already reaped) child pid a second time.
                raise
            except Exception:
                self.stop()
            await asyncio.sleep(5)

    async def connect(self):
        """
        Connect the child and worker so they can communicate.

        Wraps the parent's ends of the pipes (``self.down_read`` for reading,
        ``self.up_write`` for writing) in transports on ``self.loop``, records
        the initial ping time and starts the :meth:`chat` and
        :meth:`heartbeat` tasks.
        """
        read_fd = os.fdopen(self.down_read, "rb")
        r_trans, r_proto = await self.loop.connect_read_pipe(
            StreamProtocol,
            read_fd,
        )
        write_fd = os.fdopen(self.up_write, "wb")
        w_trans, w_proto = await self.loop.connect_write_pipe(
            StreamProtocol,
            write_fd,
        )
        reader = r_proto.reader
        writer = asyncio.StreamWriter(w_trans, w_proto, reader, self.loop)
        self.ping = time.monotonic()
        self.rtransport = r_trans
        self.wtransport = w_trans
        self.chat_task = asyncio.ensure_future(self.chat(reader))
        self.heartbeat_task = asyncio.ensure_future(self.heartbeat(writer))

    def stop(self):
        """
        Terminate the worker and its respective child process.

        Safe to call before :meth:`connect` has run: tasks and transports
        that have not been created yet are simply skipped.
        """
        self.kill_child()
        self._started = False
        # connect() runs asynchronously after start(), so these attributes
        # may not exist yet when stop() is called early.
        for task_name in ("chat_task", "heartbeat_task"):
            task = getattr(self, task_name, None)
            if task is not None:
                task.cancel()
        for transport_name in ("rtransport", "wtransport"):
            transport = getattr(self, transport_name, None)
            if transport is not None:
                transport.close()