Server IP : 85.214.239.14 / Your IP : 3.145.95.76 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/cloud/common/plugins/module_utils/turbo/ |
Upload File : |
# Copyright (c) 2021 Red Hat # # This code is part of Ansible, but is an independent component. # This particular file snippet, and this file snippet only, is BSD licensed. # Modules you write using this snippet, which is embedded dynamically by Ansible # still belong to the author of the module, and may assign their own license # to the complete work. # # Redistribution and use in source and binary forms, with or without modification, # are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright # notice, this list of conditions and the following disclaimer. # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED # WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. # IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, # INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, # PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE # USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. # import json import os import os.path import sys import tempfile import ansible.module_utils.basic from .exceptions import ( EmbeddedModuleSuccess, EmbeddedModuleFailure, ) import ansible_collections.cloud.common.plugins.module_utils.turbo.common if False: # pylint: disable=using-constant-test from .server import please_include_me # This is a trick to be sure server.py is embedded in the Ansiblez # zip archive.🥷 please_include_me def get_collection_name_from_path(): module_path = ansible.module_utils.basic.get_module_path() ansiblez = module_path.split("/")[-3] if ansiblez.startswith("ansible_") and ansiblez.endswith(".zip"): return ".".join(ansiblez[8:].split(".")[:2]) def expand_argument_specs_aliases(argument_spec): """Returns a dict of accepted argument that includes the aliases""" expanded_argument_specs = {} for k, v in argument_spec.items(): for alias in [k] + v.get("aliases", []): expanded_argument_specs[alias] = v return expanded_argument_specs def prepare_args(argument_specs, params): """Take argument_spec and the user params and prepare the final argument structure.""" def _keep_value(v, argument_specs, key, subkey=None): if v is None: # cannot be a valide parameter return False if key not in argument_specs: # should never happen return if not subkey: # level 1 parameter return v != argument_specs[key].get("default") elif subkey not in argument_specs[key]: # Freeform return True elif isinstance(argument_specs[key][subkey], dict): return v != argument_specs[key][subkey].get("default") else: # should never happen return True def _is_an_alias(k): aliases = argument_specs[k].get("aliases") return aliases and k in aliases new_params = {} for k, v in params.items(): if not _keep_value(v, argument_specs, k): continue if _is_an_alias(k): continue if isinstance(v, dict): new_params[k] = { i: j for i, j in v.items() if _keep_value(j, argument_specs, k, i) } else: new_params[k] = v args = {"ANSIBLE_MODULE_ARGS": new_params} return args class AnsibleTurboModule(ansible.module_utils.basic.AnsibleModule): embedded_in_server = False collection_name = None def __init__(self, *args, **kwargs): self.embedded_in_server = sys.argv[0].endswith("/server.py") self.collection_name = ( AnsibleTurboModule.collection_name or get_collection_name_from_path() ) ansible.module_utils.basic.AnsibleModule.__init__( self, *args, bypass_checks=not self.embedded_in_server, **kwargs ) self._running = None if not self.embedded_in_server: self.run_on_daemon() def socket_path(self): if self._remote_tmp is None: abs_remote_tmp = tempfile.gettempdir() else: abs_remote_tmp = os.path.expanduser(os.path.expandvars(self._remote_tmp)) return os.path.join(abs_remote_tmp, f"turbo_mode.{self.collection_name}.socket") def init_args(self): argument_specs = expand_argument_specs_aliases(self.argument_spec) args = prepare_args(argument_specs, self.params) for k in ansible.module_utils.basic.PASS_VARS: attribute = ansible.module_utils.basic.PASS_VARS[k][0] if not hasattr(self, attribute): continue v = getattr(self, attribute) if isinstance(v, int) or isinstance(v, bool) or isinstance(v, str): args["ANSIBLE_MODULE_ARGS"][f"_ansible_{k}"] = v return args def run_on_daemon(self): result = dict(changed=False, original_message="", message="") ttl = os.environ.get("ANSIBLE_TURBO_LOOKUP_TTL", None) with ansible_collections.cloud.common.plugins.module_utils.turbo.common.connect( socket_path=self.socket_path(), ttl=ttl ) as turbo_socket: ansiblez_path = sys.path[0] args = self.init_args() data = [ ansiblez_path, json.dumps(args), dict(os.environ), ] content = json.dumps(data).encode() result = turbo_socket.communicate(content) self.exit_json(**result) def exit_json(self, **kwargs): if not self.embedded_in_server: super().exit_json(**kwargs) else: self.do_cleanup_files() raise EmbeddedModuleSuccess(**kwargs) def fail_json(self, *args, **kwargs): if not self.embedded_in_server: super().fail_json(**kwargs) else: self.do_cleanup_files() raise EmbeddedModuleFailure(*args, **kwargs)