Server IP : 85.214.239.14 / Your IP : 18.190.24.245 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/ansible/netcommon/plugins/action/ |
Upload File : |
# (c) 2017, Ansible by Red Hat, inc # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import absolute_import, division, print_function __metaclass__ = type import copy from ansible.errors import AnsibleError from ansible.plugins.action import ActionBase from ansible.utils.display import Display display = Display() class ActionModule(ActionBase): def run(self, tmp=None, task_vars=None): del tmp # tmp no longer has any effect result = {} play_context = copy.deepcopy(self._play_context) play_context.network_os = self._get_network_os(task_vars) new_task = self._task.copy() module = self._get_implementation_module( play_context.network_os, self._task.action ) if not module: if self._task.args["fail_on_missing_module"]: result["failed"] = True else: result["failed"] = False result[ "msg" ] = "Could not find implementation module %s for %s" % ( self._task.action, play_context.network_os, ) return result new_task.action = module action = self._shared_loader_obj.action_loader.get( play_context.network_os, task=new_task, connection=self._connection, play_context=play_context, loader=self._loader, templar=self._templar, shared_loader_obj=self._shared_loader_obj, ) display.vvvv("Running implementation module %s" % module) return action.run(task_vars=task_vars) def _get_network_os(self, task_vars): if "network_os" in self._task.args and self._task.args["network_os"]: display.vvvv("Getting network OS from task argument") network_os = self._task.args["network_os"] elif self._play_context.network_os: display.vvvv("Getting network OS from inventory") network_os = self._play_context.network_os elif ( "network_os" in task_vars.get("ansible_facts", {}) and task_vars["ansible_facts"]["network_os"] ): display.vvvv("Getting network OS from fact") network_os = task_vars["ansible_facts"]["network_os"] else: raise AnsibleError( "ansible_network_os must be specified on this host to use platform agnostic modules" ) return network_os def _get_implementation_module(self, network_os, platform_agnostic_module): module_name = ( network_os.split(".")[-1] + "_" + platform_agnostic_module.partition("_")[2] ) if "." in network_os: fqcn_module = ".".join(network_os.split(".")[0:-1]) implementation_module = fqcn_module + "." + module_name else: implementation_module = module_name if implementation_module not in self._shared_loader_obj.module_loader: implementation_module = None return implementation_module