Server IP : 85.214.239.14 / Your IP : 3.136.22.12 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/ansible/netcommon/plugins/action/ |
Upload File : |
# # (c) 2018 Red Hat Inc. # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import absolute_import, division, print_function __metaclass__ = type import os import re import time from ansible.errors import AnsibleError from ansible.module_utils._text import to_text from ansible.module_utils.six import PY3 from ansible.module_utils.six.moves.urllib.parse import urlsplit from ansible.plugins.action.normal import ActionModule as _ActionModule from ansible.utils.display import Display from ansible.utils.hashing import checksum, checksum_s display = Display() DEXEC_PREFIX = "ANSIBLE_NETWORK_IMPORT_MODULES:" class ActionModule(_ActionModule): def run(self, tmp=None, task_vars=None): config_module = hasattr(self, "_config_module") and self._config_module if config_module and self._task.args.get("src"): try: self._handle_src_option() except AnsibleError as exc: return dict(failed=True, msg=to_text(exc)) host = task_vars["ansible_host"] dexec_eligible = self._check_dexec_eligibility(host) # attempt to run using dexec if dexec_eligible: # find and load the module filename, module = self._find_load_module() display.vvvv( "{prefix} found {action} at {fname}".format( prefix=DEXEC_PREFIX, action=self._task.action, fname=filename, ), host, ) # not using AnsibleModule, return to normal run (eg eos_bgp) if getattr(module, "AnsibleModule", None): # patch and update the module self._patch_update_module(module, task_vars) display.vvvv( "{prefix} running {module}".format( prefix=DEXEC_PREFIX, module=self._task.action ), host, ) # execute the module, collect result result = self._exec_module(module) display.vvvv( "{prefix} complete".format(prefix=DEXEC_PREFIX), host ) display.vvvvv( "{prefix} Result: {result}".format( prefix=DEXEC_PREFIX, result=result ), host, ) else: dexec_eligible = False display.vvvv( "{prefix} {module} doesn't support direct execution, disabled".format( prefix=DEXEC_PREFIX, module=self._task.action ), host, ) if not dexec_eligible: result = super(ActionModule, self).run(task_vars=task_vars) if ( config_module and self._task.args.get("backup") and not result.get("failed") ): self._handle_backup_option(result, task_vars) return result def _handle_backup_option(self, result, task_vars): filename = None backup_path = None try: non_config_regexes = self._connection.cliconf.get_option( "non_config_lines", task_vars ) except (AttributeError, KeyError): non_config_regexes = [] try: content = self._sanitize_contents( contents=result.pop("__backup__"), filters=non_config_regexes ) except KeyError: raise AnsibleError("Failed while reading configuration backup") backup_options = self._task.args.get("backup_options") if backup_options: filename = backup_options.get("filename") backup_path = backup_options.get("dir_path") tstamp = time.strftime( "%Y-%m-%d@%H:%M:%S", time.localtime(time.time()) ) if not backup_path: cwd = self._get_working_path() backup_path = os.path.join(cwd, "backup") if not filename: filename = "%s_config.%s" % ( task_vars["inventory_hostname"], tstamp, ) dest = os.path.join(backup_path, filename) if not os.path.exists(backup_path): os.makedirs(backup_path) changed = False # Do not overwrite the destination if the contents match. if not os.path.exists(dest) or checksum(dest) != checksum_s(content): try: with open(dest, "w") as output_file: output_file.write(content) except Exception as exc: result["failed"] = True result[ "msg" ] = "Could not write to destination file %s: %s" % ( dest, to_text(exc), ) return changed = True result["backup_path"] = dest result["changed"] = changed result["date"], result["time"] = tstamp.split("@") if not (backup_options and backup_options.get("filename")): result["filename"] = os.path.basename(result["backup_path"]) result["shortname"] = os.path.splitext(result["backup_path"])[0] def _get_working_path(self): cwd = self._loader.get_basedir() if self._task._role is not None: cwd = self._task._role._role_path return cwd def _handle_src_option(self, convert_data=True): src = self._task.args.get("src") working_path = self._get_working_path() if os.path.isabs(src) or urlsplit("src").scheme: source = src else: source = self._loader.path_dwim_relative( working_path, "templates", src ) if not source: source = self._loader.path_dwim_relative(working_path, src) if not os.path.exists(source): raise AnsibleError("path specified in src not found") try: with open(source, "r") as f: template_data = to_text(f.read()) except IOError as e: raise AnsibleError( "unable to load src file {0}, I/O error({1}): {2}".format( source, e.errno, e.strerror ) ) # Create a template search path in the following order: # [working_path, self_role_path, dependent_role_paths, dirname(source)] searchpath = [working_path] if self._task._role is not None: searchpath.append(self._task._role._role_path) if hasattr(self._task, "_block:"): dep_chain = self._task._block.get_dep_chain() if dep_chain is not None: for role in dep_chain: searchpath.append(role._role_path) searchpath.append(os.path.dirname(source)) self._templar.environment.loader.searchpath = searchpath self._task.args["src"] = self._templar.template(template_data) def _get_network_os(self, task_vars): if "network_os" in self._task.args and self._task.args["network_os"]: display.vvvv("Getting network OS from task argument") network_os = self._task.args["network_os"] elif self._play_context.network_os: display.vvvv("Getting network OS from inventory") network_os = self._play_context.network_os elif ( "network_os" in task_vars.get("ansible_facts", {}) and task_vars["ansible_facts"]["network_os"] ): display.vvvv("Getting network OS from fact") network_os = task_vars["ansible_facts"]["network_os"] else: raise AnsibleError( "ansible_network_os must be specified on this host" ) return network_os def _check_dexec_eligibility(self, host): """Check if current python and task are eligble""" dexec = self.get_connection_option("import_modules") # log early about dexec if dexec: display.vvvv("{prefix} enabled".format(prefix=DEXEC_PREFIX), host) # disable dexec when not PY3 if not PY3: dexec = False display.vvvv( "{prefix} disabled for when not Python 3".format( prefix=DEXEC_PREFIX ), host=host, ) # disable dexec when running async if self._task.async_val: dexec = False display.vvvv( "{prefix} disabled for a task using async".format( prefix=DEXEC_PREFIX ), host=host, ) else: display.vvvv("{prefix} disabled".format(prefix=DEXEC_PREFIX), host) display.vvvv( "{prefix} module execution time may be extended".format( prefix=DEXEC_PREFIX ), host, ) return dexec def _find_load_module(self): """Use the task action to find a module and import it. :return filename: The module's filename :rtype filename: str :return module: The loaded module file :rtype module: module """ import importlib mloadr = self._shared_loader_obj.module_loader # 2.10 try: context = mloadr.find_plugin_with_context( self._task.action, collection_list=self._task.collections ) filename = context.plugin_resolved_path module = importlib.import_module(context.plugin_resolved_name) # 2.9 except AttributeError: fullname, filename = mloadr.find_plugin_with_name( self._task.action, collection_list=self._task.collections ) module = importlib.import_module(fullname) return filename, module def _patch_update_module(self, module, task_vars): """Update a module instance, replacing it's AnsibleModule with one that doesn't load params :param module: An loaded module :type module: A module file that was loaded :param task_vars: The vars provided to the task :type task_vars: dict """ import copy from ansible.module_utils.basic import AnsibleModule as _AnsibleModule # build an AnsibleModule that doesn't load params class PatchedAnsibleModule(_AnsibleModule): def _load_params(self): pass # update the task args w/ all the magic vars self._update_module_args(self._task.action, self._task.args, task_vars) # set the params of the ansible module cause we're not using stdin # use a copy so the module doesn't change the original task args PatchedAnsibleModule.params = copy.deepcopy(self._task.args) # give the module our revised AnsibleModule module.AnsibleModule = PatchedAnsibleModule def _exec_module(self, module): """exec the module's main() since modules print their result, we need to replace stdout with a buffer. If main() fails, we assume that as stderr Once we collect stdout/stderr, use our super to json load it or handle a traceback :param module: An loaded module :type module: A module file that was loaded :return module_result: The result of the module :rtype module_result: dict """ import io import sys from ansible.module_utils._text import to_native from ansible.vars.clean import remove_internal_keys # preserve previous stdout, replace with buffer sys_stdout = sys.stdout sys.stdout = io.StringIO() stdout = "" stderr = "" # run the module, catch the SystemExit so we continue try: module.main() except SystemExit: # module exited cleanly stdout = sys.stdout.getvalue() except Exception as exc: # dirty module or connection traceback stderr = to_native(exc) # restore stdout & stderr sys.stdout = sys_stdout # parse the response dict_out = { "stdout": stdout, "stdout_lines": stdout.splitlines(), "stderr": stderr, "stderr_lines": stderr.splitlines(), } data = self._parse_returned_data(dict_out) # Clean up the response like action _execute_module remove_internal_keys(data) # split stdout/stderr into lines if needed if "stdout" in data and "stdout_lines" not in data: # if the value is 'False', a default won't catch it. txt = data.get("stdout", None) or "" data["stdout_lines"] = txt.splitlines() if "stderr" in data and "stderr_lines" not in data: # if the value is 'False', a default won't catch it. txt = data.get("stderr", None) or "" data["stderr_lines"] = txt.splitlines() return data def _sanitize_contents(self, contents, filters): """remove lines from contents that match regexes specified in the `filters` list """ for x in filters: contents = re.sub(x, "", contents) return contents.strip()