Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.191.174.125
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/2/cwd/lib/python3/dist-packages/ansible_collections/ansible/utils/plugins/action/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/2/cwd/lib/python3/dist-packages/ansible_collections/ansible/utils/plugins/action/cli_parse.py
# -*- coding: utf-8 -*-
# Copyright 2020 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

"""
The action plugin file for cli_parse
"""
from __future__ import absolute_import, division, print_function


__metaclass__ = type

import json

from importlib import import_module

from ansible.errors import AnsibleActionFail
from ansible.module_utils._text import to_native, to_text
from ansible.module_utils.connection import Connection
from ansible.module_utils.connection import ConnectionError as AnsibleConnectionError
from ansible.plugins.action import ActionBase

from ansible_collections.ansible.utils.plugins.module_utils.common.argspec_validate import (
    check_argspec,
)
from ansible_collections.ansible.utils.plugins.modules.cli_parse import DOCUMENTATION


ARGSPEC_CONDITIONALS = {
    "argument_spec": {"parser": {"mutually_exclusive": [["command", "template_path"]]}},
    "required_one_of": [["command", "text"]],
    "mutually_exclusive": [["command", "text"]],
}


class ActionModule(ActionBase):
    """action module"""

    PARSER_CLS_NAME = "CliParser"

    def __init__(self, *args, **kwargs):
        super(ActionModule, self).__init__(*args, **kwargs)
        self._playhost = None
        self._parser_name = None
        self._result = {}
        self._task_vars = None

    def _debug(self, msg):
        """Output text using ansible's display

        :param msg: The message
        :type msg: str
        """
        msg = "<{phost}> [cli_parse] {msg}".format(phost=self._playhost, msg=msg)
        self._display.vvvv(msg)

    def _fail_json(self, msg):
        """Replace the AnsibleModule fai_json here

        :param msg: The message for the failure
        :type msg: str
        """
        msg = msg.replace("(basic.py)", self._task.action)
        raise AnsibleActionFail(msg)

    def _extended_check_argspec(self):
        """Check additional requirements for the argspec
        that cannot be covered using stnd techniques
        """
        errors = []
        requested_parser = self._task.args.get("parser").get("name")
        if len(requested_parser.split(".")) != 3:
            msg = "Parser name should be provided as a full name including collection"
            errors.append(msg)

        if self._task.args.get("text") and requested_parser not in [
            "ansible.utils.json",
            "ansible.utils.xml",
        ]:
            if not (
                self._task.args.get("parser").get("command")
                or self._task.args.get("parser").get("template_path")
            ):
                msg = "Either parser/command or parser/template_path needs to be provided when parsing text."
                errors.append(msg)
        if errors:
            self._result["failed"] = True
            self._result["msg"] = " ".join(errors)

    def _load_parser(self, task_vars):
        """Load a parser from the fs

        :param task_vars: The vars provided when the task was run
        :type task_vars: dict
        :return: An instance of class CliParser
        :rtype: CliParser
        """
        requested_parser = self._task.args.get("parser").get("name")
        cref = dict(zip(["corg", "cname", "plugin"], requested_parser.split(".")))
        if cref["cname"] == "netcommon" and cref["plugin"] in [
            "json",
            "textfsm",
            "ttp",
            "xml",
        ]:
            cref["cname"] = "utils"
            msg = (
                "Use 'ansible.utils.{plugin}' for parser name instead of '{requested_parser}'."
                " This feature will be removed from 'ansible.netcommon' collection in a release"
                " after 2022-11-01".format(plugin=cref["plugin"], requested_parser=requested_parser)
            )
            self._display.warning(msg)

        parserlib = "ansible_collections.{corg}.{cname}.plugins.sub_plugins.cli_parser.{plugin}_parser".format(
            **cref
        )
        try:
            parsercls = getattr(import_module(parserlib), self.PARSER_CLS_NAME)
            parser = parsercls(
                task_args=self._task.args,
                task_vars=task_vars,
                debug=self._debug,
            )
            return parser
        except Exception as exc:
            # TODO: The condition is added to support old sub-plugin structure.
            # Remove the if condition after ansible.netcommon.cli_parse module is removed
            # from ansible.netcommon collection
            if cref["cname"] == "netcommon" and cref["plugin"] in [
                "native",
                "content_templates",
                "ntc",
                "pyats",
            ]:
                parserlib = (
                    "ansible_collections.{corg}.{cname}.plugins.cli_parsers.{plugin}_parser".format(
                        **cref
                    )
                )
                try:
                    parsercls = getattr(import_module(parserlib), self.PARSER_CLS_NAME)
                    parser = parsercls(
                        task_args=self._task.args,
                        task_vars=task_vars,
                        debug=self._debug,
                    )
                    return parser
                except Exception as exc:
                    self._result["failed"] = True
                    self._result["msg"] = "Error loading parser: {err}".format(err=to_native(exc))
                    return None

            self._result["failed"] = True
            self._result["msg"] = "Error loading parser: {err}".format(err=to_native(exc))
            return None

    def _set_parser_command(self):
        """Set the /parser/command in the task args based on /command if needed"""
        if self._task.args.get("command"):
            if not self._task.args.get("parser").get("command"):
                self._task.args.get("parser")["command"] = self._task.args.get("command")

    def _set_text(self):
        """Set the /text in the task_args based on the command run"""
        if self._result.get("stdout"):
            self._task.args["text"] = self._result["stdout"]

    def _os_from_task_vars(self):
        """Extract an os str from the task's vars

        :return: A short OS name
        :rtype: str
        """
        os_vars = ["ansible_distribution", "ansible_network_os"]
        oper_sys = ""
        for hvar in os_vars:
            if self._task_vars.get(hvar):
                if hvar == "ansible_network_os":
                    oper_sys = self._task_vars.get(hvar, "").split(".")[-1]
                    self._debug(
                        "OS set to {os}, derived from ansible_network_os".format(
                            os=oper_sys.lower(),
                        ),
                    )
                else:
                    oper_sys = self._task_vars.get(hvar)
                    self._debug("OS set to {os}, using {key}".format(os=oper_sys.lower(), key=hvar))
        return oper_sys.lower()

    def _update_template_path(self, template_extension):
        """Update the template_path in the task args
        If not provided, generate template name using os and command

        :param template_extension: The parser specific template extension
        :type template extension: str
        """
        if not self._task.args.get("parser").get("template_path"):
            if self._task.args.get("parser").get("os"):
                oper_sys = self._task.args.get("parser").get("os")
            else:
                oper_sys = self._os_from_task_vars()
            cmd_as_fname = self._task.args.get("parser").get("command").replace(" ", "_")
            fname = "{os}_{cmd}.{ext}".format(os=oper_sys, cmd=cmd_as_fname, ext=template_extension)
            source = self._find_needle("templates", fname)
            self._debug("template_path in task args updated to {source}".format(source=source))
            self._task.args["parser"]["template_path"] = source

    def _get_template_contents(self):
        """Retrieve the contents of the parser template

        :return: The parser's contents
        :rtype: str
        """
        template_contents = None
        template_path = self._task.args.get("parser").get("template_path")
        if template_path:
            try:
                with open(template_path, "rb") as file_handler:
                    try:
                        template_contents = to_text(
                            file_handler.read(),
                            errors="surrogate_or_strict",
                        )
                    except UnicodeError:
                        raise AnsibleActionFail("Template source files must be utf-8 encoded")
            except FileNotFoundError as exc:
                raise AnsibleActionFail(
                    "Failed to open template '{tpath}'. Error: {err}".format(
                        tpath=template_path,
                        err=to_native(exc),
                    ),
                )
        return template_contents

    def _prune_result(self):
        """In the case of an error, remove stdout and stdout_lines
        this allows for easier visibility of the error message.
        In the case of an actual command error, it will be thrown
        in the module
        """
        self._result.pop("stdout", None)
        self._result.pop("stdout_lines", None)

    def _run_command(self):
        """Run a command on the host
        If socket_path exists, assume it's a network device
        else, run a low level command
        """
        command = self._task.args.get("command")
        if command:
            socket_path = self._connection.socket_path
            if socket_path:
                connection = Connection(socket_path)
                try:
                    response = connection.get(command=command)
                    self._result["stdout"] = response
                    self._result["stdout_lines"] = response.splitlines()
                except AnsibleConnectionError as exc:
                    self._result["failed"] = True
                    self._result["msg"] = [to_text(exc)]
            else:
                result = self._low_level_execute_command(cmd=command)
                if result["rc"]:
                    self._result["failed"] = True
                    self._result["msg"] = result["stderr"]
                self._result["stdout"] = result["stdout"]
                self._result["stdout_lines"] = result["stdout_lines"]

    def run(self, tmp=None, task_vars=None):
        """The std execution entry pt for an action plugin

        :param tmp: no longer used
        :type tmp: none
        :param task_vars: The vars provided when the task is run
        :type task_vars: dict
        :return: The results from the parser
        :rtype: dict
        """
        valid, argspec_result, updated_params = check_argspec(
            DOCUMENTATION,
            "cli_parse module",
            schema_conditionals=ARGSPEC_CONDITIONALS,
            **self._task.args
        )
        if not valid:
            return argspec_result

        self._extended_check_argspec()
        if self._result.get("failed"):
            return self._result

        self._task_vars = task_vars
        self._playhost = task_vars.get("inventory_hostname")
        self._parser_name = self._task.args.get("parser").get("name")

        self._run_command()
        if self._result.get("failed"):
            return self._result

        self._set_parser_command()
        self._set_text()

        parser = self._load_parser(task_vars)
        if self._result.get("failed"):
            self._prune_result()
            return self._result

        # Not all parsers use a template, in the case a parser provides
        # an extension, provide it the template path
        if getattr(parser, "DEFAULT_TEMPLATE_EXTENSION", False):
            self._update_template_path(parser.DEFAULT_TEMPLATE_EXTENSION)

        # Not all parsers require the template contents
        # when true, provide the template contents
        if getattr(parser, "PROVIDE_TEMPLATE_CONTENTS", False) is True:
            template_contents = self._get_template_contents()
        else:
            template_contents = None

        try:
            result = parser.parse(template_contents=template_contents)
            # ensure the response returned to the controller
            # contains only native types, nothing unique to the parser
            result = json.loads(json.dumps(result))
        except Exception as exc:
            raise AnsibleActionFail(
                "Unhandled exception from parser '{parser}'. Error: {err}".format(
                    parser=self._parser_name,
                    err=to_native(exc),
                ),
            )

        if result.get("errors"):
            self._prune_result()
            self._result.update({"failed": True, "msg": " ".join(result["errors"])})
        else:
            self._result["parsed"] = result["parsed"]
            set_fact = self._task.args.get("set_fact")
            if set_fact:
                self._result["ansible_facts"] = {set_fact: result["parsed"]}
        return self._result

Anon7 - 2022
AnonSec Team