| Viewing file:  bsd.py (8.43 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
# This file is part of cloud-init. See LICENSE file for license information.
 import logging
 import re
 from typing import Optional
 
 from cloudinit import net, subp, util
 from cloudinit.distros import bsd_utils
 from cloudinit.distros.parsers.resolv_conf import ResolvConf
 from cloudinit.net import renderer
 from cloudinit.net.network_state import NetworkState
 
 LOG = logging.getLogger(__name__)
 
 
 class BSDRenderer(renderer.Renderer):
     """Shared network-rendering logic for BSD renderers.

     Walks a network state object and collects per-interface address
     settings, hands routes to ``set_route`` and rewrites ``resolv.conf``.
     ``start_services``, ``write_config``, ``rename_interface`` and
     ``set_route`` raise ``NotImplementedError`` here and are expected to be
     provided by subclasses.
     """

     # Relative paths; resolved against ``self.target`` through
     # ``subp.target_path`` before every use.
     resolv_conf_fn = "etc/resolv.conf"
     rc_conf_fn = "etc/rc.conf"
     # Not referenced inside this class; presumably consumed or overridden
     # by subclasses -- TODO confirm.
     interface_routes = ""
     route_names = ""

     def get_rc_config_value(self, key):
         """Return the value stored for *key* in the target's rc.conf.

         The lookup is delegated to ``bsd_utils.get_rc_config_value``; its
         result is returned unchanged.
         """
         fn = subp.target_path(self.target, self.rc_conf_fn)
         # The result used to be dropped, so callers always received None.
         return bsd_utils.get_rc_config_value(key, fn=fn)

     def set_rc_config_value(self, key, value):
         """Store *value* under *key* in the target's rc.conf."""
         fn = subp.target_path(self.target, self.rc_conf_fn)
         bsd_utils.set_rc_config_value(key, value, fn=fn)

     def __init__(self, config=None):
         """Initialise empty per-interface state.

         :param config: optional dict; only the ``postcmds`` key is read
             (default ``True``) and later passed to ``start_services``.
         """
         if not config:
             config = {}
         self.target = None
         # device name -> {"address", "netmask", "mtu"} dict, or the
         # string "DHCP" for DHCP-configured interfaces.
         self.interface_configurations = {}
         # device name -> {"address", "prefix", "mtu"} dict.
         self.interface_configurations_ipv6 = {}
         self._postcmds = config.get("postcmds", True)

     def _ifconfig_entries(self, settings):
         """Fill the interface configuration maps from *settings*.

         For every non-loopback interface the effective device name is
         determined (renaming the device when its current name differs from
         the requested one), then each subnet is recorded as static IPv4,
         static IPv6 or DHCP. A later subnet of the same family overwrites
         an earlier one for the same device because the maps are keyed by
         device name only.
         """
         ifname_by_mac = net.get_interfaces_by_mac()
         for interface in settings.iter_interfaces():
             device_name = interface.get("name")
             device_mac = interface.get("mac_address")

             # Loopback devices (lo0, lo1, ...) are never configured here.
             if device_name and re.match(r"^lo\d+$", device_name):
                 continue

             if device_mac not in ifname_by_mac:
                 # Only logged; processing continues with the name taken
                 # from the configuration.
                 LOG.info("Cannot find any device with MAC %s", device_mac)
             elif device_mac and device_name:
                 cur_name = ifname_by_mac[device_mac]
                 if cur_name != device_name:
                     LOG.info(
                         "netif service will rename interface %s to %s",
                         cur_name,
                         device_name,
                     )
                     try:
                         self.rename_interface(cur_name, device_name)
                     except NotImplementedError:
                         LOG.error(
                             "Interface renaming is not supported on this OS"
                         )
                         # Without renaming, keep using the current name.
                         device_name = cur_name
             else:
                 # MAC is known but no name was requested: use the name
                 # the system currently reports for that MAC.
                 device_name = ifname_by_mac[device_mac]

             LOG.info("Configuring interface %s", device_name)

             for subnet in interface.get("subnets", []):
                 subnet_type = subnet.get("type")
                 address = subnet.get("address")
                 # A subnet-level MTU wins over the interface-level one.
                 mtu = subnet.get("mtu") or interface.get("mtu")

                 if subnet_type == "static":
                     netmask = subnet.get("netmask")
                     if not netmask:
                         LOG.debug(
                             "Skipping IP %s, because there is no netmask",
                             address,
                         )
                         continue
                     LOG.debug(
                         "Configuring dev %s with %s / %s",
                         device_name,
                         address,
                         netmask,
                     )
                     self.interface_configurations[device_name] = {
                         "address": address,
                         "netmask": netmask,
                         "mtu": mtu,
                     }
                 elif subnet_type == "static6":
                     prefix = subnet.get("prefix")
                     if not prefix:
                         LOG.debug(
                             "Skipping IP %s, because there is no prefix",
                             address,
                         )
                         continue
                     LOG.debug(
                         "Configuring dev %s with %s / %s",
                         device_name,
                         address,
                         prefix,
                     )
                     self.interface_configurations_ipv6[device_name] = {
                         "address": address,
                         "prefix": prefix,
                         "mtu": mtu,
                     }
                 elif subnet_type in ("dhcp", "dhcp4"):
                     self.interface_configurations[device_name] = "DHCP"

     def _route_entries(self, settings):
         """Collect all routes from *settings* and pass them to ``set_route``.

         Starts from the routes yielded by ``settings.iter_routes()``, adds a
         default route for every static subnet that carries a usable gateway
         and appends the subnet's own ``routes``. Subnets that are neither
         ``static`` nor ``static6`` contribute nothing.
         """
         routes = list(settings.iter_routes())
         for interface in settings.iter_interfaces():
             for subnet in interface.get("subnets", []):
                 subnet_type = subnet.get("type")
                 gateway = subnet.get("gateway")
                 if subnet_type == "static":
                     # Four dot-separated parts are treated as IPv4.
                     if gateway and len(gateway.split(".")) == 4:
                         routes.append(
                             {
                                 "network": "0.0.0.0",
                                 "netmask": "0.0.0.0",
                                 "gateway": gateway,
                             }
                         )
                 elif subnet_type == "static6":
                     # Anything containing a colon is treated as IPv6.
                     if gateway and len(gateway.split(":")) > 1:
                         routes.append(
                             {
                                 "network": "::",
                                 "prefix": "0",
                                 "gateway": gateway,
                             }
                         )
                 else:
                     continue
                 routes += subnet.get("routes", [])

         for route in routes:
             network = route.get("network")
             if not network:
                 LOG.debug("Skipping a bad route entry")
                 continue
             # IPv4 routes carry "netmask", IPv6 routes carry "prefix".
             netmask = route.get("netmask") or route.get("prefix")
             self.set_route(network, netmask, route.get("gateway"))

     def _resolve_conf(self, settings):
         """Merge DNS settings from *settings* into the target resolv.conf.

         Global nameservers and search domains are combined with the
         per-subnet ``dns_nameservers`` / ``dns_search`` entries, added to
         the existing file content (or to an empty file when it cannot be
         read) and written back with mode 0o644.
         """
         # Work on copies: extending the lists owned by ``settings`` would
         # alter the caller's network state on every render.
         nameservers = list(settings.dns_nameservers)
         searchdomains = list(settings.dns_searchdomains)
         for interface in settings.iter_interfaces():
             for subnet in interface.get("subnets", []):
                 if "dns_nameservers" in subnet:
                     nameservers.extend(subnet["dns_nameservers"])
                 if "dns_search" in subnet:
                     searchdomains.extend(subnet["dns_search"])

         resolv_conf_path = subp.target_path(self.target, self.resolv_conf_fn)
         # Try to read the /etc/resolv.conf or just start from scratch if
         # that fails.
         try:
             resolvconf = ResolvConf(util.load_file(resolv_conf_path))
             resolvconf.parse()
         except IOError:
             util.logexc(
                 LOG,
                 "Failed to parse %s, use new empty file",
                 resolv_conf_path,
             )
             resolvconf = ResolvConf("")
             resolvconf.parse()

         # De-duplicate in first-seen order rather than through set(): the
         # parser can reject entries (ValueError), so a stable order keeps
         # the generated file reproducible.
         for server in dict.fromkeys(nameservers):
             try:
                 resolvconf.add_nameserver(server)
             except ValueError:
                 util.logexc(LOG, "Failed to add nameserver %s", server)

         # And add any searchdomains.
         for domain in dict.fromkeys(searchdomains):
             try:
                 resolvconf.add_search_domain(domain)
             except ValueError:
                 util.logexc(LOG, "Failed to add search domain %s", domain)

         util.write_file(resolv_conf_path, str(resolvconf), 0o644)

     def render_network_state(
         self,
         network_state: NetworkState,
         templates: Optional[dict] = None,
         target=None,
     ) -> None:
         """Render *network_state* and start the network services.

         :param network_state: state to render.
         :param templates: accepted for interface compatibility; unused here.
         :param target: optional root directory; when truthy it replaces
             ``self.target`` before anything is rendered.
         """
         if target:
             self.target = target
         self._ifconfig_entries(settings=network_state)
         self._route_entries(settings=network_state)
         self._resolve_conf(settings=network_state)

         self.write_config()
         self.start_services(run=self._postcmds)

     def dhcp_interfaces(self):
         """Return the names of all interfaces recorded as DHCP."""
         return [
             name
             for name, cfg in self.interface_configurations.items()
             if cfg == "DHCP"
         ]

     def start_services(self, run=False):
         """Start the network services; must be provided by a subclass."""
         raise NotImplementedError()

     def write_config(self, target=None):
         """Persist the collected configuration; subclass responsibility."""
         raise NotImplementedError()

     def rename_interface(self, cur_name, device_name):
         """Rename *cur_name* to *device_name*; subclass responsibility."""
         raise NotImplementedError()

     def set_route(self, network, netmask, gateway):
         """Register one route; must be provided by a subclass."""
         raise NotImplementedError()
 
 |