Server IP : 85.214.239.14 / Your IP : 18.222.96.135 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/vyos/vyos/plugins/cliconf/ |
Upload File : |
# (c) 2017 Red Hat Inc. # # This file is part of Ansible # # Ansible is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Ansible is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Ansible. If not, see <http://www.gnu.org/licenses/>. # from __future__ import absolute_import, division, print_function __metaclass__ = type DOCUMENTATION = """ author: Ansible Networking Team (@ansible-network) name: vyos short_description: Use vyos cliconf to run command on VyOS platform description: - This vyos plugin provides low level abstraction apis for sending and receiving CLI commands from VyOS network devices. version_added: 1.0.0 options: config_commands: description: - Specifies a list of commands that can make configuration changes to the target device. - When `ansible_network_single_user_mode` is enabled, if a command sent to the device is present in this list, the existing cache is invalidated. version_added: 2.0.0 type: list elements: str default: [] vars: - name: ansible_vyos_config_commands """ import json import re from ansible.errors import AnsibleConnectionFailure from ansible.module_utils._text import to_text from ansible.module_utils.common._collections_compat import Mapping from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.config import ( NetworkConfig, ) from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import to_list from ansible_collections.ansible.netcommon.plugins.plugin_utils.cliconf_base import CliconfBase class Cliconf(CliconfBase): __rpc__ = CliconfBase.__rpc__ + [ "commit", "discard_changes", "get_diff", "run_commands", ] def __init__(self, *args, **kwargs): super(Cliconf, self).__init__(*args, **kwargs) self._device_info = {} def get_device_info(self): if not self._device_info: device_info = {} device_info["network_os"] = "vyos" reply = self.get("show version") data = to_text(reply, errors="surrogate_or_strict").strip() match = re.search(r"Version:\s*(.*)", data) if match: device_info["network_os_version"] = match.group(1) match = re.search(r"(?:HW|Hardware) model:\s*(\S+)", data) if match: device_info["network_os_model"] = match.group(1) reply = self.get("show host name") device_info["network_os_hostname"] = to_text( reply, errors="surrogate_or_strict" ).strip() self._device_info = device_info return self._device_info def get_config(self, flags=None, format=None): if format: option_values = self.get_option_values() if format not in option_values["format"]: raise ValueError( "'format' value %s is invalid. Valid values of format are %s" % (format, ", ".join(option_values["format"])) ) if not flags: flags = [] if format == "text": command = "show configuration" else: command = "show configuration commands" command += " ".join(to_list(flags)) command = command.strip() out = self.send_command(command) return out def edit_config(self, candidate=None, commit=True, replace=None, comment=None): resp = {} operations = self.get_device_operations() self.check_edit_config_capability(operations, candidate, commit, replace, comment) results = [] requests = [] self.send_command("configure") for cmd in to_list(candidate): if not isinstance(cmd, Mapping): cmd = {"command": cmd} results.append(self.send_command(**cmd)) requests.append(cmd["command"]) out = self.get("compare") out = to_text(out, errors="surrogate_or_strict") diff_config = out if not out.startswith("No changes") else None if diff_config: if commit: try: self.commit(comment) except AnsibleConnectionFailure as e: msg = "commit failed: %s" % e.message self.discard_changes() raise AnsibleConnectionFailure(msg) else: self.send_command("exit") else: self.discard_changes() else: self.send_command("exit") if ( to_text(self._connection.get_prompt(), errors="surrogate_or_strict") .strip() .endswith("#") ): self.discard_changes() if diff_config: resp["diff"] = diff_config resp["response"] = results resp["request"] = requests return resp def get( self, command=None, prompt=None, answer=None, sendonly=False, newline=True, output=None, check_all=False, ): if not command: raise ValueError("must provide value of command to execute") if output: raise ValueError("'output' value %s is not supported for get" % output) return self.send_command( command=command, prompt=prompt, answer=answer, sendonly=sendonly, newline=newline, check_all=check_all, ) def commit(self, comment=None): if comment: command = 'commit comment "{0}"'.format(comment) else: command = "commit" self.send_command(command) def discard_changes(self): self.send_command("exit discard") def get_diff( self, candidate=None, running=None, diff_match="line", diff_ignore_lines=None, path=None, diff_replace=None, ): diff = {} device_operations = self.get_device_operations() option_values = self.get_option_values() if candidate is None and device_operations["supports_generate_diff"]: raise ValueError("candidate configuration is required to generate diff") if diff_match not in option_values["diff_match"]: raise ValueError( "'match' value %s in invalid, valid values are %s" % (diff_match, ", ".join(option_values["diff_match"])) ) if diff_replace: raise ValueError("'replace' in diff is not supported") if diff_ignore_lines: raise ValueError("'diff_ignore_lines' in diff is not supported") if path: raise ValueError("'path' in diff is not supported") set_format = candidate.startswith("set") or candidate.startswith("delete") candidate_obj = NetworkConfig(indent=4, contents=candidate) if not set_format: config = [c.line for c in candidate_obj.items] commands = list() # this filters out less specific lines for item in config: for index, entry in enumerate(commands): if item.startswith(entry): del commands[index] break commands.append(item) candidate_commands = ["set %s" % cmd.replace(" {", "") for cmd in commands] else: candidate_commands = str(candidate).strip().split("\n") if diff_match == "none": diff["config_diff"] = list(candidate_commands) return diff running_commands = [str(c).replace("'", "") for c in running.splitlines()] updates = list() visited = set() for line in candidate_commands: item = str(line).replace("'", "") if not item.startswith("set") and not item.startswith("delete"): raise ValueError("line must start with either `set` or `delete`") elif item.startswith("set") and item not in running_commands: updates.append(line) elif item.startswith("delete"): if not running_commands: updates.append(line) else: item = re.sub(r"delete", "set", item) for entry in running_commands: if entry.startswith(item) and line not in visited: updates.append(line) visited.add(line) diff["config_diff"] = list(updates) return diff def run_commands(self, commands=None, check_rc=True): if commands is None: raise ValueError("'commands' value is required") responses = list() for cmd in to_list(commands): if not isinstance(cmd, Mapping): cmd = {"command": cmd} output = cmd.pop("output", None) if output: raise ValueError("'output' value %s is not supported for run_commands" % output) try: out = self.send_command(**cmd) except AnsibleConnectionFailure as e: if check_rc: raise out = getattr(e, "err", e) responses.append(out) return responses def get_device_operations(self): return { "supports_diff_replace": False, "supports_commit": True, "supports_rollback": False, "supports_defaults": False, "supports_onbox_diff": True, "supports_commit_comment": True, "supports_multiline_delimiter": False, "supports_diff_match": True, "supports_diff_ignore_lines": False, "supports_generate_diff": False, "supports_replace": False, } def get_option_values(self): return { "format": ["text", "set"], "diff_match": ["line", "none"], "diff_replace": [], "output": [], } def get_capabilities(self): result = super(Cliconf, self).get_capabilities() result["device_operations"] = self.get_device_operations() result.update(self.get_option_values()) return json.dumps(result) def set_cli_prompt_context(self): """ Make sure we are in the operational cli mode :return: None """ if self._connection.connected: self._update_cli_prompt_context(config_context="#", exit_command="exit discard")