Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.136.22.12
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/ansible/netcommon/plugins/action/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/ansible/netcommon/plugins/action/network.py
#
# (c) 2018 Red Hat Inc.
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import absolute_import, division, print_function

__metaclass__ = type

import os
import re
import time

from ansible.errors import AnsibleError
from ansible.module_utils._text import to_text
from ansible.module_utils.six import PY3
from ansible.module_utils.six.moves.urllib.parse import urlsplit
from ansible.plugins.action.normal import ActionModule as _ActionModule
from ansible.utils.display import Display
from ansible.utils.hashing import checksum, checksum_s

# Shared Display instance used for the verbosity-gated log output below.
display = Display()

# Prefix placed at the start of every log message about direct execution.
DEXEC_PREFIX = "ANSIBLE_NETWORK_IMPORT_MODULES:"


class ActionModule(_ActionModule):
    def run(self, tmp=None, task_vars=None):
        """Run the task's module, directly in-process when eligible,
        otherwise through the parent action plugin.

        For config modules a ``src`` argument is resolved before the
        module runs and a ``backup`` argument is handled afterwards,
        provided the module result is not marked as failed.

        :param tmp: Not used by this method
        :param task_vars: The vars provided to the task
        :type task_vars: dict
        :return result: The result of the module
        :rtype result: dict
        """
        is_config_module = getattr(self, "_config_module", False)

        if is_config_module and self._task.args.get("src"):
            try:
                self._handle_src_option()
            except AnsibleError as err:
                return {"failed": True, "msg": to_text(err)}

        host = task_vars["ansible_host"]
        use_dexec = self._check_dexec_eligibility(host)

        if use_dexec:
            # locate the module for this action and import it
            filename, module = self._find_load_module()
            found_msg = "{prefix} found {action}  at {fname}".format(
                prefix=DEXEC_PREFIX,
                action=self._task.action,
                fname=filename,
            )
            display.vvvv(found_msg, host)

            if getattr(module, "AnsibleModule", None):
                # the module exposes AnsibleModule, so it can be patched
                # and executed in-process
                self._patch_update_module(module, task_vars)
                running_msg = "{prefix} running {module}".format(
                    prefix=DEXEC_PREFIX, module=self._task.action
                )
                display.vvvv(running_msg, host)

                result = self._exec_module(module)

                display.vvvv(
                    "{prefix} complete".format(prefix=DEXEC_PREFIX), host
                )
                result_msg = "{prefix} Result: {result}".format(
                    prefix=DEXEC_PREFIX, result=result
                )
                display.vvvvv(result_msg, host)
            else:
                # no AnsibleModule on the module (eg eos_bgp): fall back
                # to the normal run below
                use_dexec = False
                unsupported_msg = "{prefix} {module} doesn't support direct execution, disabled".format(
                    prefix=DEXEC_PREFIX, module=self._task.action
                )
                display.vvvv(unsupported_msg, host)

        if not use_dexec:
            result = super(ActionModule, self).run(task_vars=task_vars)

        wants_backup = is_config_module and self._task.args.get("backup")
        if wants_backup and not result.get("failed"):
            self._handle_backup_option(result, task_vars)

        return result

    def _handle_backup_option(self, result, task_vars):
        """Write the configuration backup carried in ``result`` to disk
        and record where it went.

        The ``__backup__`` entry is removed from ``result`` and filtered
        through ``_sanitize_contents``. The destination comes from the
        ``backup_options`` task argument (``filename`` / ``dir_path``)
        when given; otherwise it is a ``backup`` directory under the
        working path and a ``<inventory_hostname>_config.<timestamp>``
        file name. An existing file with a matching checksum is left
        untouched.

        On success ``result`` gains ``backup_path``, ``changed``,
        ``date`` and ``time``, plus ``filename`` and ``shortname`` when
        no explicit file name was requested. If the file cannot be
        written, ``failed`` and ``msg`` are set on ``result`` instead.

        :param result: The module result, updated in place
        :type result: dict
        :param task_vars: The vars provided to the task
        :type task_vars: dict
        :raises AnsibleError: When a KeyError occurs while reading the
            backup out of ``result``
        """
        try:
            ignore_patterns = self._connection.cliconf.get_option(
                "non_config_lines", task_vars
            )
        except (AttributeError, KeyError):
            ignore_patterns = []

        try:
            content = self._sanitize_contents(
                contents=result.pop("__backup__"), filters=ignore_patterns
            )
        except KeyError:
            raise AnsibleError("Failed while reading configuration backup")

        backup_options = self._task.args.get("backup_options")
        if backup_options:
            filename = backup_options.get("filename")
            backup_path = backup_options.get("dir_path")
        else:
            filename = None
            backup_path = None

        tstamp = time.strftime(
            "%Y-%m-%d@%H:%M:%S", time.localtime(time.time())
        )

        # fall back to the defaults for whatever was not supplied
        if not backup_path:
            backup_path = os.path.join(self._get_working_path(), "backup")
        if not filename:
            filename = "%s_config.%s" % (
                task_vars["inventory_hostname"],
                tstamp,
            )

        dest = os.path.join(backup_path, filename)
        if not os.path.exists(backup_path):
            os.makedirs(backup_path)

        # only (re)write the file when it is missing or its contents differ
        needs_write = not os.path.exists(dest) or checksum(
            dest
        ) != checksum_s(content)
        if needs_write:
            try:
                with open(dest, "w") as handle:
                    handle.write(content)
            except Exception as err:
                result["failed"] = True
                result["msg"] = (
                    "Could not write to destination file %s: %s"
                    % (dest, to_text(err))
                )
                return

        result["backup_path"] = dest
        result["changed"] = needs_write

        result["date"], result["time"] = tstamp.split("@")
        if not backup_options or not backup_options.get("filename"):
            result["filename"] = os.path.basename(dest)
            result["shortname"] = os.path.splitext(dest)[0]

    def _get_working_path(self):
        cwd = self._loader.get_basedir()
        if self._task._role is not None:
            cwd = self._task._role._role_path
        return cwd

    def _handle_src_option(self, convert_data=True):
        """Replace the ``src`` task argument with the rendered contents
        of the file it points to.

        An absolute path, or a value carrying a URL scheme, is used as
        given. Anything else is looked up relative to the working path,
        first under ``templates`` and then directly. The file is read
        and templated, and the result is stored back in
        ``self._task.args["src"]``.

        :param convert_data: Not used by this method; kept so the
            signature stays unchanged
        :type convert_data: bool
        :raises AnsibleError: When the resolved path does not exist or
            the file cannot be read
        """
        src = self._task.args.get("src")
        working_path = self._get_working_path()

        # inspect the value of src itself, not the literal string "src"
        if os.path.isabs(src) or urlsplit(src).scheme:
            source = src
        else:
            source = self._loader.path_dwim_relative(
                working_path, "templates", src
            )
            if not source:
                source = self._loader.path_dwim_relative(working_path, src)

        if not os.path.exists(source):
            raise AnsibleError("path specified in src not found")

        try:
            with open(source, "r") as f:
                template_data = to_text(f.read())
        except IOError as e:
            raise AnsibleError(
                "unable to load src file {0}, I/O error({1}): {2}".format(
                    source, e.errno, e.strerror
                )
            )

        # Create a template search path in the following order:
        # [working_path, self_role_path, dependent_role_paths, dirname(source)]
        searchpath = [working_path]
        if self._task._role is not None:
            searchpath.append(self._task._role._role_path)
            # the attribute is named "_block"; a missing or empty one
            # simply contributes no dependent role paths
            block = getattr(self._task, "_block", None)
            if block is not None:
                dep_chain = block.get_dep_chain()
                if dep_chain is not None:
                    for role in dep_chain:
                        searchpath.append(role._role_path)
        searchpath.append(os.path.dirname(source))
        self._templar.environment.loader.searchpath = searchpath
        self._task.args["src"] = self._templar.template(template_data)

    def _get_network_os(self, task_vars):
        """Return the network OS, taken from the first of these that
        holds a truthy value: the ``network_os`` task argument, the play
        context, the ``network_os`` entry of ``ansible_facts``.

        :param task_vars: The vars provided to the task
        :type task_vars: dict
        :return network_os: The network OS that was found
        :raises AnsibleError: When none of the sources provides a value
        """
        task_args = self._task.args
        if "network_os" in task_args and task_args["network_os"]:
            display.vvvv("Getting network OS from task argument")
            return task_args["network_os"]

        if self._play_context.network_os:
            display.vvvv("Getting network OS from inventory")
            return self._play_context.network_os

        facts = task_vars.get("ansible_facts", {})
        if "network_os" in facts and facts["network_os"]:
            display.vvvv("Getting network OS from fact")
            return facts["network_os"]

        raise AnsibleError(
            "ansible_network_os must be specified on this host"
        )

    def _check_dexec_eligibility(self, host):
        """Decide whether the task may be run by direct execution.

        Direct execution requires the ``import_modules`` connection
        option to be set, a Python 3 interpreter and a task that does
        not use async. Each decision is logged at ``vvvv``.

        :param host: The host the log messages are attributed to
        :return dexec: The ``import_modules`` option value, or False
            when direct execution was ruled out
        """
        dexec = self.get_connection_option("import_modules")

        if not dexec:
            display.vvvv("{prefix} disabled".format(prefix=DEXEC_PREFIX), host)
            slow_msg = "{prefix} module execution time may be extended".format(
                prefix=DEXEC_PREFIX
            )
            display.vvvv(slow_msg, host)
            return dexec

        # report early that the option is switched on
        display.vvvv("{prefix} enabled".format(prefix=DEXEC_PREFIX), host)

        # Python 2 rules direct execution out
        if not PY3:
            dexec = False
            py_msg = "{prefix} disabled for when not Python 3".format(
                prefix=DEXEC_PREFIX
            )
            display.vvvv(py_msg, host=host)

        # so does an async task
        if self._task.async_val:
            dexec = False
            async_msg = "{prefix} disabled for a task using async".format(
                prefix=DEXEC_PREFIX
            )
            display.vvvv(async_msg, host=host)

        return dexec

    def _find_load_module(self):
        """Use the task action to find a module
        and import it.

        :return filename: The module's filename
        :rtype filename: str
        :return module: The loaded module file
        :rtype module: module
        """
        import importlib

        mloadr = self._shared_loader_obj.module_loader

        # 2.10
        try:
            context = mloadr.find_plugin_with_context(
                self._task.action, collection_list=self._task.collections
            )
            filename = context.plugin_resolved_path
            module = importlib.import_module(context.plugin_resolved_name)
        # 2.9
        except AttributeError:
            fullname, filename = mloadr.find_plugin_with_name(
                self._task.action, collection_list=self._task.collections
            )
            module = importlib.import_module(fullname)
        return filename, module

    def _patch_update_module(self, module, task_vars):
        """Swap the module's ``AnsibleModule`` for a subclass that skips
        parameter loading and carries the task arguments itself.

        :param module: A loaded module
        :type module: module
        :param task_vars: The vars provided to the task
        :type task_vars: dict
        """
        import copy

        from ansible.module_utils.basic import AnsibleModule as _AnsibleModule

        # bring the task args up to date with all the magic vars
        self._update_module_args(self._task.action, self._task.args, task_vars)

        # stdin is not used here, so the params are handed over directly;
        # a deep copy keeps the module from changing the original task args
        args_snapshot = copy.deepcopy(self._task.args)

        # an AnsibleModule whose parameter loading does nothing
        class PatchedAnsibleModule(_AnsibleModule):
            def _load_params(self):
                pass

        PatchedAnsibleModule.params = args_snapshot

        # hand the module the revised AnsibleModule
        module.AnsibleModule = PatchedAnsibleModule

    def _exec_module(self, module):
        """Run the module's ``main()`` in-process and return its parsed
        result.

        Modules print their result, so ``sys.stdout`` is replaced with a
        buffer while ``main()`` runs. The buffer is read when ``main()``
        ends with SystemExit; any other Exception is converted to text
        and treated as stderr. The collected output is handed to
        ``_parse_returned_data`` and internal keys are removed from the
        result.

        :param module: A loaded module
        :type module: module
        :return module_result: The result of the module
        :rtype module_result: dict
        """
        import io
        import sys

        from ansible.module_utils._text import to_native
        from ansible.vars.clean import remove_internal_keys

        # preserve previous stdout, replace with buffer
        sys_stdout = sys.stdout
        sys.stdout = io.StringIO()

        stdout = ""
        stderr = ""
        # run the module, catch the SystemExit so we continue
        try:
            module.main()
        except SystemExit:
            # module exited cleanly
            stdout = sys.stdout.getvalue()
        except Exception as exc:
            # dirty module or connection traceback
            stderr = to_native(exc)
        finally:
            # always restore stdout, even when an exception that is not
            # handled above (eg KeyboardInterrupt) passes through
            sys.stdout = sys_stdout

        # parse the response
        dict_out = {
            "stdout": stdout,
            "stdout_lines": stdout.splitlines(),
            "stderr": stderr,
            "stderr_lines": stderr.splitlines(),
        }
        data = self._parse_returned_data(dict_out)
        # Clean up the response like action _execute_module
        remove_internal_keys(data)

        # split stdout/stderr into lines if needed
        if "stdout" in data and "stdout_lines" not in data:
            # if the value is 'False', a default won't catch it.
            txt = data.get("stdout", None) or ""
            data["stdout_lines"] = txt.splitlines()
        if "stderr" in data and "stderr_lines" not in data:
            # if the value is 'False', a default won't catch it.
            txt = data.get("stderr", None) or ""
            data["stderr_lines"] = txt.splitlines()

        return data

    def _sanitize_contents(self, contents, filters):
        """remove lines from contents that match
        regexes specified in the `filters` list
        """
        for x in filters:
            contents = re.sub(x, "", contents)
        return contents.strip()

Anon7 - 2022
AnonSec Team