Server IP : 85.214.239.14 / Your IP : 18.191.174.125 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/2/cwd/lib/python3/dist-packages/ansible_collections/ansible/utils/plugins/action/ |
Upload File : |
# -*- coding: utf-8 -*- # Copyright 2020 Red Hat # GNU General Public License v3.0+ # (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) """ The action plugin file for cli_parse """ from __future__ import absolute_import, division, print_function __metaclass__ = type import json from importlib import import_module from ansible.errors import AnsibleActionFail from ansible.module_utils._text import to_native, to_text from ansible.module_utils.connection import Connection from ansible.module_utils.connection import ConnectionError as AnsibleConnectionError from ansible.plugins.action import ActionBase from ansible_collections.ansible.utils.plugins.module_utils.common.argspec_validate import ( check_argspec, ) from ansible_collections.ansible.utils.plugins.modules.cli_parse import DOCUMENTATION ARGSPEC_CONDITIONALS = { "argument_spec": {"parser": {"mutually_exclusive": [["command", "template_path"]]}}, "required_one_of": [["command", "text"]], "mutually_exclusive": [["command", "text"]], } class ActionModule(ActionBase): """action module""" PARSER_CLS_NAME = "CliParser" def __init__(self, *args, **kwargs): super(ActionModule, self).__init__(*args, **kwargs) self._playhost = None self._parser_name = None self._result = {} self._task_vars = None def _debug(self, msg): """Output text using ansible's display :param msg: The message :type msg: str """ msg = "<{phost}> [cli_parse] {msg}".format(phost=self._playhost, msg=msg) self._display.vvvv(msg) def _fail_json(self, msg): """Replace the AnsibleModule fai_json here :param msg: The message for the failure :type msg: str """ msg = msg.replace("(basic.py)", self._task.action) raise AnsibleActionFail(msg) def _extended_check_argspec(self): """Check additional requirements for the argspec that cannot be covered using stnd techniques """ errors = [] requested_parser = self._task.args.get("parser").get("name") if len(requested_parser.split(".")) != 3: msg = "Parser name should be provided as a full name including collection" errors.append(msg) if self._task.args.get("text") and requested_parser not in [ "ansible.utils.json", "ansible.utils.xml", ]: if not ( self._task.args.get("parser").get("command") or self._task.args.get("parser").get("template_path") ): msg = "Either parser/command or parser/template_path needs to be provided when parsing text." errors.append(msg) if errors: self._result["failed"] = True self._result["msg"] = " ".join(errors) def _load_parser(self, task_vars): """Load a parser from the fs :param task_vars: The vars provided when the task was run :type task_vars: dict :return: An instance of class CliParser :rtype: CliParser """ requested_parser = self._task.args.get("parser").get("name") cref = dict(zip(["corg", "cname", "plugin"], requested_parser.split("."))) if cref["cname"] == "netcommon" and cref["plugin"] in [ "json", "textfsm", "ttp", "xml", ]: cref["cname"] = "utils" msg = ( "Use 'ansible.utils.{plugin}' for parser name instead of '{requested_parser}'." " This feature will be removed from 'ansible.netcommon' collection in a release" " after 2022-11-01".format(plugin=cref["plugin"], requested_parser=requested_parser) ) self._display.warning(msg) parserlib = "ansible_collections.{corg}.{cname}.plugins.sub_plugins.cli_parser.{plugin}_parser".format( **cref ) try: parsercls = getattr(import_module(parserlib), self.PARSER_CLS_NAME) parser = parsercls( task_args=self._task.args, task_vars=task_vars, debug=self._debug, ) return parser except Exception as exc: # TODO: The condition is added to support old sub-plugin structure. # Remove the if condition after ansible.netcommon.cli_parse module is removed # from ansible.netcommon collection if cref["cname"] == "netcommon" and cref["plugin"] in [ "native", "content_templates", "ntc", "pyats", ]: parserlib = ( "ansible_collections.{corg}.{cname}.plugins.cli_parsers.{plugin}_parser".format( **cref ) ) try: parsercls = getattr(import_module(parserlib), self.PARSER_CLS_NAME) parser = parsercls( task_args=self._task.args, task_vars=task_vars, debug=self._debug, ) return parser except Exception as exc: self._result["failed"] = True self._result["msg"] = "Error loading parser: {err}".format(err=to_native(exc)) return None self._result["failed"] = True self._result["msg"] = "Error loading parser: {err}".format(err=to_native(exc)) return None def _set_parser_command(self): """Set the /parser/command in the task args based on /command if needed""" if self._task.args.get("command"): if not self._task.args.get("parser").get("command"): self._task.args.get("parser")["command"] = self._task.args.get("command") def _set_text(self): """Set the /text in the task_args based on the command run""" if self._result.get("stdout"): self._task.args["text"] = self._result["stdout"] def _os_from_task_vars(self): """Extract an os str from the task's vars :return: A short OS name :rtype: str """ os_vars = ["ansible_distribution", "ansible_network_os"] oper_sys = "" for hvar in os_vars: if self._task_vars.get(hvar): if hvar == "ansible_network_os": oper_sys = self._task_vars.get(hvar, "").split(".")[-1] self._debug( "OS set to {os}, derived from ansible_network_os".format( os=oper_sys.lower(), ), ) else: oper_sys = self._task_vars.get(hvar) self._debug("OS set to {os}, using {key}".format(os=oper_sys.lower(), key=hvar)) return oper_sys.lower() def _update_template_path(self, template_extension): """Update the template_path in the task args If not provided, generate template name using os and command :param template_extension: The parser specific template extension :type template extension: str """ if not self._task.args.get("parser").get("template_path"): if self._task.args.get("parser").get("os"): oper_sys = self._task.args.get("parser").get("os") else: oper_sys = self._os_from_task_vars() cmd_as_fname = self._task.args.get("parser").get("command").replace(" ", "_") fname = "{os}_{cmd}.{ext}".format(os=oper_sys, cmd=cmd_as_fname, ext=template_extension) source = self._find_needle("templates", fname) self._debug("template_path in task args updated to {source}".format(source=source)) self._task.args["parser"]["template_path"] = source def _get_template_contents(self): """Retrieve the contents of the parser template :return: The parser's contents :rtype: str """ template_contents = None template_path = self._task.args.get("parser").get("template_path") if template_path: try: with open(template_path, "rb") as file_handler: try: template_contents = to_text( file_handler.read(), errors="surrogate_or_strict", ) except UnicodeError: raise AnsibleActionFail("Template source files must be utf-8 encoded") except FileNotFoundError as exc: raise AnsibleActionFail( "Failed to open template '{tpath}'. Error: {err}".format( tpath=template_path, err=to_native(exc), ), ) return template_contents def _prune_result(self): """In the case of an error, remove stdout and stdout_lines this allows for easier visibility of the error message. In the case of an actual command error, it will be thrown in the module """ self._result.pop("stdout", None) self._result.pop("stdout_lines", None) def _run_command(self): """Run a command on the host If socket_path exists, assume it's a network device else, run a low level command """ command = self._task.args.get("command") if command: socket_path = self._connection.socket_path if socket_path: connection = Connection(socket_path) try: response = connection.get(command=command) self._result["stdout"] = response self._result["stdout_lines"] = response.splitlines() except AnsibleConnectionError as exc: self._result["failed"] = True self._result["msg"] = [to_text(exc)] else: result = self._low_level_execute_command(cmd=command) if result["rc"]: self._result["failed"] = True self._result["msg"] = result["stderr"] self._result["stdout"] = result["stdout"] self._result["stdout_lines"] = result["stdout_lines"] def run(self, tmp=None, task_vars=None): """The std execution entry pt for an action plugin :param tmp: no longer used :type tmp: none :param task_vars: The vars provided when the task is run :type task_vars: dict :return: The results from the parser :rtype: dict """ valid, argspec_result, updated_params = check_argspec( DOCUMENTATION, "cli_parse module", schema_conditionals=ARGSPEC_CONDITIONALS, **self._task.args ) if not valid: return argspec_result self._extended_check_argspec() if self._result.get("failed"): return self._result self._task_vars = task_vars self._playhost = task_vars.get("inventory_hostname") self._parser_name = self._task.args.get("parser").get("name") self._run_command() if self._result.get("failed"): return self._result self._set_parser_command() self._set_text() parser = self._load_parser(task_vars) if self._result.get("failed"): self._prune_result() return self._result # Not all parsers use a template, in the case a parser provides # an extension, provide it the template path if getattr(parser, "DEFAULT_TEMPLATE_EXTENSION", False): self._update_template_path(parser.DEFAULT_TEMPLATE_EXTENSION) # Not all parsers require the template contents # when true, provide the template contents if getattr(parser, "PROVIDE_TEMPLATE_CONTENTS", False) is True: template_contents = self._get_template_contents() else: template_contents = None try: result = parser.parse(template_contents=template_contents) # ensure the response returned to the controller # contains only native types, nothing unique to the parser result = json.loads(json.dumps(result)) except Exception as exc: raise AnsibleActionFail( "Unhandled exception from parser '{parser}'. Error: {err}".format( parser=self._parser_name, err=to_native(exc), ), ) if result.get("errors"): self._prune_result() self._result.update({"failed": True, "msg": " ".join(result["errors"])}) else: self._result["parsed"] = result["parsed"] set_fact = self._task.args.get("set_fact") if set_fact: self._result["ansible_facts"] = {set_fact: result["parsed"]} return self._result