# -*- coding: utf-8 -*-
# vim:ts=4:sw=4:expandtab 2
# Copyright 2016, 2017 juga (juga at riseup dot net), MIT license.
"""Client class for the DHCP client implementation of the Anonymity Profile
([:rfc:`7844`])."""
from __future__ import absolute_import
# from __future__ import unicode_literals
import logging
import attr
from netaddr import IPNetwork
from scapy.arch import get_if_raw_hwaddr
from scapy.config import conf
from scapy.layers.dhcp import BOOTP, DHCP
from scapy.layers.inet import IP, UDP
from scapy.layers.l2 import Ether
from scapy.utils import mac2str, str2mac
from .constants import (BROADCAST_ADDR, BROADCAST_MAC, CLIENT_PORT,
DHCP_EVENTS, DHCP_OFFER_OPTIONS, META_ADDR,
SERVER_PORT, PRL)
from .dhcpcaputils import gen_xid
from .dhcpcaplease import DHCPCAPLease
logger = logging.getLogger('dhcpcanon')
@attr.s
class DHCPCAP(object):
    """DHCP client data and packet builder for the Anonymity Profile.

    Holds the addressing data of the client and the server, the current
    lease and the transaction id, builds the DHCP packets sent by the
    client (DISCOVER, REQUEST, DECLINE, RELEASE) and turns received
    OFFER/ACK packets into :class:`DHCPCAPLease` objects.
    """

    # Interface name; scapy's ``conf.iface`` is used when not given.
    iface = attr.ib(default=None)
    # Client hardware address as a string; read from ``iface`` when not given.
    client_mac = attr.ib(default=None)
    # Client IP address; replaced by the leased address in ``handle_ack``.
    client_ip = attr.ib(default=META_ADDR)
    client_port = attr.ib(default=CLIENT_PORT)
    # Server data; start as broadcast values and are replaced by the values
    # of the ACK sender in ``handle_ack``.
    server_mac = attr.ib(default=BROADCAST_MAC)
    server_ip = attr.ib(default=BROADCAST_ADDR)
    server_port = attr.ib(default=SERVER_PORT)
    # Current lease; a fresh empty ``DHCPCAPLease`` per instance.
    lease = attr.ib(default=attr.Factory(DHCPCAPLease))
    # Last event computed by ``handle_ack`` (a value of ``DHCP_EVENTS``).
    event = attr.ib(default=None)
    # Parameter Request List sent in the options; ``PRL`` when not given.
    prl = attr.ib(default=None)
    # BOOTP transaction id; generated with ``gen_xid`` when not given.
    xid = attr.ib(default=None)

    def __attrs_post_init__(self):
        """Initializes attributes after attrs __init__.

        Fills in every attribute left as ``None`` (interface, client MAC,
        parameter request list, transaction id) and stores the interface
        in the lease object.

        These attributes do not change during the life of the object.
        """
        logger.debug('Creating new DHCPCAP obj.')
        if self.iface is None:
            self.iface = conf.iface
        if self.client_mac is None:
            # get_if_raw_hwaddr returns a pair; only the raw address is
            # used and it is converted to its string form.
            _, client_mac = get_if_raw_hwaddr(self.iface)
            self.client_mac = str2mac(client_mac)
        if self.prl is None:
            self.prl = PRL
        if self.xid is None:
            self.xid = gen_xid()
        logger.debug('Modifying Lease obj, setting iface.')
        self.lease.interface = self.iface

    @staticmethod
    def _option_value_to_text(value):
        """Return a DHCP option value as ``str``, decoding bytes as UTF-8."""
        if isinstance(value, bytes):
            return str(value.decode('utf8'))
        return str(value)

    def gen_ether_ip(self):
        """Generates link layer and IP layer part of DHCP packet.

        For broadcast packets is::

            Ether(src=client_mac, dst="ff:ff:ff:ff:ff:ff") /
            IP(src="0.0.0.0", dst="255.255.255.255") /

        :returns: the ``Ether / IP`` scapy layers.
        """
        ether_ip = (Ether(src=self.client_mac, dst=BROADCAST_MAC) /
                    IP(src=META_ADDR, dst=BROADCAST_ADDR))
        return ether_ip

    def gen_ether_ip_unicast(self):
        """Generates link layer and IP layer part of DHCP packet.

        For unicast packets is::

            Ether(src=client_mac, dst=server_mac) /
            IP(src=client_ip?, dst=server_ip) /

        :returns: the ``Ether / IP`` scapy layers.
        """
        ether_ip = (Ether(src=self.client_mac, dst=self.server_mac) /
                    IP(src=self.client_ip, dst=self.server_ip))
        return ether_ip

    def gen_udp(self):
        """Generates UDP layer part of DHCP packet.

        The ports are taken from ``client_port`` and ``server_port``; the
        original documentation gives the layer as::

            UDP(sport=68, dport=67) /

        :returns: the ``UDP`` scapy layer.
        """
        udp = (UDP(sport=self.client_port, dport=self.server_port))
        return udp

    def gen_bootp(self):
        """Generates BOOTP layer part of DHCP packet.

        [ :rfc:`7844#section-3.4` ] ::

            The presence of this address is necessary for the proper
            operation of the DHCP service.

        [:rfc:`7844#section-3.`] ::

            MAY contain the Client Identifier option,

        :returns: the ``BOOTP`` scapy layer with the client hardware
            address and the transaction id.
        """
        bootp = (
            BOOTP(chaddr=[mac2str(self.client_mac)], xid=self.xid)
        )
        return bootp

    def gen_bootp_unicast(self):
        """Generates BOOTP layer part of unicast DHCP packet.

        Same comments as in gen_bootp; additionally sets ``ciaddr`` to the
        client IP address.

        :returns: the ``BOOTP`` scapy layer.
        """
        bootp = (
            BOOTP(chaddr=[mac2str(self.client_mac)], xid=self.xid,
                  ciaddr=self.client_ip)
        )
        return bootp

    def gen_discover(self):
        """
        Generate DHCP DISCOVER packet.

        [:rfc:`7844#section-3.1`] ::

            SHOULD randomize the ordering of options

            If this can not be implemented
            MAY order the options by option code number (lowest to
            highest).

        [:rfc:`7844#section-3.`] ::

            MAY contain the Parameter Request List option.

        :returns: the complete broadcast DISCOVER packet.
        """
        dhcp_discover = (
            self.gen_ether_ip() /
            self.gen_udp() /
            self.gen_bootp() /
            DHCP(options=[
                ("message-type", "discover"),
                ("client_id", mac2str(self.client_mac)),
                ("param_req_list", self.prl),
                "end"
            ])
        )
        logger.debug('Generated discover %s.', dhcp_discover.summary())
        return dhcp_discover

    def gen_request(self):
        """
        Generate DHCP REQUEST packet.

        [:rfc:`7844#section-3.1`] ::

            SHOULD randomize the ordering of options

            If this can not be implemented
            MAY order the options by option code number (lowest to
            highest).

        [:rfc:`7844#section-3.`] ::

            MAY contain the Parameter Request List option.

        If in response to a DHCPOFFER,::

            MUST contain the corresponding Server Identifier option
            MUST contain the Requested IP address option.

        If the message is not in response to a DHCPOFFER (BOUND, RENEW),::

            MAY contain a Requested IP address option

        The requested address and the server identifier are taken from
        the current lease.

        :returns: the complete broadcast REQUEST packet.
        """
        dhcp_req = (
            self.gen_ether_ip() /
            self.gen_udp() /
            self.gen_bootp() /
            DHCP(options=[
                ("message-type", "request"),
                ("client_id", mac2str(self.client_mac)),
                ("param_req_list", self.prl),
                ("requested_addr", self.lease.address),
                ("server_id", self.lease.server_id),
                "end"])
        )
        logger.debug('Generated request %s.', dhcp_req.summary())
        return dhcp_req

    def gen_request_unicast(self):
        """
        Generate DHCP REQUEST unicast packet.

        Same comments as in gen_request apply, but the packet is addressed
        to the known server and carries neither the Requested IP address
        nor the Server Identifier option.

        :returns: the complete unicast REQUEST packet.
        """
        dhcp_req = (
            self.gen_ether_ip_unicast() /
            self.gen_udp() /
            self.gen_bootp_unicast() /
            DHCP(options=[
                ("message-type", "request"),
                ("client_id", mac2str(self.client_mac)),
                ("param_req_list", self.prl),
                "end"])
        )
        logger.debug('Generated request %s.', dhcp_req.summary())
        return dhcp_req

    def gen_decline(self):
        """
        Generate DHCP decline packet (broadcast).

        [:rfc:`7844#section-3.`] ::

            MUST contain the Message Type option,
            MUST contain the Server Identifier option,
            MUST contain the Requested IP address option;

        .. note:: currently not being used.

        :returns: the complete broadcast DECLINE packet.
        """
        dhcp_decline = (
            self.gen_ether_ip() /
            self.gen_udp() /
            self.gen_bootp() /
            DHCP(options=[
                ("message-type", "decline"),
                ("server_id", self.server_ip),
                ("requested_addr", self.client_ip),
                "end"])
        )
        logger.debug('Generated decline.')
        logger.debug(dhcp_decline.summary())
        return dhcp_decline

    def gen_release(self):
        """
        Generate DHCP release packet (broadcast?).

        [:rfc:`7844#section-3.`] ::

            MUST contain the Message Type option and
            MUST contain the Server Identifier option,

        .. note:: currently not being used.

        :returns: the complete broadcast RELEASE packet.
        """
        dhcp_release = (
            self.gen_ether_ip() /
            self.gen_udp() /
            self.gen_bootp() /
            DHCP(options=[
                ("message-type", "release"),
                ("client_id", mac2str(self.client_mac)),
                ("server_id", self.server_ip),
                "end"])
        )
        logger.debug('Generated release.')
        logger.debug(dhcp_release.summary())
        return dhcp_release

    def gen_check_lease_attrs(self, attrs_dict):
        """Generate network mask in CIDR format and subnet.

        Validates the given values and fills in the ones the server did
        not send. The dictionary is modified in place and also returned.

        :param attrs_dict: lease values keyed by name; must contain
            non-empty ``address`` and ``subnet_mask`` strings.
        :returns: the same ``attrs_dict`` with ``subnet_mask_cidr`` and
            ``subnet`` set and with ``server_id``, ``router``,
            ``broadcast_address``, ``name_server`` and ``next_server``
            defaulted when they were missing or ``None``.
        :raises KeyError: if ``address`` or ``subnet_mask`` is missing.
        :raises AssertionError: if ``address`` or ``subnet_mask`` is empty
            or the router is not in the network of the address.
        :raises AddrFormatError: raised by ``IPNetwork`` when the address
            or the mask is not valid (caught in the FSM).
        """
        # Without some minimal options given by the server it is not
        # possible to create a new lease. Explicit raises are used instead
        # of ``assert`` so the checks are not removed under ``python -O``;
        # the exception type is kept for existing callers.
        for required in ('subnet_mask', 'address'):
            if not attrs_dict[required]:
                raise AssertionError(
                    'Lease attribute %r is empty.' % required)
        # if address and/or network are not valid this will raise an
        # exception (AddrFormatError)
        ipn = IPNetwork(attrs_dict['address'] + '/' +
                        attrs_dict['subnet_mask'])
        # FIXME:70 should be this option required?
        # assert attrs_dict['server_id']
        if attrs_dict.get('server_id') is None:
            attrs_dict['server_id'] = self.server_ip
        # TODO: there should be more complex checking here about getting an
        # address in a subnet?
        # else:
        #     if IPAddress('server_id') not in ipn:
        #         raise ValueError("server_id is not in the same network as"
        #                          "the offered address.")
        if attrs_dict.get('router') is None:
            attrs_dict['router'] = attrs_dict['server_id']
        ripn = IPNetwork(attrs_dict['router'] + '/' +
                         attrs_dict['subnet_mask'])
        if ripn.network != ipn.network:
            raise AssertionError(
                'Router %s is not in the network of address %s.' %
                (attrs_dict['router'], attrs_dict['address']))
        # set the options that are not given by the server
        attrs_dict['subnet_mask_cidr'] = str(ipn.prefixlen)
        attrs_dict['subnet'] = str(ipn.network)
        # check other options that might not be given by the server
        if attrs_dict.get('broadcast_address') is None:
            attrs_dict['broadcast_address'] = str(ipn.broadcast)
        if attrs_dict.get('name_server') is None:
            attrs_dict['name_server'] = attrs_dict['server_id']
        if attrs_dict.get('next_server') is None:
            attrs_dict['next_server'] = attrs_dict['server_id']
        logger.debug('Net values are valid')
        return attrs_dict

    def handle_offer_ack(self, pkt, time_sent_request=None):
        """Create a lease object with the values in OFFER/ACK packet.

        Only the DHCP options named in ``DHCP_OFFER_OPTIONS`` are used.
        Options with several values are stored as one space separated
        string. The interface, the offered address (``yiaddr``) and the
        next server (``siaddr``) are added from the object and the BOOTP
        layer.

        :param pkt: received packet with ``BOOTP`` and ``DHCP`` layers.
        :param time_sent_request: accepted for interface compatibility;
            not used by this method.
        :returns: a new :class:`DHCPCAPLease`.
        :raises: whatever ``gen_check_lease_attrs`` raises for invalid
            values.
        """
        attrs_dict = dict()
        for opt in pkt[DHCP].options:
            if isinstance(opt, tuple) and opt[0] in DHCP_OFFER_OPTIONS:
                values = opt[1:]
                if len(values) < 2:
                    text = self._option_value_to_text(opt[1])
                else:
                    # Convert every member before joining so that bytes or
                    # non-string members do not make ``join`` fail.
                    text = ' '.join(self._option_value_to_text(item)
                                    for item in values)
                attrs_dict[opt[0]] = text
        attrs_dict.update({
            "interface": self.iface,
            "address": pkt[BOOTP].yiaddr,
            "next_server": pkt[BOOTP].siaddr,
        })
        # this function changes the dict
        self.gen_check_lease_attrs(attrs_dict)
        logger.debug('Creating Lease obj.')
        logger.debug('with attrs %s', attrs_dict)
        lease = DHCPCAPLease(**attrs_dict)
        return lease

    def handle_offer(self, pkt):
        """Replace the current lease with the one described by an OFFER.

        :param pkt: received OFFER packet.
        """
        logger.debug("Handling Offer.")
        logger.debug('Modifying obj DHCPCAP, setting lease.')
        self.lease = self.handle_offer_ack(pkt)

    def handle_ack(self, pkt, time_sent_request):
        """Store the server data and the lease of an ACK, return the event.

        The server MAC, IP and port are taken from the sender of the
        packet, a new lease is created from it and its times are set from
        ``time_sent_request``. The new lease is compared with the current
        one to decide the event.

        :param pkt: received ACK packet.
        :param time_sent_request: time at which the REQUEST was sent,
            passed to ``DHCPCAPLease.set_times``.
        :returns: ``DHCP_EVENTS['IP_CHANGE']`` when address, subnet mask or
            router differ from the current lease, ``DHCP_EVENTS['RENEW']``
            when they match, ``DHCP_EVENTS['IP_ACQUIRE']`` when there is
            no current lease.
        """
        logger.debug("Handling ACK.")
        logger.debug('Modifying obj DHCPCAP, setting server data.')
        # The server data must be set before the lease is built:
        # ``server_ip`` is the fallback for a missing ``server_id``.
        self.server_mac = pkt[Ether].src
        self.server_ip = pkt[IP].src
        self.server_port = pkt[UDP].sport
        event = DHCP_EVENTS['IP_ACQUIRE']
        # FIXME:0 check the fields match the previously offered ones?
        # FIXME:50 create a new object also on renewing/rebinding
        # or only set_times?
        lease = self.handle_offer_ack(pkt, time_sent_request)
        lease.set_times(time_sent_request)
        # NOTE(review): ``lease`` defaults to a DHCPCAPLease instance, so
        # this guard looks always true unless a caller sets it to None and
        # 'IP_ACQUIRE' would then never be returned - confirm intent.
        if self.lease is not None:
            if (self.lease.address != lease.address or
                    self.lease.subnet_mask != lease.subnet_mask or
                    self.lease.router != lease.router):
                event = DHCP_EVENTS['IP_CHANGE']
            else:
                event = DHCP_EVENTS['RENEW']
        logger.debug('Modifying obj DHCPCAP, setting lease, client ip, event.')
        self.lease = lease
        self.client_ip = self.lease.address
        self.event = event
        return event