# nobodd: a boot configuration tool for the Raspberry Pi
#
# Copyright (c) 2023-2024 Dave Jones <dave.jones@canonical.com>
# Copyright (c) 2023-2024 Canonical Ltd.
#
# SPDX-License-Identifier: GPL-3.0
import os
import re
import socket
import datetime as dt
from pathlib import Path
from decimal import Decimal
from contextlib import suppress
from fnmatch import fnmatchcase
from collections import namedtuple
from configparser import ConfigParser
from argparse import ArgumentParser, SUPPRESS
from ipaddress import ip_address
from copy import deepcopy
from . import lang
# The locations to attempt to read the configuration from, listed from the
# system-wide files to the per-user file. Any "~" is left unexpanded here;
# ConfigArgumentParser.read_configs calls expanduser() on each path it is given.
#
# The XDG Base Directory Specification says the default applies when
# $XDG_CONFIG_HOME is unset *or empty*, hence "or" rather than a .get() default
# (an empty value would otherwise become Path(''), i.e. the current directory).
XDG_CONFIG_HOME = Path(os.environ.get('XDG_CONFIG_HOME') or '~/.config')
CONFIG_LOCATIONS = (
    Path('/etc/nobodd/nobodd.conf'),
    Path('/usr/local/etc/nobodd/nobodd.conf'),
    XDG_CONFIG_HOME / 'nobodd/nobodd.conf',
)
class ConfigArgumentParser(ArgumentParser):
    """
    A variant of :class:`~argparse.ArgumentParser` that links arguments to
    specified keys in a :class:`~configparser.ConfigParser` instance.

    Typical usage is to construct an instance of :class:`ConfigArgumentParser`,
    define the parameters and parameter groups on it, associating them with
    configuration section and key names as appropriate, then call
    :meth:`read_configs` to parse a set of configuration files. These will be
    checked against the (optional) *template* configuration passed to the
    initializer, which defines the set of valid sections and keys expected.

    The resulting :class:`~configparser.ConfigParser` forms the "base"
    configuration, prior to argument parsing. This can be optionally
    manipulated, before passing it to :meth:`set_defaults_from` to set the
    argument defaults. At this point,
    :meth:`~argparse.ArgumentParser.parse_args` may be called to parse the
    command line arguments, knowing that defaults in the help will be drawn
    from the "base" configuration.

    The resulting :class:`~argparse.Namespace` object is the application's
    runtime configuration. For example::

        >>> from pathlib import Path
        >>> from nobodd.config import *
        >>> parser = ConfigArgumentParser()
        >>> tftp = parser.add_argument_group('tftp', section='tftp')
        >>> tftp.add_argument('--listen', type=str, key='listen',
        ... help="the address on which to listen for connections "
        ... "(default: %(default)s)")
        >>> Path('defaults.conf').write_text('''
        ... [tftp]
        ... listen = 127.0.0.1
        ... ''')
        >>> defaults = parser.read_configs(['defaults.conf'])
        >>> parser.set_defaults_from(defaults)
        >>> parser.get_default('listen')
        '127.0.0.1'
        >>> config = parser.parse_args(['--listen', '0.0.0.0'])
        >>> config.listen
        '0.0.0.0'

    Note that, after the call to :meth:`set_defaults_from`, the parser's idea
    of the defaults has been drawn from the file-based configuration (and thus
    will be reflected in printed ``--help``), but this is still overridden by
    the arguments passed to the command line.
    """
    def __init__(self, *args, template=None, **kwargs):
        """
        All positional and keyword arguments are passed to
        :class:`~argparse.ArgumentParser`, except *template* which, if given,
        must be an object with an ``open('r')`` method (for example a
        :class:`~pathlib.Path`) yielding the template configuration.
        """
        super().__init__(*args, **kwargs)
        if template is None:
            self._template = None
        else:
            self._template = self._get_config_parser()
            with template.open('r') as f:
                self._template.read_file(f)
        # Maps argument "dest" names to (section, key, type) tuples
        self._config_map = {}

    def _get_config_parser(self):
        """
        Generate and return a new :class:`~configparser.ConfigParser` with
        appropriate configuration (interpolation, delimiters, etc.) for the
        desired parsing behaviour.
        """
        return ConfigParser(
            delimiters=('=',), empty_lines_in_values=False,
            interpolation=None, strict=False)

    def add_argument(self, *args, section=None, key=None, **kwargs):
        """
        Adds *section* and *key* parameters. These link the new argument to the
        specified configuration entry.

        The default for the argument can be specified directly as usual, or can
        be read from the configuration (see :meth:`read_configs` and
        :meth:`set_defaults_from`). When arguments are parsed, the value
        assigned to this argument will be copied to the associated
        configuration entry.
        """
        return self._add_config_action(
            *args, method=super().add_argument, section=section, key=key,
            **kwargs)

    def add_argument_group(self, title=None, description=None, section=None):
        """
        Adds a new argument group object and returns it.

        The new argument group will likewise accept *section* and *key*
        parameters on its :meth:`add_argument` method. The *section* parameter
        will default to the value of the *section* parameter passed to this
        method (but may be explicitly overridden).
        """
        group = super().add_argument_group(
            title=title, description=description)

        # The group's original bound method is captured as a default so the
        # replacement below can still delegate to it
        def add_argument(*args, section=section, key=None,
                         _add_arg=group.add_argument, **kwargs):
            return self._add_config_action(
                *args, method=_add_arg, section=section, key=key, **kwargs)

        group.add_argument = add_argument
        return group

    def _add_config_action(self, *args, method, section, key, **kwargs):
        """
        Create the argument by calling *method* with the positional and
        remaining keyword arguments and, when *key* is given, record the
        ``(section, key, type)`` the resulting action is linked to.

        Raises :exc:`ValueError` if only one of *section* and *key* is given,
        or if the action's ``dest`` is already linked to a different entry.
        """
        assert callable(method), 'method must be a callable'
        if (section is None) != (key is None):
            raise ValueError(lang._(
                'section and key must be specified together'))
        # Flags are recorded as booleans; every other action (including ones
        # like "append" or "store") takes its declared type, defaulting to str.
        # The type must be determined on every path, otherwise recording the
        # link below would fail for actions other than store_true/store_false
        if kwargs.get('action') in ('store_true', 'store_false'):
            item_type = boolean
        else:
            item_type = kwargs.get('type', str)
        action = method(*args, **kwargs)
        if key is not None:
            entry = (section, key, item_type)
            # Several options may share a dest, but only if they agree on the
            # configuration entry they are linked to
            if self._config_map.setdefault(action.dest, entry) != entry:
                raise ValueError(lang._(
                    'section and key must match for all equivalent dest '
                    'values'))
        return action

    def read_configs(self, paths):
        """
        Constructs a :class:`~configparser.ConfigParser` instance, and reads
        the configuration files specified by *paths*, a list of
        :class:`~pathlib.Path`-like objects, into it.

        The method will check the configuration for valid section and key
        names, raising :exc:`ValueError` on invalid items (this check is only
        made when a template was given to the initializer). It will also
        resolve any configuration values that have the type
        :class:`~pathlib.Path` relative to the path of the configuration file
        in which they were defined.

        The return value is the configuration parser instance.
        """
        # NOTE: We cheat in several places here to deal with the board:*
        # sections in the default.conf. If you use this class elsewhere, adjust
        # these accordingly
        if self._template is None:
            config = self._get_config_parser()
        else:
            config = deepcopy(self._template)
        # Collect the valid keys of each section; all board:<serial> sections
        # in the template are folded into one "board:*" pattern
        valid = {config.default_section: set()}
        for section, keys in config.items():
            name = 'board:*' if section.startswith('board:') else section
            for key in keys:
                valid.setdefault(name, set()).add(key)
        # The template's board sections only define valid keys; they are not
        # part of the returned configuration
        for section in [s for s in config if s.startswith('board:')]:
            del config[section]

        # Figure out which configuration items represent paths. These will need
        # special handling when loading configuration files as the values will
        # be resolved relative to the containing configuration file
        path_items = self.of_type(Path) | {('board:*', 'image')}

        # Attempt to load each of the specified locations; these are done
        # strictly in order to permit the customary hierarchy of configuration
        # files (/lib, /etc, ~) to override each other
        for path in [Path(p) for p in paths]:
            path = path.expanduser()
            config.read(path)

            # If a template was provided upon construction, validate sections
            # and keys against those in the template
            if self._template is not None:
                for section, keys in config.items():
                    patterns = [s for s in valid if fnmatchcase(section, s)]
                    if not patterns:
                        raise ValueError(lang._(
                            '{path}: invalid section [{section}]'
                        ).format(path=path, section=section))
                    # Sorted so the key reported is deterministic
                    unknown = sorted(set(keys) - valid[patterns[0]])
                    if unknown:
                        raise ValueError(lang._(
                            '{path}: invalid key {key} in [{section}]'
                        ).format(path=path, key=unknown[0], section=section))

            # Resolve paths relative to the configuration file just loaded.
            # Values made absolute on an earlier pass are left untouched
            for glob, key in path_items:
                for section in [s for s in config if fnmatchcase(s, glob)]:
                    if key in config[section]:
                        value = Path(config[section][key]).expanduser()
                        if not value.is_absolute():
                            value = (path.parent / value).resolve()
                        config[section][key] = str(value)
        return config

    def set_defaults_from(self, config):
        """
        Sets defaults for all arguments from their associated configuration
        entries in *config*.

        Entries linked to flag arguments are read with
        :meth:`~configparser.ConfigParser.getboolean`; all others are passed
        on as the string found in *config*. Arguments whose section or key is
        absent from *config* are left alone.
        """
        defaults = {}
        for dest, (section, key, item_type) in self._config_map.items():
            if section in config and key in config[section]:
                if item_type is boolean:
                    defaults[dest] = config.getboolean(section, key)
                else:
                    defaults[dest] = config[section][key]
        return super().set_defaults(**defaults)

    def update_config(self, config, namespace):
        """
        Copy values from *namespace* (an :class:`argparse.Namespace`,
        presumably the result of calling something like
        :meth:`~argparse.ArgumentParser.parse_args`) to *config*, a
        :class:`~configparser.ConfigParser`. Note that namespace values will be
        converted to :class:`str` implicitly. Sections missing from *config*
        are created as required.
        """
        for dest, (section, key, _) in self._config_map.items():
            if section not in config:
                config.add_section(section)
            config[section][key] = str(getattr(namespace, dest))

    def of_type(self, type):
        """
        Return a set of (section, key) tuples listing all configuration items
        which were defined as being of the specified *type* (with the *type*
        keyword passed to :meth:`add_argument`).
        """
        return {
            (section, key)
            for section, key, item_type in self._config_map.values()
            if item_type is type
        }
def port(s):
    """
    Convert the string *s* into a port number. The string may either contain
    an integer representation (in which case the conversion is trivial), or a
    port name, in which case :func:`socket.getservbyname` will be used to
    convert it to a port number (usually via NSS).

    Raises :exc:`ValueError` if *s* is neither an integer nor a known service
    name, or if the resulting number lies outside the range 0 to 65535.
    """
    try:
        result = int(s)
    except ValueError:
        try:
            result = socket.getservbyname(s)
        except OSError:
            raise ValueError(
                lang._('invalid service name or port number')) from None
    # Reject numbers that cannot be a port here, rather than letting them
    # surface later as an error from the socket layer
    if not 0 <= result <= 65535:
        raise ValueError(lang._('invalid service name or port number'))
    return result
def boolean(s):
    """
    Convert the string *s* to a :class:`bool`. A typical set of case
    insensitive strings are accepted: "yes", "y", "true", "t", and "1" are
    converted to :data:`True`, while "no", "n", "false", "f", and "0" convert
    to :data:`False`. Other values will result in :exc:`ValueError`.

    Surrounding whitespace is ignored, and *s* is passed through :class:`str`
    first so non-string values are judged by their string form.
    """
    text = str(s).strip().lower()
    if text in ('y', 'yes', 't', 'true', '1'):
        return True
    if text in ('n', 'no', 'f', 'false', '0'):
        return False
    # Look up the message template first, then fill it in; formatting before
    # the lookup would hand a string no message catalogue could contain to
    # lang._ (presumably a gettext-style translator -- confirm)
    raise ValueError(lang._('invalid boolean value: {s}').format(s=s))
def size(s):
    """
    Convert the string *s*, which must contain a number followed by an optional
    suffix (MB for mega-bytes, GB, for giga-bytes, etc.), and return the
    absolute integer value (scale the number in the string by the suffix
    given).

    The suffixes KB, MB, GB, and TB scale by successive powers of 1024 and
    permit a fractional number (the result is truncated to an integer). A
    plain "B" suffix, or no suffix at all, requires an integer. Any string
    that cannot be converted results in :exc:`ValueError`.
    """
    for power, suffix in enumerate(('KB', 'MB', 'GB', 'TB'), start=1):
        if s.endswith(suffix):
            try:
                return int(Decimal(s[:-len(suffix)]) * 2 ** (10 * power))
            except ArithmeticError as exc:
                # Decimal signals bad syntax with InvalidOperation, and
                # converting an infinity raises OverflowError; both derive
                # from ArithmeticError rather than the ValueError that the
                # integer conversions below (and argparse) use
                raise ValueError(
                    lang._('invalid size: {s}').format(s=s)) from exc
    if s.endswith('B'):
        return int(s[:-1])
    # No recognized suffix; attempt straight conversion
    return int(s)
def serial(s):
    """
    Convert the string *s*, which must contain a number in hexadecimal format
    to an :class:`int`. If *s* is at least 16 characters long and begins with
    either "10000000" or "00000000" then that prefix will be discarded. This is
    intended to provide a representation of a Raspberry Pi's serial number that
    is consistent with that used by the TFTP client in the Pi's bootloader.

    Surrounding whitespace is ignored. :exc:`ValueError` is raised if *s* is
    not valid hexadecimal, or if the value does not fit in 32 bits.
    """
    s = s.strip()
    # Only strip the prefix from long serials; a short serial that merely
    # starts with these digits is taken as given
    if len(s) >= 16 and s.startswith(('10000000', '00000000')):
        s = s[8:]
    value = int(s, base=16)
    if not 0 <= value <= 0xFFFFFFFF:
        # Template is looked up before being filled in so that lang._
        # (presumably a gettext-style translator -- confirm) sees the
        # unformatted message
        raise ValueError(lang._(
            'serial number is out of range: {value}').format(value=value))
    return value
class Board(namedtuple('Board', ('serial', 'image', 'partition', 'ip'))):
    """
    Represents a known board, recording its *serial* number, the *image*
    (filename) that the board should boot, the *partition* number within the
    *image* that contains the boot partition, and the IP address (if any) that
    the board should have.
    """
    @classmethod
    def from_section(cls, config, section):
        """
        Construct a new :class:`Board` from the specified *section* of the
        *config* (a mapping, e.g. a :class:`~configparser.ConfigParser`
        section).

        The *section* name must be "board:" followed by the serial number. The
        section must contain an "image" key; "partition" defaults to 1 and
        "ip" to :data:`None` when absent. :exc:`ValueError` is raised if the
        section name does not start with "board:".
        """
        prefix = 'board:'
        if not section.startswith(prefix):
            raise ValueError(lang._(
                'invalid section name: {section}').format(section=section))
        values = config[section]
        sernum = serial(section[len(prefix):])
        image = values['image']
        part = int(values.get('partition', 1))
        try:
            ip = ip_address(values['ip'])
        except KeyError:
            ip = None
        return cls(sernum, Path(image), part, ip)

    @classmethod
    def from_string(cls, s):
        """
        Construct a new :class:`Board` from the string *s* which is expected to
        be a comma-separated list of serial number, filename, partition number,
        and IP address. The last two parts (partition number and IP address)
        are optional and default to 1 and :data:`None` respectively; either
        may also be given as an empty field to select its default.

        :exc:`ValueError` is raised if *s* has the wrong number of fields, or
        if the partition number is not an integer.
        """
        fields = s.split(',')
        if not 2 <= len(fields) <= 4:
            raise ValueError(lang._(
                'expected serial,filename,[part],[ip] instead of {s}'
            ).format(s=s))
        # Pad the optional trailing fields so they can be unpacked uniformly
        sernum, image, part, ip = fields + [''] * (4 - len(fields))
        sernum = serial(sernum)
        if part:
            try:
                part = int(part)
            except ValueError:
                raise ValueError(lang._(
                    'invalid partition number {part!r}'
                ).format(part=part)) from None
        else:
            part = 1
        # An empty IP field means "no address", just as an empty partition
        # field means "default partition"
        ip = ip_address(ip) if ip else None
        return cls(sernum, Path(image), part, ip)

    def __str__(self):
        # Renders the board as a configuration file section; the "ip" line
        # is only included when an address is set
        lines = [
            f"[board:{self.serial:x}]",
            f"image = {self.image}",
            f"partition = {self.partition:d}",
        ]
        if self.ip is not None:
            lines.append(f"ip = {self.ip}")
        return '\n'.join(lines)
# Maps each timedelta keyword to a regex matching an (optionally signed)
# integer followed by one of that unit's suffixes. The order matters:
# duration() tries them in this order, so that "ms" is consumed as
# milliseconds before the seconds or minutes patterns are tried
_SPANS = {
    span: re.compile(fr'(?:(?P<num>[+-]?\d+)\s*{suffix}\b)')
    for span, suffix in [
        ('microseconds', '(micro|u|µ)s(ec(ond)?s?)?'),
        ('milliseconds', '(milli|m)s(ec(ond)?s?)?'),
        ('seconds', 's(ec(ond)?s?)?'),
        ('minutes', 'm(i(n(ute)?s?)?)?'),
        ('hours', 'h((ou)?rs?)?'),
    ]
}


def duration(s):
    """
    Convert the string *s* to a :class:`~datetime.timedelta`. The string must
    consist of white-space and/or comma separated values which are a number
    followed by a suffix indicating duration. For example:

        >>> duration('1s')
        datetime.timedelta(seconds=1)
        >>> duration('5 minutes, 30 seconds')
        datetime.timedelta(seconds=330)

    The set of possible durations, and their recognized suffixes is as follows:

    * *Microseconds*: microseconds, microsecond, microsecs, microsec, micros,
      useconds, usecond, usecs, usec, us, µseconds, µsecond, µsecs, µsec, µs

    * *Milliseconds*: milliseconds, millisecond, millisecs, millisec, millis,
      mseconds, msecond, msecs, msec, ms

    * *Seconds*: seconds, second, secs, sec, s

    * *Minutes*: minutes, minute, mins, min, mi, m

    * *Hours*: hours, hour, hrs, hr, h

    Each unit may appear at most once. If conversion fails, :exc:`ValueError`
    is raised.
    """
    spans = {}
    rest = s
    for span, regex in _SPANS.items():
        m = regex.search(rest)
        if m:
            spans[span] = int(m.group('num'))
            # Cut the matched value out, along with any separators that are
            # left dangling at either end
            rest = (rest[:m.start(0)] + rest[m.end(0):]).strip(' \t\n,')
            if not rest:
                break
    # Anything left over was not a recognized value
    if rest:
        raise ValueError(lang._('invalid duration {s}').format(s=s))
    try:
        return dt.timedelta(**spans)
    except OverflowError as exc:
        # Values too large for a timedelta are conversion failures too
        raise ValueError(lang._('invalid duration {s}').format(s=s)) from exc