# -*- coding: utf-8 -*-
# (The MIT License)
#
# Copyright (c) 2013-2021 Kura
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the 'Software'), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED 'AS IS', WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
"""Provides control functionality, including socket wrappers."""
try: # pragma: no cover
import ssl
except ImportError: # pragma: no cover
ssl = None
import grp
import logging
import os
import pwd
import socket
from .config import Config
from .exceptions import BlackholeRuntimeException
__all__ = ("pid_permissions", "server", "setgid", "setuid")
"""Tuple all the things."""
logger = logging.getLogger("blackhole.control")
ciphers = [
"ECDHE-ECDSA-AES256-GCM-SHA384",
"ECDHE-RSA-AES256-GCM-SHA384",
"ECDHE-ECDSA-CHACHA20-POLY1305",
"ECDHE-RSA-CHACHA20-POLY1305",
"ECDHE-ECDSA-AES128-GCM-SHA256",
"ECDHE-RSA-AES128-GCM-SHA256",
"ECDHE-ECDSA-AES256-SHA384",
"ECDHE-RSA-AES256-SHA384",
"ECDHE-ECDSA-AES128-SHA256",
"ECDHE-RSA-AES128-SHA256",
]
"""Strong default TLS ciphers."""
[docs]def _context(use_tls=False):
"""
Create a TLS context using the certificate, key and dhparams file.
:param bool use_tls: Whether to create a TLS context or not.
Default: ``False``.
:returns: A TLS context or ``None``.
:rtype: :py:class:`ssl.SSLContext` or :py:obj:`None`.
.. note::
Created with:
- :py:obj:`ssl.OP_NO_SSLv2`
- :py:obj:`ssl.OP_NO_SSLv3`
- :py:obj:`ssl.OP_NO_COMPRESSION`
- :py:obj:`ssl.OP_CIPHER_SERVER_PREFERENCE`
Also responsible for loading Diffie Hellman ephemeral parameters if
they're provided -- :py:func:`ssl.SSLContext.load_dh_params`
If the ``-ls`` or ``--less-secure`` option is provided,
:py:obj:`ssl.OP_SINGLE_DH_USE` and :py:obj:`ssl.OP_SINGLE_ECDH_USE`
will be omitted from the context. --
https://kura.gg/blackhole/configuration.html#command-line-options
-- added in :ref:`2.0.13`
"""
if use_tls is False:
return None
config = Config()
ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ctx.load_cert_chain(config.tls_cert, config.tls_key)
ctx.options |= ssl.OP_NO_SSLv2
ctx.options |= ssl.OP_NO_SSLv3
ctx.options |= ssl.OP_NO_COMPRESSION
ctx.options |= ssl.OP_CIPHER_SERVER_PREFERENCE
if not config.args.less_secure:
ctx.options |= ssl.OP_SINGLE_DH_USE
ctx.options |= ssl.OP_SINGLE_ECDH_USE
ctx.set_ciphers(":".join(ciphers))
if config.tls_dhparams:
ctx.load_dh_params(config.tls_dhparams)
return ctx
[docs]def _socket(addr, port, family):
"""
Create a socket, bind and listen.
:param str addr: The address to use.
:param int port: The port to use.
:param family: The type of socket to use.
:type family: :py:obj:`socket.AF_INET` or :py:obj:`socket.AF_INET6`.
:returns: Bound socket.
:rtype: :py:func:`socket.socket`
:raises BlackholeRuntimeException: When a socket cannot be bound.
"""
sock = socket.socket(family, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
except (AttributeError, OSError):
pass
if family == socket.AF_INET6:
sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
try:
sock.bind((addr, port))
except OSError:
msg = f"Cannot bind to {addr}:{port}."
logger.critical(msg)
sock.close()
raise BlackholeRuntimeException(msg)
os.set_inheritable(sock.fileno(), True)
sock.listen(1024)
sock.setblocking(False)
return sock
[docs]def server(addr, port, family, use_tls=False):
"""
Socket and possibly a TLS context.
Create an instance of :py:func:`socket.socket`, bind it and return a
dictionary containing the socket object and a TLS context if configured.
:param str addr: The address to use.
:param int port: The port to use.
:param family: The type of socket to use.
:type family: :py:obj:`socket.AF_INET` or :py:obj:`socket.AF_INET6`.
:param bool use_tls: Whether to create a TLS context or not.
Default: ``False``.
:returns: Bound socket, a TLS context if configured.
:rtype: :py:obj:`dict`
"""
sock = _socket(addr, port, family)
ctx = _context(use_tls=use_tls)
return {"sock": sock, "ssl": ctx}
[docs]def pid_permissions():
"""
Change the pid file ownership.
Called before :func:`blackhole.control.setgid` and
:func:`blackhole.control.setuid` are called to stop
:class:`blackhole.daemon.Daemon` losing permissions to modify the pid file.
:raises SystemExit: With exit code :py:obj:`os.EX_USAGE` when a permissions
error occurs.
"""
config = Config()
try:
user = pwd.getpwnam(config.user)
group = grp.getgrnam(config.group)
os.chown(config.pidfile, user.pw_uid, group.gr_gid)
except (KeyError, PermissionError):
logger.error("Unable to change pidfile ownership permissions.")
raise SystemExit(os.EX_USAGE)
[docs]def setgid():
"""
Change group.
Change to a less privileged group. Unless you're using it incorrectly --
in which case, don't use it.
:raises SystemExit: Exit code :py:obj:`os.EX_USAGE` when a configuration
error occurs or :py:obj:`os.EX_NOPERM` when a
permission error occurs.
.. note::
MUST be called BEFORE setuid, not after.
"""
config = Config()
try:
gid = grp.getgrnam(config.group).gr_gid
os.setgid(gid)
except KeyError:
logger.error(f"Group '{config.group}' does not exist.")
raise SystemExit(os.EX_USAGE)
except PermissionError:
logger.error(
f"You do not have permission to switch to group '{config.group}'.",
)
raise SystemExit(os.EX_NOPERM)
[docs]def setuid():
"""
Change user.
Change to a less privileged user.Unless you're using it incorrectly --
inwhich case, don't use it.
:raises SystemExit: Exit code :py:obj:`os.EX_USAGE` when a configuration
error occurs or :py:obj:`os.EX_NOPERM` when a
permission error occurs.
.. note::
MUST be called AFTER setgid, not before.
"""
config = Config()
try:
uid = pwd.getpwnam(config.user).pw_uid
os.setuid(uid)
except KeyError:
logger.error(f"User '{config.user}' does not exist.")
raise SystemExit(os.EX_USAGE)
except PermissionError:
logger.error(
f"You do not have permission to switch to user '{config.user}'.",
)
raise SystemExit(os.EX_NOPERM)