Source code for nobodd.systemd

# nobodd: a boot configuration tool for the Raspberry Pi
#
# Copyright (c) 2024 Dave Jones <dave.jones@canonical.com>
# Copyright (c) 2024 Canonical Ltd.
#
# SPDX-License-Identifier: GPL-3.0

"""
Provides a simple interface to systemd's notification and watchdog services.

.. autoclass:: Systemd
"""

import os
import socket

from . import lang


class Systemd:
    """
    Provides a simple interface to systemd's notification and watchdog
    services. It is suggested applications obtain a single, top-level instance
    of this class via :func:`get_systemd` and use it to communicate with
    systemd.

    If *address* is ``None`` (the default), the address of the notification
    socket is taken from the ``NOTIFY_SOCKET`` environment variable, which is
    removed from the environment in the process. An address must be longer
    than one character and start with ``/`` (a filesystem path) or ``@`` (an
    abstract namespace socket); anything else is ignored. If no usable address
    is found, or connecting to it fails, the instance is still constructed but
    all notifications become no-ops (see :meth:`available`).
    """
    __slots__ = ('_socket',)

    # First file-descriptor number used for descriptors passed by systemd
    LISTEN_FDS_START = 3

    def __init__(self, address=None):
        self._socket = None
        if address is None:
            # Remove NOTIFY_SOCKET implicitly so child processes don't
            # inherit it
            address = os.environ.pop('NOTIFY_SOCKET', None)
        if address is None:
            return
        if len(address) <= 1 or address[0] not in ('@', '/'):
            return
        if address[0] == '@':
            address = '\0' + address[1:]  # abstract namespace socket
        sock = socket.socket(
            socket.AF_UNIX, socket.SOCK_DGRAM | socket.SOCK_CLOEXEC)
        try:
            sock.connect(address)
        except OSError:
            # Don't leak the descriptor when the socket can't be reached;
            # the instance simply stays "unavailable"
            sock.close()
        else:
            self._socket = sock

    def available(self):
        """
        If systemd's notification socket is not available, raises
        :exc:`RuntimeError`.

        Services expecting systemd notifications to be available can call this
        to assert that notifications will be noticed.
        """
        if self._socket is None:
            raise RuntimeError(lang._(
                'systemd notification socket unavailable'))

    def notify(self, state):
        """
        Send a notification to systemd. *state* is a string type (if it is a
        unicode string it will be encoded with the 'ascii' codec).

        Does nothing if the notification socket is not available.
        """
        if self._socket is not None:
            if isinstance(state, str):
                state = state.encode('ascii')
            self._socket.sendall(state)

    def ready(self):
        """
        Notify systemd that service startup is complete.
        """
        self.notify('READY=1')

    def reloading(self):
        """
        Notify systemd that the service is reloading its configuration. Call
        :func:`ready` when reload is complete.
        """
        self.notify('RELOADING=1')

    def stopping(self):
        """
        Notify systemd that the service is stopping.
        """
        self.notify('STOPPING=1')

    def extend_timeout(self, timeout):
        """
        Notify systemd to extend the start / stop timeout by *timeout* seconds.
        A timeout will occur if the service does not call :func:`ready` or
        terminate within *timeout* seconds but *only* if the original timeout
        (set in the systemd configuration) has already been exceeded.

        For example, if the stopping timeout is configured as 90s, and the
        service calls :func:`stopping`, systemd expects the service to
        terminate within 90s. After 10s the service calls
        :func:`extend_timeout` with a *timeout* of 10s. 20s later the service
        has not yet terminated but systemd does *not* consider the timeout
        expired as only 30s have elapsed of the original 90s timeout.

        *timeout* may be an integer or a fractional number of seconds; it is
        rounded to the nearest microsecond.
        """
        # Round explicitly: the 'd' format code rejects float values
        self.notify(f'EXTEND_TIMEOUT_USEC={round(timeout * 1000000):d}')

    def watchdog_ping(self):
        """
        Ping the systemd watchdog. This must be done periodically if
        :func:`watchdog_period` returns a value other than ``None``.
        """
        self.notify('WATCHDOG=1')

    def watchdog_reset(self, timeout):
        """
        Reset the systemd watchdog timer to *timeout* seconds.

        *timeout* may be an integer or a fractional number of seconds; it is
        rounded to the nearest microsecond.
        """
        # Round explicitly: the 'd' format code rejects float values
        self.notify(f'WATCHDOG_USEC={round(timeout * 1000000):d}')

    def watchdog_period(self):
        """
        Returns the time (in seconds) before which systemd expects the process
        to call :func:`watchdog_ping`. If a watchdog timeout is not set, the
        function returns ``None``.

        ``None`` is also returned when ``WATCHDOG_PID`` is set and names a
        process other than the calling one.
        """
        timeout = os.environ.get('WATCHDOG_USEC')
        if timeout is not None:
            pid = os.environ.get('WATCHDOG_PID')
            if pid is None or int(pid) == os.getpid():
                return int(timeout) / 1000000
        return None

    def watchdog_clean(self):
        """
        Unsets the watchdog environment variables so that no future child
        processes will inherit them.

        .. warning::

            After calling this function, :func:`watchdog_period` will return
            ``None`` but systemd will continue expecting
            :func:`watchdog_ping` to be called periodically. In other words,
            you should call :func:`watchdog_period` first and store its result
            somewhere before calling this function.
        """
        os.environ.pop('WATCHDOG_USEC', None)
        os.environ.pop('WATCHDOG_PID', None)

    def main_pid(self, pid=None):
        """
        Report the main PID of the process to systemd (for services that
        confuse systemd with their forking behaviour). If *pid* is None,
        :func:`os.getpid` is called to determine the calling process' PID.
        """
        if pid is None:
            pid = os.getpid()
        self.notify(f'MAINPID={pid:d}')

    def listen_fds(self):
        """
        Return file-descriptors passed to the service by systemd, e.g. as part
        of socket activation or file descriptor stores. It returns a
        :class:`dict` mapping each file-descriptor to its name, or the string
        "unknown" if no name was given.

        An empty :class:`dict` is returned if ``LISTEN_PID`` or ``LISTEN_FDS``
        is missing or malformed, if ``LISTEN_PID`` does not match the calling
        process, or if the number of names in ``LISTEN_FDNAMES`` does not
        match ``LISTEN_FDS``.
        """
        try:
            if int(os.environ['LISTEN_PID']) != os.getpid():
                raise ValueError(lang._('wrong LISTEN_PID'))
            fds = int(os.environ['LISTEN_FDS'])
        except (ValueError, KeyError):
            return {}
        try:
            names = os.environ['LISTEN_FDNAMES'].split(':')
        except KeyError:
            names = ['unknown'] * fds
        if len(names) != fds:
            return {}
        # Descriptors are numbered consecutively from LISTEN_FDS_START
        return dict(zip(
            range(self.LISTEN_FDS_START, self.LISTEN_FDS_START + fds),
            names))
# Lazily-created shared Systemd instance handed out by get_systemd()
_SYSTEMD = None


def get_systemd():
    """
    Return a single top-level instance of :class:`Systemd`; repeated calls
    will return the same instance.

    The instance is created on the first call and cached at module level.
    """
    global _SYSTEMD
    instance = _SYSTEMD
    if instance is None:
        # First call: build the instance and remember it for later callers
        instance = _SYSTEMD = Systemd()
    return instance