Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.139.81.114
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible_collections/cisco/nxos/plugins/httpapi/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible_collections/cisco/nxos/plugins/httpapi//nxos.py
# (c) 2018 Red Hat Inc.
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function


__metaclass__ = type

DOCUMENTATION = """
author: Ansible Networking Team (@ansible-network)
name: nxos
short_description: Use NX-API to run commands on Cisco NX-OS platform
description:
- This plugin provides low level abstraction APIs for sending and receiving
  commands using NX-API with devices running Cisco NX-OS.
version_added: 1.0.0
"""

import collections
import json
import re

from ansible.module_utils._text import to_text
from ansible.module_utils.connection import ConnectionError
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import to_list
from ansible_collections.ansible.netcommon.plugins.plugin_utils.httpapi_base import HttpApiBase


OPTIONS = {
    "format": ["text", "json"],
    "diff_match": ["line", "strict", "exact", "none"],
    "diff_replace": ["line", "block", "config"],
    "output": ["text", "json"],
}


class HttpApi(HttpApiBase):
    def __init__(self, *args, **kwargs):
        super(HttpApi, self).__init__(*args, **kwargs)
        self._device_info = None
        self._module_context = {}

    def read_module_context(self, module_key):
        if self._module_context.get(module_key):
            return self._module_context[module_key]

        return None

    def save_module_context(self, module_key, module_context):
        self._module_context[module_key] = module_context

        return None

    def send_request(self, data, **message_kwargs):
        output = None
        queue = list()
        responses = list()

        for item in to_list(data):
            cmd_output = message_kwargs.get("output") or "text"
            if isinstance(item, dict):
                command = item["command"]
                if "output" in item:
                    cmd_output = item["output"]
            else:
                command = item

            # Emulate '| json' from CLI
            if command.endswith("| json"):
                command = command.rsplit("|", 1)[0]
                cmd_output = "json"

            if output and output != cmd_output:
                responses.extend(self._run_queue(queue, output))
                queue = list()

            output = cmd_output
            queue.append(command)

        if queue:
            responses.extend(self._run_queue(queue, output))

        if len(responses) == 1:
            return responses[0]
        return responses

    def _run_queue(self, queue, output):
        if self._become:
            self.connection.queue_message(
                "warning",
                "become has no effect over httpapi. Use network_cli if privilege escalation is required",
            )

        request = request_builder(queue, output)
        headers = {"Content-Type": "application/json"}

        response, response_data = self.connection.send(
            "/ins",
            request,
            headers=headers,
            method="POST",
        )

        try:
            response_data = json.loads(to_text(response_data.getvalue()))
        except ValueError:
            raise ConnectionError(
                "Response was not valid JSON, got {0}".format(to_text(response_data.getvalue())),
            )

        results = handle_response(response_data)
        return results

    def get_device_info(self):
        if self._device_info:
            return self._device_info

        device_info = {}

        device_info["network_os"] = "nxos"
        reply, platform_reply = self.send_request(["show version", "show inventory"])

        find_os_version = [
            r"\s+system:\s+version\s*(\S+)",
            r"\s+kickstart:\s+version\s*(\S+)",
            r"\s+NXOS:\s+version\s*(\S+)",
        ]
        for regex in find_os_version:
            match_ver = re.search(regex, reply, re.M)
            if match_ver:
                device_info["network_os_version"] = match_ver.group(1)
                break

        match_chassis_id = re.search(r"Hardware\n\s+cisco\s*(\S+\s+\S+)", reply, re.M)
        if match_chassis_id:
            device_info["network_os_model"] = match_chassis_id.group(1)

        match_host_name = re.search(r"\s+Device name:\s*(\S+)", reply, re.M)
        if match_host_name:
            device_info["network_os_hostname"] = match_host_name.group(1)

        find_os_image = [
            r"\s+system image file is:\s*(\S+)",
            r"\s+kickstart image file is:\s*(\S+)",
            r"\s+NXOS image file is:\s*(\S+)",
        ]
        for regex in find_os_image:
            match_file_name = re.search(regex, reply, re.M)
            if match_file_name:
                device_info["network_os_image"] = match_file_name.group(1)
                break

        match_os_platform = re.search(
            r'NAME: (?:"Chassis"| Chassis ),\s*DESCR:.*\nPID:\s*(\S+)',
            platform_reply,
            re.M,
        )
        if match_os_platform:
            device_info["network_os_platform"] = match_os_platform.group(1)

        self._device_info = device_info
        return self._device_info

    def get_device_operations(self):
        platform = self.get_device_info().get("network_os_platform", "")
        return {
            "supports_diff_replace": True,
            "supports_commit": False,
            "supports_rollback": False,
            "supports_defaults": True,
            "supports_onbox_diff": False,
            "supports_commit_comment": False,
            "supports_multiline_delimiter": False,
            "supports_diff_match": True,
            "supports_diff_ignore_lines": True,
            "supports_generate_diff": True,
            "supports_replace": True if "9K" in platform else False,
        }

    def get_capabilities(self):
        result = {}
        result["rpc"] = []
        result["device_info"] = self.get_device_info()
        result["device_operations"] = self.get_device_operations()
        result.update(OPTIONS)
        result["network_api"] = "nxapi"

        return json.dumps(result)

    # Shims for resource module support
    def get(self, command, output=None):
        # This method is ONLY here to support resource modules. Therefore most
        # arguments are unsupported and not present.

        return self.send_request(data=command, output=output)

    def edit_config(self, candidate):
        # This method is ONLY here to support resource modules. Therefore most
        # arguments are unsupported and not present.

        responses = self.send_request(candidate, output="config")
        return [resp for resp in to_list(responses) if resp != "{}"]


def handle_response(response):
    results = []

    if response["ins_api"].get("outputs"):
        for output in to_list(response["ins_api"]["outputs"]["output"]):
            if output["code"] != "200":
                # Best effort messages: some API output keys may not exist on some platforms
                input_data = output.get("input", "")
                msg = output.get("msg", "")
                clierror = output.get("clierror", "")
                raise ConnectionError(
                    "%s: %s: %s" % (input_data, msg, clierror),
                    code=output["code"],
                )
            elif "body" in output:
                result = output["body"]
                if isinstance(result, dict):
                    result = json.dumps(result)

                results.append(result.strip())

    return results


def request_builder(commands, output, version="1.0", chunk="0", sid=None):
    """Encodes a NXAPI JSON request message"""
    output_to_command_type = {
        "text": "cli_show_ascii",
        "json": "cli_show",
        "bash": "bash",
        "config": "cli_conf",
    }

    maybe_output = commands[0].split("|")[-1].strip()
    if maybe_output in output_to_command_type:
        command_type = output_to_command_type[maybe_output]
        commands = [command.split("|")[0].strip() for command in commands]
    else:
        try:
            command_type = output_to_command_type[output]
        except KeyError:
            msg = "invalid format, received %s, expected one of %s" % (
                output,
                ",".join(output_to_command_type.keys()),
            )
            raise ConnectionError(msg)

    if isinstance(commands, (list, set, tuple)):
        commands = " ;".join(commands)

    # Order should not matter but some versions of NX-OS software fail
    # to process the payload properly if 'input' gets serialized before
    # 'type' and the payload of 'input' contains the word 'type'.
    msg = collections.OrderedDict()
    msg["version"] = version
    msg["type"] = command_type
    msg["chunk"] = chunk
    msg["sid"] = sid
    msg["input"] = commands
    msg["output_format"] = "json"

    return json.dumps(dict(ins_api=msg))

Anon7 - 2022
AnonSec Team