Server IP : 85.214.239.14 / Your IP : 3.145.100.163 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/cisco/nxos/plugins/module_utils/network/nxos/ |
Upload File : |
# # This code is part of Ansible, but is an independent component. # # This particular file snippet, and this file snippet only, is BSD licensed. # Modules you write using this snippet, which is embedded dynamically by Ansible # still belong to the author of the module, and may assign their own license # to the complete work. # # Copyright: (c) 2017, Red Hat Inc. # # Redistribution and use in source and binary forms, with or without modification, # are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. from __future__ import absolute_import, division, print_function __metaclass__ = type import json import re from copy import deepcopy from ansible.module_utils._text import to_text from ansible.module_utils.common._collections_compat import Mapping from ansible.module_utils.connection import Connection, ConnectionError from ansible.module_utils.six import PY2, PY3 from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.config import ( CustomNetworkConfig, NetworkConfig, dumps, ) from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import ( ComplexList, to_list, ) try: import yaml HAS_YAML = True except ImportError: HAS_YAML = False _DEVICE_CONNECTION = None def get_connection(module): global _DEVICE_CONNECTION if not _DEVICE_CONNECTION: connection_proxy = Connection(module._socket_path) cap = json.loads(connection_proxy.get_capabilities()) if cap["network_api"] == "cliconf": conn = Cli(module) elif cap["network_api"] == "nxapi": conn = HttpApi(module) _DEVICE_CONNECTION = conn return _DEVICE_CONNECTION class Cli: def __init__(self, module): self._module = module self._device_configs = {} self._connection = None def _get_connection(self): if self._connection: return self._connection self._connection = Connection(self._module._socket_path) return self._connection def get_config(self, flags=None): """Retrieves the current config from the device or cache""" flags = [] if flags is None else flags cmd = "show running-config " cmd += " ".join(flags) cmd = cmd.strip() try: return self._device_configs[cmd] except KeyError: connection = self._get_connection() try: out = connection.get_config(flags=flags) except ConnectionError as exc: self._module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) cfg = to_text(out, errors="surrogate_then_replace").strip() + "\n" self._device_configs[cmd] = cfg return cfg def run_commands(self, commands, check_rc=True): """Run list of commands on remote device and return results""" connection = self._get_connection() try: out = connection.run_commands(commands, check_rc) if check_rc == "retry_json": capabilities = self.get_capabilities() network_api = capabilities.get("network_api") if network_api == "cliconf" and out: for index, resp in enumerate(out): if ( "Invalid command at" in resp or "Ambiguous command at" in resp ) and "json" in resp: if commands[index]["output"] == "json": commands[index]["output"] = "text" out = connection.run_commands(commands, check_rc) return out except ConnectionError as exc: self._module.fail_json(msg=to_text(exc)) def load_config(self, config, return_error=False, opts=None, replace=None): """Sends configuration commands to the remote device""" if opts is None: opts = {} connection = self._get_connection() responses = [] try: resp = connection.edit_config(config, replace=replace) if isinstance(resp, Mapping): resp = resp["response"] except ConnectionError as e: code = getattr(e, "code", 1) message = getattr(e, "err", e) err = to_text(message, errors="surrogate_then_replace") if opts.get("ignore_timeout") and code: responses.append(err) return responses elif code and "no graceful-restart" in err: if "ISSU/HA will be affected if Graceful Restart is disabled" in err: msg = [""] responses.extend(msg) return responses else: self._module.fail_json(msg=err) elif code: self._module.fail_json(msg=err) responses.extend(resp) return responses def get_diff( self, candidate=None, running=None, diff_match="line", diff_ignore_lines=None, path=None, diff_replace="line", ): conn = self._get_connection() try: response = conn.get_diff( candidate=candidate, running=running, diff_match=diff_match, diff_ignore_lines=diff_ignore_lines, path=path, diff_replace=diff_replace, ) except ConnectionError as exc: self._module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) return response def get_capabilities(self): """Returns platform info of the remove device""" if hasattr(self._module, "_capabilities"): return self._module._capabilities connection = self._get_connection() try: capabilities = connection.get_capabilities() except ConnectionError as exc: self._module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) self._module._capabilities = json.loads(capabilities) return self._module._capabilities def read_module_context(self, module_key): connection = self._get_connection() try: module_context = connection.read_module_context(module_key) except ConnectionError as exc: self._module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) return module_context def save_module_context(self, module_key, module_context): connection = self._get_connection() try: connection.save_module_context(module_key, module_context) except ConnectionError as exc: self._module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) return None class HttpApi: def __init__(self, module): self._module = module self._device_configs = {} self._module_context = {} self._connection_obj = None @property def _connection(self): if not self._connection_obj: self._connection_obj = Connection(self._module._socket_path) return self._connection_obj def run_commands(self, commands, check_rc=True): """Runs list of commands on remote device and returns results""" try: out = self._connection.send_request(commands) except ConnectionError as exc: if check_rc is True: raise out = to_text(exc) out = to_list(out) if not out[0]: return out for index, response in enumerate(out): if response[0] == "{": out[index] = json.loads(response) return out def get_config(self, flags=None): """Retrieves the current config from the device or cache""" flags = [] if flags is None else flags cmd = "show running-config " cmd += " ".join(flags) cmd = cmd.strip() try: return self._device_configs[cmd] except KeyError: try: out = self._connection.send_request(cmd) except ConnectionError as exc: self._module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) cfg = to_text(out).strip() self._device_configs[cmd] = cfg return cfg def get_diff( self, candidate=None, running=None, diff_match="line", diff_ignore_lines=None, path=None, diff_replace="line", ): diff = {} # prepare candidate configuration candidate_obj = NetworkConfig(indent=2) candidate_obj.load(candidate) if running and diff_match != "none" and diff_replace != "config": # running configuration running_obj = NetworkConfig(indent=2, contents=running, ignore_lines=diff_ignore_lines) configdiffobjs = candidate_obj.difference( running_obj, path=path, match=diff_match, replace=diff_replace, ) else: configdiffobjs = candidate_obj.items diff["config_diff"] = dumps(configdiffobjs, "commands") if configdiffobjs else "" return diff def load_config(self, commands, return_error=False, opts=None, replace=None): """Sends the ordered set of commands to the device""" if opts is None: opts = {} responses = [] try: resp = self.edit_config(commands, replace=replace) except ConnectionError as exc: code = getattr(exc, "code", 1) message = getattr(exc, "err", exc) err = to_text(message, errors="surrogate_then_replace") if opts.get("ignore_timeout") and code: responses.append(code) return responses elif opts.get("catch_clierror") and "400" in code: return [code, err] elif code and "no graceful-restart" in err: if "ISSU/HA will be affected if Graceful Restart is disabled" in err: msg = [""] responses.extend(msg) return responses else: self._module.fail_json(msg=err) elif code: self._module.fail_json(msg=err) responses.extend(resp) return responses def edit_config(self, candidate=None, commit=True, replace=None, comment=None): resp = list() self.check_edit_config_capability(candidate, commit, replace, comment) if replace: candidate = "config replace {0}".format(replace) responses = self._connection.send_request(candidate, output="config") for response in to_list(responses): if response != "{}": resp.append(response) if not resp: resp = [""] return resp def get_capabilities(self): """Returns platform info of the remove device""" try: capabilities = self._connection.get_capabilities() except ConnectionError as exc: self._module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) return json.loads(capabilities) def check_edit_config_capability(self, candidate=None, commit=True, replace=None, comment=None): operations = self._connection.get_device_operations() if not candidate and not replace: raise ValueError("must provide a candidate or replace to load configuration") if commit not in (True, False): raise ValueError("'commit' must be a bool, got %s" % commit) if replace and not operations.get("supports_replace"): raise ValueError("configuration replace is not supported") if comment and not operations.get("supports_commit_comment", False): raise ValueError("commit comment is not supported") def read_module_context(self, module_key): try: module_context = self._connection.read_module_context(module_key) except ConnectionError as exc: self._module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) return module_context def save_module_context(self, module_key, module_context): try: self._connection.save_module_context(module_key, module_context) except ConnectionError as exc: self._module.fail_json(msg=to_text(exc, errors="surrogate_then_replace")) return None class NxosCmdRef: """NXOS Command Reference utilities. The NxosCmdRef class takes a yaml-formatted string of nxos module commands and converts it into dict-formatted database of getters/setters/defaults and associated common and platform-specific values. The utility methods add additional data such as existing states, playbook states, and proposed cli. The utilities also abstract away platform differences such as different defaults and different command syntax. Callers must provide a yaml formatted string that defines each command and its properties; e.g. BFD global: --- _template: # _template holds common settings for all commands # Enable feature bfd if disabled feature: bfd # Common getter syntax for BFD commands get_command: show run bfd all | incl '^(no )*bfd' interval: kind: dict getval: bfd interval (?P<tx>\\d+) min_rx (?P<min_rx>\\d+) multiplier (?P<multiplier>\\d+) setval: bfd interval {tx} min_rx {min_rx} multiplier {multiplier} default: tx: 50 min_rx: 50 multiplier: 3 N3K: # Platform overrides default: tx: 250 min_rx: 250 multiplier: 3 """ def __init__(self, module, cmd_ref_str, ref_only=False): """Initialize cmd_ref from yaml data.""" self._module = module self._check_imports() self._yaml_load(cmd_ref_str) self.cache_existing = None self.present_states = ["present", "merged", "replaced"] self.absent_states = ["absent", "deleted"] ref = self._ref # Create a list of supported commands based on ref keys ref["commands"] = sorted([k for k in ref if not k.startswith("_")]) ref["_proposed"] = [] ref["_context"] = [] ref["_resource_key"] = None if not ref_only: ref["_state"] = module.params.get("state", "present") self.feature_enable() self.get_platform_defaults() self.normalize_defaults() def __getitem__(self, key=None): if key is None: return self._ref return self._ref[key] def _check_imports(self): module = self._module msg = nxosCmdRef_import_check() if msg: module.fail_json(msg=msg) def _yaml_load(self, cmd_ref_str): if PY2: self._ref = yaml.load(cmd_ref_str) elif PY3: self._ref = yaml.load(cmd_ref_str, Loader=yaml.FullLoader) def feature_enable(self): """Add 'feature <foo>' to _proposed if ref includes a 'feature' key.""" ref = self._ref feature = ref["_template"].get("feature") if feature: show_cmd = "show run | incl 'feature {0}'".format(feature) output = self.execute_show_command(show_cmd, "text") if not output or "CLI command error" in output: msg = "** 'feature {0}' is not enabled. Module will auto-enable feature {0} ** ".format( feature, ) self._module.warn(msg) ref["_proposed"].append("feature {0}".format(feature)) ref["_cli_is_feature_disabled"] = ref["_proposed"] def get_platform_shortname(self): """Query device for platform type, normalize to a shortname/nickname. Returns platform shortname (e.g. 'N3K-3058P' returns 'N3K') or None. """ # TBD: add this method logic to get_capabilities() after those methods # are made consistent across transports platform_info = self.execute_show_command("show inventory", "json") if not platform_info or not isinstance(platform_info, dict): return None inventory_table = platform_info["TABLE_inv"]["ROW_inv"] for info in inventory_table: if "Chassis" in info["name"]: network_os_platform = info["productid"] break else: return None # Supported Platforms: N3K,N5K,N6K,N7K,N9K,N3K-F,N9K-F m = re.match("(?P<short>N[35679][K57])-(?P<N35>C35)*", network_os_platform) if not m: return None shortname = m.group("short") # Normalize if m.groupdict().get("N35"): shortname = "N35" elif re.match("N77", shortname): shortname = "N7K" elif re.match(r"N3K|N9K", shortname): for info in inventory_table: if "-R" in info["productid"]: # Fretta Platform shortname += "-F" break return shortname def get_platform_defaults(self): """Update ref with platform specific defaults""" plat = self.get_platform_shortname() if not plat: return ref = self._ref ref["_platform_shortname"] = plat # Remove excluded commands (no platform support for command) for k in ref["commands"]: if plat in ref[k].get("_exclude", ""): ref["commands"].remove(k) # Update platform-specific settings for each item in ref plat_spec_cmds = [k for k in ref["commands"] if plat in ref[k]] for k in plat_spec_cmds: for plat_key in ref[k][plat]: ref[k][plat_key] = ref[k][plat][plat_key] def normalize_defaults(self): """Update ref defaults with normalized data""" ref = self._ref for k in ref["commands"]: if "default" in ref[k] and ref[k]["default"]: kind = ref[k]["kind"] if "int" == kind: ref[k]["default"] = int(ref[k]["default"]) elif "list" == kind: ref[k]["default"] = [str(i) for i in ref[k]["default"]] elif "dict" == kind: for key, v in ref[k]["default"].items(): if v: v = str(v) ref[k]["default"][key] = v def execute_show_command(self, command, format): """Generic show command helper. Warning: 'CLI command error' exceptions are caught, must be handled by caller. Return device output as a newline-separated string or None. """ cmds = [{"command": command, "output": format}] output = None try: output = run_commands(self._module, cmds) if output: output = output[0] except ConnectionError as exc: if "CLI command error" in repr(exc): # CLI may be feature disabled output = repr(exc) else: raise return output def pattern_match_existing(self, output, k): """Pattern matching helper for `get_existing`. `k` is the command name string. Use the pattern from cmd_ref to find a matching string in the output. Return regex match object or None. """ ref = self._ref pattern = re.compile(ref[k]["getval"]) multiple = "multiple" in ref[k].keys() match_lines = [re.search(pattern, line) for line in output] if "dict" == ref[k]["kind"]: match = [m for m in match_lines if m] if not match: return None if len(match) > 1 and not multiple: raise ValueError("get_existing: multiple matches found for property {0}".format(k)) else: match = [m.groups() for m in match_lines if m] if not match: return None if len(match) > 1 and not multiple: raise ValueError("get_existing: multiple matches found for property {0}".format(k)) for item in match: index = match.index(item) match[index] = list(item) # tuple to list # Handle config strings that nvgen with the 'no' prefix. # Example match behavior: # When pattern is: '(no )*foo *(\S+)*$' AND # When output is: 'no foo' -> match: ['no ', None] # When output is: 'foo 50' -> match: [None, '50'] if None is match[index][0]: match[index].pop(0) elif "no" in match[index][0]: match[index].pop(0) if not match: return None return match def set_context(self, context=None): """Update ref with command context.""" if context is None: context = [] ref = self._ref # Process any additional context that this propoerty might require. # 1) Global context from NxosCmdRef _template. # 2) Context passed in using context arg. ref["_context"] = ref["_template"].get("context", []) for cmd in context: ref["_context"].append(cmd) # Last key in context is the resource key ref["_resource_key"] = context[-1] if context else ref["_resource_key"] def get_existing(self, cache_output=None): """Update ref with existing command states from the device. Store these states in each command's 'existing' key. """ ref = self._ref if ref.get("_cli_is_feature_disabled"): # Add context to proposed if state is present if ref["_state"] in self.present_states: [ref["_proposed"].append(ctx) for ctx in ref["_context"]] return show_cmd = ref["_template"]["get_command"] if cache_output: output = cache_output else: output = self.execute_show_command(show_cmd, "text") or [] self.cache_existing = output # Add additional command context if needed. if ref["_context"]: output = CustomNetworkConfig(indent=2, contents=output) output = output.get_section(ref["_context"]) if not output: # Add context to proposed if state is present if ref["_state"] in self.present_states: [ref["_proposed"].append(ctx) for ctx in ref["_context"]] return # We need to remove the last item in context for state absent case. if ref["_state"] in self.absent_states and ref["_context"]: if ref["_resource_key"] and ref["_resource_key"] == ref["_context"][-1]: if ref["_context"][-1] in output: ref["_context"][-1] = "no " + ref["_context"][-1] else: del ref["_context"][-1] return # Walk each cmd in ref, use cmd pattern to discover existing cmds output = output.split("\n") for k in ref["commands"]: match = self.pattern_match_existing(output, k) if not match: continue ref[k]["existing"] = {} for item in match: index = match.index(item) kind = ref[k]["kind"] if "int" == kind: ref[k]["existing"][index] = int(item[0]) elif "list" == kind: ref[k]["existing"][index] = [str(i) for i in item[0]] elif "dict" == kind: # The getval pattern should contain regex named group keys that # match up with the setval named placeholder keys; e.g. # getval: my-cmd (?P<foo>\d+) bar (?P<baz>\d+) # setval: my-cmd {foo} bar {baz} ref[k]["existing"][index] = {} for key in item.groupdict().keys(): ref[k]["existing"][index][key] = str(item.group(key)) elif "str" == kind: ref[k]["existing"][index] = item[0] else: raise ValueError( "get_existing: unknown 'kind' value specified for key '{0}'".format(k), ) def get_playvals(self): """Update ref with values from the playbook. Store these values in each command's 'playval' key. """ ref = self._ref module = self._module params = {} if module.params.get("config"): # Resource module builder packs playvals under 'config' key param_data = module.params.get("config") params["global"] = param_data for key in param_data.keys(): if isinstance(param_data[key], list): params[key] = param_data[key] else: params["global"] = module.params for k in ref.keys(): for level in params.keys(): if isinstance(params[level], dict): params[level] = [params[level]] for item in params[level]: if k in item and item[k] is not None: if not ref[k].get("playval"): ref[k]["playval"] = {} playval = item[k] index = params[level].index(item) # Normalize each value if "int" == ref[k]["kind"]: playval = int(playval) elif "list" == ref[k]["kind"]: playval = [str(i) for i in playval] elif "dict" == ref[k]["kind"]: for key, v in playval.items(): playval[key] = str(v) ref[k]["playval"][index] = playval def build_cmd_set(self, playval, existing, k): """Helper function to create list of commands to configure device Return a list of commands """ ref = self._ref proposed = ref["_proposed"] cmd = None kind = ref[k]["kind"] if "int" == kind: cmd = ref[k]["setval"].format(playval) elif "list" == kind: cmd = ref[k]["setval"].format(*(playval)) elif "dict" == kind: # The setval pattern should contain placeholder keys that # match up with the getval regex named group keys; e.g. # getval: my-cmd (?P<foo>\d+) bar (?P<baz>\d+) # setval: my-cmd {foo} bar {baz} if ref[k]["setval"].startswith("path"): tmplt = "path {name}" if "depth" in playval: tmplt += " depth {depth}" if "query_condition" in playval: tmplt += " query-condition {query_condition}" if "filter_condition" in playval: tmplt += " filter-condition {filter_condition}" cmd = tmplt.format(**playval) else: cmd = ref[k]["setval"].format(**playval) elif "str" == kind: if "deleted" in str(playval): if existing: cmd = "no " + ref[k]["setval"].format(existing) else: cmd = ref[k]["setval"].format(playval) else: raise ValueError("get_proposed: unknown 'kind' value specified for key '{0}'".format(k)) if cmd: if ref["_state"] in self.absent_states and not re.search(r"^no", cmd): cmd = "no " + cmd # Commands may require parent commands for proper context. # Global _template context is replaced by parameter context [proposed.append(ctx) for ctx in ref["_context"]] [proposed.append(ctx) for ctx in ref[k].get("context", [])] proposed.append(cmd) def get_proposed(self): """Compare playbook values against existing states and create a list of proposed commands. Return a list of raw cli command strings. """ ref = self._ref # '_proposed' may be empty list or contain initializations; e.g. ['feature foo'] proposed = ref["_proposed"] if ref["_context"] and ref["_context"][-1].startswith("no"): [proposed.append(ctx) for ctx in ref["_context"]] return proposed # Create a list of commands that have playbook values play_keys = [k for k in ref["commands"] if "playval" in ref[k]] def compare(playval, existing): if ref["_state"] in self.present_states: if existing is None: return False elif str(playval) == str(existing): return True elif isinstance(existing, dict) and playval in existing.values(): return True if ref["_state"] in self.absent_states: if isinstance(existing, dict) and all(x is None for x in existing.values()): existing = None if existing is None or playval not in existing.values(): return True return False # Compare against current state for k in play_keys: playval = ref[k]["playval"] # Create playval copy to avoid RuntimeError # dictionary changed size during iteration error playval_copy = deepcopy(playval) existing = ref[k].get("existing", ref[k]["default"]) multiple = "multiple" in ref[k].keys() # Multiple Instances: if isinstance(existing, dict) and multiple: for ekey, evalue in existing.items(): if isinstance(evalue, dict): # Remove values set to string 'None' from dvalue evalue = dict((k, v) for k, v in evalue.items() if v != "None") for pkey, pvalue in playval.items(): if compare(pvalue, evalue): if playval_copy.get(pkey): del playval_copy[pkey] if not playval_copy: continue # Single Instance: else: for pkey, pval in playval.items(): if compare(pval, existing): if playval_copy.get(pkey): del playval_copy[pkey] if not playval_copy: continue playval = playval_copy # Multiple Instances: if isinstance(existing, dict): for dkey, dvalue in existing.items(): for pval in playval.values(): self.build_cmd_set(pval, dvalue, k) # Single Instance: else: for pval in playval.values(): self.build_cmd_set(pval, existing, k) # Remove any duplicate commands before returning. # pylint: disable=unnecessary-lambda cmds = sorted(set(proposed), key=lambda x: proposed.index(x)) return cmds def nxosCmdRef_import_check(): """Return import error messages or empty string""" msg = "" if not HAS_YAML: msg += "Mandatory python library 'PyYAML' is not present, try 'pip install PyYAML'\n" return msg def is_json(cmd): return to_text(cmd).endswith("| json") def is_text(cmd): return not is_json(cmd) def to_command(module, commands): transform = ComplexList( dict( command=dict(key=True), output=dict(type="str", default="text"), prompt=dict(type="list"), answer=dict(type="list"), newline=dict(type="bool", default=True), sendonly=dict(type="bool", default=False), check_all=dict(type="bool", default=False), ), module, ) commands = transform(to_list(commands)) for item in commands: if is_json(item["command"]): item["output"] = "json" return commands def get_config(module, flags=None): flags = [] if flags is None else flags conn = get_connection(module) return conn.get_config(flags=flags) def run_commands(module, commands, check_rc=True): conn = get_connection(module) return conn.run_commands(to_command(module, commands), check_rc) def load_config(module, config, return_error=False, opts=None, replace=None): conn = get_connection(module) return conn.load_config(config, return_error, opts, replace=replace) def get_capabilities(module): conn = get_connection(module) return conn.get_capabilities() def get_diff( self, candidate=None, running=None, diff_match="line", diff_ignore_lines=None, path=None, diff_replace="line", ): conn = self.get_connection() return conn.get_diff( candidate=candidate, running=running, diff_match=diff_match, diff_ignore_lines=diff_ignore_lines, path=path, diff_replace=diff_replace, ) def normalize_interface(name): """Return the normalized interface name""" if not name: return def _get_number(name): digits = "" for char in name: if char.isdigit() or char in "/.": digits += char return digits if name.lower().startswith("et"): if_type = "Ethernet" elif name.lower().startswith("vl"): if_type = "Vlan" elif name.lower().startswith("lo"): if_type = "loopback" elif name.lower().startswith("po"): if_type = "port-channel" elif name.lower().startswith("nv"): if_type = "nve" else: if_type = None number_list = name.split(" ") if len(number_list) == 2: number = number_list[-1].strip() else: number = _get_number(name) if if_type: proper_interface = if_type + number else: proper_interface = name return proper_interface def get_interface_type(interface): """Gets the type of interface""" if interface.upper().startswith("ET"): return "ethernet" elif interface.upper().startswith("VL"): return "svi" elif interface.upper().startswith("LO"): return "loopback" elif interface.upper().startswith("MG"): return "management" elif interface.upper().startswith("MA"): return "management" elif interface.upper().startswith("PO"): return "portchannel" elif interface.upper().startswith("NV"): return "nve" else: return "unknown" def default_intf_enabled(name="", sysdefs=None, mode=None): """Get device/version/interface-specific default 'enabled' state. L3: - Most L3 intfs default to 'shutdown'. Loopbacks default to 'no shutdown'. - Some legacy platforms default L3 intfs to 'no shutdown'. L2: - User-System-Default 'system default switchport shutdown' defines the enabled state for L2 intf's. USD defaults may be different on some platforms. - An intf may be explicitly defined as L2 with 'switchport' or it may be implicitly defined as L2 when USD 'system default switchport' is defined. """ if not name: return None if sysdefs is None: sysdefs = {} default = False if re.search("port-channel|loopback", name): default = True elif re.search("Vlan", name): default = False else: if mode is None: # intf 'switchport' cli is not present so use the user-system-default mode = sysdefs.get("mode") if mode == "layer3": default = sysdefs.get("L3_enabled") elif mode == "layer2": default = sysdefs.get("L2_enabled") return default def read_module_context(module): conn = get_connection(module) return conn.read_module_context(module._name) def save_module_context(module, module_context): conn = get_connection(module) return conn.save_module_context(module._name, module_context)