Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.224.32.56
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/ansible/netcommon/plugins/action/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/ansible/netcommon/plugins/action/net_get.py
# (c) 2018, Ansible Inc,
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import absolute_import, division, print_function

__metaclass__ = type

import hashlib
import os
import re
import uuid

from ansible.errors import AnsibleError
from ansible.module_utils._text import to_bytes, to_text
from ansible.module_utils.connection import Connection, ConnectionError
from ansible.module_utils.six.moves.urllib.parse import urlsplit
from ansible.plugins.action import ActionBase
from ansible.utils.display import Display

display = Display()


class ActionModule(ActionBase):
    def run(self, tmp=None, task_vars=None):
        changed = False
        socket_path = None
        self._get_network_os(task_vars)
        persistent_connection = self._play_context.connection.split(".")[-1]

        result = super(ActionModule, self).run(task_vars=task_vars)

        if persistent_connection != "network_cli":
            # It is supported only with network_cli
            result["failed"] = True
            result["msg"] = (
                "connection type %s is not valid for net_get module,"
                " please use fully qualified name of network_cli connection type"
                % self._play_context.connection
            )
            return result

        try:
            src = self._task.args["src"]
        except KeyError as exc:
            return {
                "failed": True,
                "msg": "missing required argument: %s" % exc,
            }

        # Get destination file if specified
        dest = self._task.args.get("dest")

        if dest is None:
            dest = self._get_default_dest(src)
        else:
            dest = self._handle_dest_path(dest)

        # Get proto
        proto = self._task.args.get("protocol")
        if proto is None:
            proto = "scp"

        if socket_path is None:
            socket_path = self._connection.socket_path

        conn = Connection(socket_path)
        sock_timeout = conn.get_option("persistent_command_timeout")

        try:
            changed = self._handle_existing_file(
                conn, src, dest, proto, sock_timeout
            )
            if changed is False:
                result["changed"] = changed
                result["destination"] = dest
                return result
        except Exception as exc:
            result["msg"] = (
                "Warning: %s idempotency check failed. Check dest" % exc
            )

        try:
            conn.get_file(
                source=src, destination=dest, proto=proto, timeout=sock_timeout
            )
        except Exception as exc:
            result["failed"] = True
            result["msg"] = "Exception received: %s" % exc

        result["changed"] = changed
        result["destination"] = dest
        return result

    def _handle_dest_path(self, dest):
        working_path = self._get_working_path()

        if os.path.isabs(dest) or urlsplit("dest").scheme:
            dst = dest
        else:
            dst = self._loader.path_dwim_relative(working_path, "", dest)

        return dst

    def _get_src_filename_from_path(self, src_path):
        filename_list = re.split("/|:", src_path)
        return filename_list[-1]

    def _get_default_dest(self, src_path):
        dest_path = self._get_working_path()
        src_fname = self._get_src_filename_from_path(src_path)
        filename = "%s/%s" % (dest_path, src_fname)
        return filename

    def _handle_existing_file(self, conn, source, dest, proto, timeout):
        """
        Determines whether the source and destination file match.

        :return: False if source and dest both exist and have matching sha1 sums, True otherwise.
        """
        if not os.path.exists(dest):
            return True

        cwd = self._loader.get_basedir()
        filename = str(uuid.uuid4())
        tmp_dest_file = os.path.join(cwd, filename)
        try:
            conn.get_file(
                source=source,
                destination=tmp_dest_file,
                proto=proto,
                timeout=timeout,
            )
        except ConnectionError as exc:
            error = to_text(exc)
            if error.endswith("No such file or directory"):
                if os.path.exists(tmp_dest_file):
                    os.remove(tmp_dest_file)
                return True

        try:
            with open(tmp_dest_file, "r") as f:
                new_content = f.read()
            with open(dest, "r") as f:
                old_content = f.read()
        except (IOError, OSError):
            os.remove(tmp_dest_file)
            raise

        sha1 = hashlib.sha1()
        old_content_b = to_bytes(old_content, errors="surrogate_or_strict")
        sha1.update(old_content_b)
        checksum_old = sha1.digest()

        sha1 = hashlib.sha1()
        new_content_b = to_bytes(new_content, errors="surrogate_or_strict")
        sha1.update(new_content_b)
        checksum_new = sha1.digest()
        os.remove(tmp_dest_file)
        if checksum_old == checksum_new:
            return False
        return True

    def _get_working_path(self):
        cwd = self._loader.get_basedir()
        if self._task._role is not None:
            cwd = self._task._role._role_path
        return cwd

    def _get_network_os(self, task_vars):
        if "network_os" in self._task.args and self._task.args["network_os"]:
            display.vvvv("Getting network OS from task argument")
            network_os = self._task.args["network_os"]
        elif self._play_context.network_os:
            display.vvvv("Getting network OS from inventory")
            network_os = self._play_context.network_os
        elif (
            "network_os" in task_vars.get("ansible_facts", {})
            and task_vars["ansible_facts"]["network_os"]
        ):
            display.vvvv("Getting network OS from fact")
            network_os = task_vars["ansible_facts"]["network_os"]
        else:
            raise AnsibleError(
                "ansible_network_os must be specified on this host"
            )

        return network_os

Anon7 - 2022
AnonSec Team