Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.191.234.200
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/ansible/netcommon/plugins/action/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/ansible/netcommon/plugins/action/net_put.py
# (c) 2018, Ansible Inc,
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import absolute_import, division, print_function

__metaclass__ = type

import hashlib
import os
import uuid

from ansible.errors import AnsibleError
from ansible.module_utils._text import to_bytes, to_text
from ansible.module_utils.connection import Connection, ConnectionError
from ansible.module_utils.six.moves.urllib.parse import urlsplit
from ansible.plugins.action import ActionBase
from ansible.utils.display import Display

display = Display()


class ActionModule(ActionBase):
    def run(self, tmp=None, task_vars=None):
        """
        Copy a local file to the remote network device.

        Only the ``network_cli`` connection type is supported. In ``text``
        mode the source is templated first and written to a temporary file in
        the loader's base directory; in any other mode the source file is
        transferred as-is. The transfer is skipped when the destination
        already matches the source.

        :param tmp: unused by this method
        :param task_vars: task variables, forwarded to the parent ``run`` and
            used to look up the network OS
        :return: result dict carrying ``changed`` and ``destination`` and,
            on problems, ``failed`` and/or ``msg``
        """
        changed = False
        network_os = self._get_network_os(task_vars).split(".")[-1]
        persistent_connection = self._play_context.connection.split(".")[-1]

        result = super(ActionModule, self).run(task_vars=task_vars)

        if persistent_connection != "network_cli":
            # It is supported only with network_cli
            result["failed"] = True
            result["msg"] = (
                "connection type %s is not valid for net_put module,"
                " please use fully qualified name of network_cli connection type"
                % self._play_context.connection
            )
            return result

        try:
            src = self._task.args["src"]
        except KeyError as exc:
            return {
                "failed": True,
                "msg": "missing required argument: %s" % exc,
            }

        # Remember the user supplied value: it is the default destination
        src_file_path_name = src

        # Get destination file if specified
        dest = self._task.args.get("dest")
        if dest is None:
            dest = src_file_path_name

        # Get proto
        proto = self._task.args.get("protocol")
        if proto is None:
            proto = "scp"

        # Get mode if set
        mode = self._task.args.get("mode")
        if mode is None:
            mode = "binary"

        if mode == "text":
            try:
                self._handle_src_option(convert_data=False)
            except AnsibleError as exc:
                return dict(failed=True, msg=to_text(exc))

            # Now src holds the templated content; write it to disk in the
            # current directory so it can be transferred
            src = self._task.args.get("src")
            filename = str(uuid.uuid4())
            cwd = self._loader.get_basedir()
            output_file = os.path.join(cwd, filename)
            try:
                with open(output_file, "wb") as f:
                    f.write(to_bytes(src, encoding="utf-8"))
            except Exception:
                # The file may never have been created; do not let the
                # cleanup hide the original error
                if os.path.exists(output_file):
                    os.remove(output_file)
                raise
        else:
            try:
                output_file = self._get_binary_src_file(src)
            except ValueError as exc:
                return dict(failed=True, msg=to_text(exc))

        try:
            conn = Connection(self._connection.socket_path)
            sock_timeout = conn.get_option("persistent_command_timeout")

            try:
                changed = self._handle_existing_file(
                    conn, output_file, dest, proto, sock_timeout
                )
            except Exception as exc:
                # The comparison could not be made, so the file is
                # transferred anyway and must be reported as a change
                changed = True
                result["msg"] = (
                    "Warning: %s idempotency check failed. Check dest" % exc
                )

            if changed:
                try:
                    conn.copy_file(
                        source=output_file,
                        destination=dest,
                        proto=proto,
                        timeout=sock_timeout,
                    )
                except Exception as exc:
                    if (
                        to_text(exc) == "No response from server"
                        and network_os == "iosxr"
                    ):
                        # IOSXR sometimes closes socket prematurely after
                        # completion of file transfer
                        result[
                            "msg"
                        ] = "Warning: iosxr scp server pre close issue. Please check dest"
                    else:
                        # Any other error means the transfer did not succeed
                        changed = False
                        result["failed"] = True
                        result["msg"] = "Exception received: %s" % exc
        finally:
            if mode == "text" and os.path.exists(output_file):
                # Cleanup tmp file expanded with ansible vars
                os.remove(output_file)

        result["changed"] = changed
        result["destination"] = dest
        return result

    def _handle_existing_file(self, conn, source, dest, proto, timeout):
        """
        Determines whether the source and destination file match.

        The destination is fetched into a temporary file in the loader's base
        directory and compared with the source by sha1 digest. Both files are
        read as raw bytes so that binary payloads can be compared too. The
        temporary file is removed on every exit path.

        :param conn: connection object providing ``get_file``
        :param source: path of the local file that is about to be sent
        :param dest: path of the file on the remote device
        :param proto: transfer protocol handed to ``get_file``
        :param timeout: timeout handed to ``get_file``
        :return: False if source and dest both exist and have matching sha1 sums, True otherwise.
        """
        cwd = self._loader.get_basedir()
        filename = str(uuid.uuid4())
        tmp_source_file = os.path.join(cwd, filename)
        try:
            try:
                conn.get_file(
                    source=dest,
                    destination=tmp_source_file,
                    proto=proto,
                    timeout=timeout,
                )
            except ConnectionError as exc:
                error = to_text(exc)
                if error.endswith("No such file or directory"):
                    # Nothing on the device yet, the file has to be sent
                    return True
                if not os.path.exists(tmp_source_file):
                    # Nothing was fetched: report the connection error
                    # itself rather than a later "file not found"
                    raise

            with open(source, "rb") as f:
                new_content = f.read()
            with open(tmp_source_file, "rb") as f:
                old_content = f.read()
        finally:
            if os.path.exists(tmp_source_file):
                os.remove(tmp_source_file)

        checksum_old = hashlib.sha1(old_content).digest()
        checksum_new = hashlib.sha1(new_content).digest()
        return checksum_old != checksum_new

    def _get_binary_src_file(self, src):
        """
        Resolve ``src`` to the path of an existing local file.

        :param src: value of the task's ``src`` argument
        :return: the resolved path
        :raises ValueError: if the resolved path does not exist
        """
        working_path = self._get_working_path()

        # Absolute paths and values carrying a URL scheme are used verbatim.
        # The scheme test has to look at the value of src, not the literal
        # string "src" (which never has a scheme).
        if os.path.isabs(src) or urlsplit(src).scheme:
            source = src
        else:
            source = self._loader.path_dwim_relative(
                working_path, "templates", src
            )
            if not source:
                # NOTE(review): this fallback passes two arguments while the
                # call above passes three - confirm against the loader API
                source = self._loader.path_dwim_relative(working_path, src)

        if not os.path.exists(source):
            raise ValueError("path specified in src not found")

        return source

    def _get_working_path(self):
        """
        Return the directory against which relative ``src`` paths are resolved.

        :return: the role path when the task belongs to a role, otherwise the
            loader's base directory
        """
        base_dir = self._loader.get_basedir()
        role = self._task._role
        return base_dir if role is None else role._role_path

    def _handle_src_option(self, convert_data=True):
        """
        Template the file named by the task's ``src`` argument.

        The file is located, read and rendered through the templar; the
        rendered text then replaces ``src`` in the task arguments.

        :param convert_data: accepted for interface compatibility; not used
            by this method
        :raises AnsibleError: if the source path does not exist or cannot be
            read
        """
        src = self._task.args.get("src")
        working_path = self._get_working_path()

        # Absolute paths and values carrying a URL scheme are used verbatim.
        # The scheme test has to look at the value of src, not the literal
        # string "src" (which never has a scheme).
        if os.path.isabs(src) or urlsplit(src).scheme:
            source = src
        else:
            source = self._loader.path_dwim_relative(
                working_path, "templates", src
            )
            if not source:
                # NOTE(review): this fallback passes two arguments while the
                # call above passes three - confirm against the loader API
                source = self._loader.path_dwim_relative(working_path, src)

        if not os.path.exists(source):
            raise AnsibleError("path specified in src not found")

        try:
            with open(source, "r") as f:
                template_data = to_text(f.read())
        except IOError as e:
            raise AnsibleError(
                "unable to load src file {0}, I/O error({1}): {2}".format(
                    source, e.errno, e.strerror
                )
            )

        # Create a template search path in the following order:
        # [working_path, self_role_path, dependent_role_paths, dirname(source)]
        searchpath = [working_path]
        if self._task._role is not None:
            searchpath.append(self._task._role._role_path)
            # The attribute is named "_block"; a stray colon in the name made
            # this test always false and dropped the dependent role paths
            if hasattr(self._task, "_block"):
                dep_chain = self._task._block.get_dep_chain()
                if dep_chain is not None:
                    for role in dep_chain:
                        searchpath.append(role._role_path)
        searchpath.append(os.path.dirname(source))
        self._templar.environment.loader.searchpath = searchpath
        self._task.args["src"] = self._templar.template(template_data)

    def _get_network_os(self, task_vars):
        """
        Determine the network OS of the target host.

        The first non-empty value wins, looked up in this order: the task's
        ``network_os`` argument, the play context, then the ``network_os``
        entry of ``ansible_facts`` in ``task_vars``.

        :param task_vars: task variables; may be None, in which case the
            facts lookup is skipped
        :return: the network OS name
        :raises AnsibleError: if no source provides a value
        """
        task_level_os = self._task.args.get("network_os")
        if task_level_os:
            display.vvvv("Getting network OS from task argument")
            return task_level_os

        if self._play_context.network_os:
            display.vvvv("Getting network OS from inventory")
            return self._play_context.network_os

        # Tolerate missing task_vars / facts so the caller gets the intended
        # AnsibleError below instead of an AttributeError
        facts = (task_vars or {}).get("ansible_facts") or {}
        if "network_os" in facts and facts["network_os"]:
            display.vvvv("Getting network OS from fact")
            return facts["network_os"]

        raise AnsibleError(
            "ansible_network_os must be specified on this host"
        )

Anon7 - 2022
AnonSec Team