| Viewing file:  ifconfig.py (8.88 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# Copyright(C) 2022 FreeBSD Foundation#
 # Author: Mina Galić <me+FreeBSD@igalic.co>
 #
 # This file is part of cloud-init. See LICENSE file for license information.
 
 import copy
 import logging
 import re
 from collections import defaultdict
 from functools import lru_cache
 from ipaddress import IPv4Address, IPv4Interface, IPv6Interface
 from typing import Dict, List, Optional, Tuple, Union
 
 LOG = logging.getLogger(__name__)
 
 MAC_RE = r"""([0-9a-fA-F]{2}:){5}[0-9a-fA-F]{2}"""
 
 
 class Ifstate:
 """
 This class holds the parsed state of a BSD network interface.
 It is itself side-effect free.
 All methods with side-effects should be implemented on one of the
 ``BSDNetworking`` classes.
 """
 
 def __init__(self, name):
 self.name = name
 self.index: int = 0
 self.inet = {}
 self.inet6 = {}
 self.up = False
 self.options = []
 self.nd6 = []
 self.flags = []
 self.mtu: int = 0
 self.metric: int = 0
 self.groups = []
 self.description: Optional[str] = None
 self.media: Optional[str] = None
 self.status: Optional[str] = None
 self.mac: Optional[str] = None
 self.macs = []
 self.vlan = {}
 self.members = []
 
 @property
 def is_loopback(self) -> bool:
 return "loopback" in self.flags or "lo" in self.groups
 
 @property
 def is_physical(self) -> bool:
 # OpenBSD makes this very easy:
 if "egress" in self.groups:
 return True
 if self.groups == [] and self.media and "Ethernet" in self.media:
 return True
 return False
 
 @property
 def is_bridge(self) -> bool:
 return "bridge" in self.groups
 
 @property
 def is_bond(self) -> bool:
 return "lagg" in self.groups
 
 @property
 def is_vlan(self) -> bool:
 return ("vlan" in self.groups) or (self.vlan != {})
 
 
 class Ifconfig:
 """
 A parser for BSD style ``ifconfig(8)`` output.
 For documentation here:
 - https://man.freebsd.org/ifconfig(8)
 - https://man.netbsd.org/ifconfig.8
 - https://man.openbsd.org/ifconfig.8
 All output is considered equally, and then massaged into a singular form:
 an ``Ifstate`` object.
 """
 
 def __init__(self):
 self._ifs_by_name = {}
 self._ifs_by_mac = {}
 
 @lru_cache()
 def parse(self, text: str) -> Dict[str, Union[Ifstate, List[Ifstate]]]:
 """
 Parse the ``ifconfig -a`` output ``text``, into a dict of ``Ifstate``
 objects, referenced by ``name`` *and* by ``mac`` address.
 
 This dict will always be the same, given the same input, so we can
 ``@lru_cache()`` it. n.b.: ``@lru_cache()`` takes only the
 ``__hash__()`` of the input (``text``), so it should be fairly quick,
 despite our giant inputs.
 
 @param text: The output of ``ifconfig -a``
 @returns: A dict of ``Ifstate``s, referenced by ``name`` and ``mac``
 """
 ifindex = 0
 ifs_by_mac = defaultdict(list)
 for line in text.splitlines():
 if len(line) == 0:
 continue
 if line[0] not in ("\t", " "):
 # We hit the start of a device block in the ifconfig output
 # These start with devN: flags=NNNN<flags> and then continue
 # *indented* for the rest of the config.
 # Here our loop resets ``curif`` & ``dev`` to this new device
 ifindex += 1
 curif = line.split()[0]
 # current ifconfig pops a ':' on the end of the device
 if curif.endswith(":"):
 curif = curif[:-1]
 dev = Ifstate(curif)
 dev.index = ifindex
 self._ifs_by_name[curif] = dev
 
 toks = line.lower().strip().split()
 
 if len(toks) > 1 and toks[1].startswith("flags="):
 flags = self._parse_flags(toks)
 if flags != {}:
 dev.flags = copy.deepcopy(flags["flags"])
 dev.up = flags["up"]
 if "mtu" in flags:
 dev.mtu = flags["mtu"]
 if "metric" in flags:
 dev.metric = flags["metric"]
 if toks[0].startswith("capabilities="):
 caps = re.split(r"<|>", toks[0])
 dev.flags.append(caps)
 
 if toks[0] == "index":
 # We have found a real index! override our fake one
 dev.index = int(toks[1])
 
 if toks[0] == "description:":
 dev.description = line[line.index(":") + 2 :]
 
 if (
 toks[0].startswith("options=")
 or toks[0].startswith("ec_capabilities")
 or toks[0].startswith("ec_enabled")
 ):
 options = re.split(r"<|>", toks[0])
 if len(options) > 1:
 dev.options += options[1].split(",")
 
 # We also store the Ifstate reference under all mac addresses
 # so we can easier reverse-find it.
 if toks[0] == "ether":
 dev.mac = toks[1]
 dev.macs.append(toks[1])
 ifs_by_mac[toks[1]].append(dev)
 if toks[0] == "hwaddr":
 dev.macs.append(toks[1])
 ifs_by_mac[toks[1]].append(dev)
 
 if toks[0] == "groups:":
 dev.groups += toks[1:]
 
 if toks[0] == "media:":
 dev.media = line[line.index(": ") + 2 :]
 
 if toks[0] == "nd6":
 nd6_opts = re.split(r"<|>", toks[0])
 if len(nd6_opts) > 1:
 dev.nd6 = nd6_opts[1].split(",")
 
 if toks[0] == "status":
 dev.status = toks[1]
 
 if toks[0] == "inet":
 ip = self._parse_inet(toks)
 dev.inet[ip[0]] = copy.deepcopy(ip[1])
 
 if toks[0] == "inet6":
 ip = self._parse_inet6(toks)
 dev.inet6[ip[0]] = copy.deepcopy(ip[1])
 
 # bridges and ports are kind of the same thing, right?
 if toks[0] == "member:" or toks[0] == "laggport:":
 dev.members += toks[1]
 
 if toks[0] == "vlan:":
 dev.vlan = {}
 dev.vlan["id"] = toks[1]
 for i in range(2, len(toks)):
 if toks[i] == "interface:":
 dev.vlan["link"] = toks[i + 1]
 
 self._ifs_by_mac = dict(ifs_by_mac)
 return {**self._ifs_by_name, **self._ifs_by_mac}
 
 def ifs_by_mac(self):
 return self._ifs_by_mac
 
 def _parse_inet(self, toks: list) -> Tuple[str, dict]:
 broadcast = None
 if "/" in toks[1]:
 ip = IPv4Interface(toks[1])
 netmask = str(ip.netmask)
 if "broadcast" in toks:
 broadcast = toks[toks.index("broadcast") + 1]
 else:
 netmask = str(IPv4Address(int(toks[3], 0)))
 if "broadcast" in toks:
 broadcast = toks[toks.index("broadcast") + 1]
 ip = IPv4Interface("%s/%s" % (toks[1], netmask))
 
 prefixlen = ip.with_prefixlen.split("/")[1]
 return (
 str(ip.ip),
 {
 "netmask": netmask,
 "broadcast": broadcast,
 "prefixlen": prefixlen,
 },
 )
 
 def _get_prefixlen(self, toks):
 for i in range(2, len(toks)):
 if toks[i] == "prefixlen":
 return toks[i + 1]
 
 def _parse_inet6(self, toks: list) -> Tuple[str, dict]:
 scope = None
 # workaround https://github.com/python/cpython/issues/78969
 if "%" in toks[1]:
 scope = "link-local"
 ip6, rest = toks[1].split("%")
 if "/" in rest:
 prefixlen = rest.split("/")[1]
 else:
 prefixlen = self._get_prefixlen(toks)
 ip = IPv6Interface("%s/%s" % (ip6, prefixlen))
 elif "/" in toks[1]:
 ip = IPv6Interface(toks[1])
 prefixlen = toks[1].split("/")[1]
 else:
 prefixlen = self._get_prefixlen(toks)
 ip = IPv6Interface("%s/%s" % (toks[1], prefixlen))
 
 if not scope and ip.is_link_local:
 scope = "link-local"
 elif not scope and ip.is_site_local:
 scope = "site-local"
 
 return (str(ip.ip), {"prefixlen": prefixlen, "scope": scope})
 
 def _parse_flags(self, toks: list) -> dict:
 flags = re.split(r"<|>", toks[1])
 ret = {}
 if len(flags) > 1:
 ret["flags"] = flags[1].split(",")
 if "up" in ret["flags"]:
 ret["up"] = True
 else:
 ret["up"] = False
 for t in range(2, len(toks)):
 if toks[t] == "metric":
 ret["metric"] = int(toks[t + 1])
 elif toks[t] == "mtu":
 ret["mtu"] = int(toks[t + 1])
 return ret
 
 |