Server IP : 85.214.239.14 / Your IP : 3.137.164.200 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/ansible/netcommon/plugins/action/ |
Upload File : |
# (c) 2018, Ansible Inc, # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import absolute_import, division, print_function __metaclass__ = type import hashlib import os import re import uuid from ansible.errors import AnsibleError from ansible.module_utils._text import to_bytes, to_text from ansible.module_utils.connection import Connection, ConnectionError from ansible.module_utils.six.moves.urllib.parse import urlsplit from ansible.plugins.action import ActionBase from ansible.utils.display import Display display = Display() class ActionModule(ActionBase): def run(self, tmp=None, task_vars=None): changed = False socket_path = None self._get_network_os(task_vars) persistent_connection = self._play_context.connection.split(".")[-1] result = super(ActionModule, self).run(task_vars=task_vars) if persistent_connection != "network_cli": # It is supported only with network_cli result["failed"] = True result["msg"] = ( "connection type %s is not valid for net_get module," " please use fully qualified name of network_cli connection type" % self._play_context.connection ) return result try: src = self._task.args["src"] except KeyError as exc: return { "failed": True, "msg": "missing required argument: %s" % exc, } # Get destination file if specified dest = self._task.args.get("dest") if dest is None: dest = self._get_default_dest(src) else: dest = self._handle_dest_path(dest) # Get proto proto = self._task.args.get("protocol") if proto is None: proto = "scp" if socket_path is None: socket_path = self._connection.socket_path conn = Connection(socket_path) sock_timeout = conn.get_option("persistent_command_timeout") try: changed = self._handle_existing_file( conn, src, dest, proto, sock_timeout ) if changed is False: result["changed"] = changed result["destination"] = dest return result except Exception as exc: result["msg"] = ( "Warning: %s idempotency check failed. Check dest" % exc ) try: conn.get_file( source=src, destination=dest, proto=proto, timeout=sock_timeout ) except Exception as exc: result["failed"] = True result["msg"] = "Exception received: %s" % exc result["changed"] = changed result["destination"] = dest return result def _handle_dest_path(self, dest): working_path = self._get_working_path() if os.path.isabs(dest) or urlsplit("dest").scheme: dst = dest else: dst = self._loader.path_dwim_relative(working_path, "", dest) return dst def _get_src_filename_from_path(self, src_path): filename_list = re.split("/|:", src_path) return filename_list[-1] def _get_default_dest(self, src_path): dest_path = self._get_working_path() src_fname = self._get_src_filename_from_path(src_path) filename = "%s/%s" % (dest_path, src_fname) return filename def _handle_existing_file(self, conn, source, dest, proto, timeout): """ Determines whether the source and destination file match. :return: False if source and dest both exist and have matching sha1 sums, True otherwise. """ if not os.path.exists(dest): return True cwd = self._loader.get_basedir() filename = str(uuid.uuid4()) tmp_dest_file = os.path.join(cwd, filename) try: conn.get_file( source=source, destination=tmp_dest_file, proto=proto, timeout=timeout, ) except ConnectionError as exc: error = to_text(exc) if error.endswith("No such file or directory"): if os.path.exists(tmp_dest_file): os.remove(tmp_dest_file) return True try: with open(tmp_dest_file, "r") as f: new_content = f.read() with open(dest, "r") as f: old_content = f.read() except (IOError, OSError): os.remove(tmp_dest_file) raise sha1 = hashlib.sha1() old_content_b = to_bytes(old_content, errors="surrogate_or_strict") sha1.update(old_content_b) checksum_old = sha1.digest() sha1 = hashlib.sha1() new_content_b = to_bytes(new_content, errors="surrogate_or_strict") sha1.update(new_content_b) checksum_new = sha1.digest() os.remove(tmp_dest_file) if checksum_old == checksum_new: return False return True def _get_working_path(self): cwd = self._loader.get_basedir() if self._task._role is not None: cwd = self._task._role._role_path return cwd def _get_network_os(self, task_vars): if "network_os" in self._task.args and self._task.args["network_os"]: display.vvvv("Getting network OS from task argument") network_os = self._task.args["network_os"] elif self._play_context.network_os: display.vvvv("Getting network OS from inventory") network_os = self._play_context.network_os elif ( "network_os" in task_vars.get("ansible_facts", {}) and task_vars["ansible_facts"]["network_os"] ): display.vvvv("Getting network OS from fact") network_os = task_vars["ansible_facts"]["network_os"] else: raise AnsibleError( "ansible_network_os must be specified on this host" ) return network_os