HEX
Server: Apache
System: Linux clpupre 5.4.0-90-generic #101-Ubuntu SMP Fri Oct 15 20:00:55 UTC 2021 x86_64
User: undanet (1000)
PHP: 7.4.3
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //proc/thread-self/root/proc/thread-self/root/usr/share/netplan/netplan/cli/commands/set.py
#!/usr/bin/python3
#
# Copyright (C) 2020 Canonical, Ltd.
# Author: Lukas Märdian <lukas.maerdian@canonical.com>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; version 3.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

'''netplan set command line'''

import os
import yaml
import tempfile
import re
import logging
import shutil

import netplan.cli.utils as utils
from netplan.configmanager import ConfigManager

FALLBACK_HINT = '70-netplan-set'
GLOBAL_KEYS = ['renderer', 'version']


class NetplanSet(utils.NetplanCommand):

    def __init__(self):
        super().__init__(command_id='set',
                         description='Add new setting by specifying a dotted key=value pair like ethernets.eth0.dhcp4=true',
                         leaf=True)

    def run(self):
        self.parser.add_argument('key_value', type=str,
                                 help='The nested key=value pair in dotted format. Value can be NULL to delete a key.')
        self.parser.add_argument('--origin-hint', type=str,
                                 help='Can be used to help choose a name for the overwrite YAML file. \
                                       A .yaml suffix will be appended automatically.')
        self.parser.add_argument('--root-dir', default='/',
                                 help='Overwrite configuration files in this root directory instead of /')

        self.func = self.command_set

        self.parse_args()
        self.run_command()

    def split_tree_by_hint(self, set_tree) -> (str, dict):
        network = set_tree.get('network', {})
        # A mapping of 'origin-hint' -> YAML tree (one subtree per netdef)
        subtrees = dict()
        for devtype in network:
            if devtype in GLOBAL_KEYS:
                continue  # special handling of global keys down below
            devtype_content = network.get(devtype, [])
            # Special case: removal of a whole devtype.
            # We replace the devtype null node with a dict of all defined netdefs
            # set to None.
            if devtype_content is None:
                devtype_content = {dev: None for dev in utils.netplan_get_ids_for_devtype(devtype, self.root_dir)}
                network[devtype] = devtype_content
            for netdef in devtype_content:
                hint = FALLBACK_HINT
                filename = utils.netplan_get_filename_by_id(netdef, self.root_dir)
                if filename:
                    hint = os.path.basename(filename)[:-5]  # strip prefix and .yaml
                netdef_tree = {'network': {devtype: {netdef: network.get(devtype).get(netdef)}}}
                # Merge all netdef trees which are going to be written to the same file/hint
                subtrees[hint] = self.merge(subtrees.get(hint, {}), netdef_tree)

        # Merge GLOBAL_KEYS into one of the available subtrees
        # Write to same file (if only one hint/subtree is available)
        # Write to FALLBACK_HINT if multiple hints/subtrees are available, as we do not know where it is supposed to go
        if any(network.get(key) for key in GLOBAL_KEYS):
            # Write to the same file, if we have only one file-hint or to FALLBACK_HINT otherwise
            hint = list(subtrees)[0] if len(subtrees) == 1 else FALLBACK_HINT
            for key in GLOBAL_KEYS:
                tree = {'network': {key: network.get(key)}}
                subtrees[hint] = self.merge(subtrees.get(hint, {}), tree)

        # return a list of (str:hint, dict:subtree) tuples
        return subtrees.items()

    def command_set(self):
        if self.origin_hint is not None and len(self.origin_hint) == 0:
            raise Exception('Invalid/empty origin-hint')
        split = self.key_value.split('=', 1)
        if len(split) != 2:
            raise Exception('Invalid value specified')
        key, value = split
        set_tree = self.parse_key(key, yaml.safe_load(value))

        hints = [(self.origin_hint, set_tree)]
        # Override YAML config in each individual netdef file if origin-hint is not set
        if self.origin_hint is None:
            hints = self.split_tree_by_hint(set_tree)

        for hint, subtree in hints:
            self.write_file(subtree, hint + '.yaml', self.root_dir)

    def parse_key(self, key, value):
        # The 'network.' prefix is optional for netsted keys, its always assumed to be there
        if not key.startswith('network.') and not key == 'network':
            key = 'network.' + key
        # Split at '.' but not at '\.' via negative lookbehind expression
        split = re.split(r'(?<!\\)\.', key)
        tree = {}
        i = 1
        t = tree
        for part in split:
            part = part.replace('\\.', '.')  # Unescape interface-ids, containing dots
            val = {}
            if i == len(split):
                val = value
            t = t.setdefault(part, val)
            i += 1
        return tree

    def merge(self, a, b, path=None):
        """
        Merges tree/dict 'b' into tree/dict 'a'
        """
        if path is None:
            path = []
        for key in b:
            if key in a:
                if isinstance(a[key], dict) and isinstance(b[key], dict):
                    self.merge(a[key], b[key], path + [str(key)])
                elif b[key] is None:
                    del a[key]
                else:
                    # Overwrite existing key with new key/value from 'set' command
                    a[key] = b[key]
            else:
                a[key] = b[key]
        return a

    def write_file(self, set_tree, name, rootdir='/'):
        tmproot = tempfile.TemporaryDirectory(prefix='netplan-set_')
        path = os.path.join('etc', 'netplan')
        os.makedirs(os.path.join(tmproot.name, path))

        config = {'network': {}}
        absp = os.path.join(rootdir, path, name)
        if os.path.isfile(absp):
            with open(absp, 'r') as f:
                config = yaml.safe_load(f)

        new_tree = self.merge(config, set_tree)
        stripped = ConfigManager.strip_tree(new_tree)
        logging.debug('Writing file {}: {}'.format(name, stripped))
        if 'network' in stripped and list(stripped['network'].keys()) == ['version']:
            # Clear file if only 'network: {version: 2}' is left
            os.remove(absp)
        elif 'network' in stripped:
            tmpp = os.path.join(tmproot.name, path, name)
            with open(tmpp, 'w+') as f:
                new_yaml = yaml.dump(stripped, indent=2, default_flow_style=False)
                f.write(new_yaml)
            # Validate the newly created file, by parsing it via libnetplan
            utils.netplan_parse(tmpp)
            # Valid, move it to final destination
            shutil.copy2(tmpp, absp)
            os.remove(tmpp)
        elif os.path.isfile(absp):
            # Clear file if the last/only key got removed
            os.remove(absp)
        else:
            raise Exception('Invalid input: {}'.format(set_tree))