Server IP : 85.214.239.14 / Your IP : 3.148.115.43 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/self/root/lib/python3/dist-packages/ansible_collections/kubernetes/core/plugins/action/ |
Upload File : |
# Copyright (c) 2012-2014, Michael DeHaan <michael.dehaan@gmail.com> # Copyright (c) 2017, Toshio Kuratomi <tkuraotmi@ansible.com> # Copyright (c) 2020, Ansible Project # GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) from __future__ import absolute_import, division, print_function __metaclass__ = type import copy import traceback import os from contextlib import contextmanager import platform from ansible.config.manager import ensure_type from ansible.errors import ( AnsibleError, AnsibleFileNotFound, AnsibleAction, AnsibleActionFail, ) from ansible.module_utils.parsing.convert_bool import boolean from ansible.module_utils.six import string_types, iteritems from ansible.module_utils._text import to_text, to_bytes, to_native from ansible.plugins.action import ActionBase class RemoveOmit(object): def __init__(self, buffer, omit_value): try: import yaml except ImportError: raise AnsibleError("Failed to import the required Python library (PyYAML).") self.data = yaml.safe_load_all(buffer) self.omit = omit_value def remove_omit(self, data): if isinstance(data, dict): result = dict() for key, value in iteritems(data): if value == self.omit: continue result[key] = self.remove_omit(value) return result if isinstance(data, list): return [self.remove_omit(v) for v in data if v != self.omit] return data def output(self): return [self.remove_omit(d) for d in self.data] ENV_KUBECONFIG_PATH_SEPARATOR = ";" if platform.system() == "Windows" else ":" class ActionModule(ActionBase): TRANSFERS_FILES = True DEFAULT_NEWLINE_SEQUENCE = "\n" def _ensure_invocation(self, result): # NOTE: adding invocation arguments here needs to be kept in sync with # any no_log specified in the argument_spec in the module. if "invocation" not in result: if self._play_context.no_log: result["invocation"] = "CENSORED: no_log is set" else: result["invocation"] = self._task.args.copy() result["invocation"]["module_args"] = self._task.args.copy() return result @contextmanager def get_template_data(self, template_path): try: source = self._find_needle("templates", template_path) except AnsibleError as e: raise AnsibleActionFail(to_text(e)) # Get vault decrypted tmp file try: tmp_source = self._loader.get_real_file(source) except AnsibleFileNotFound as e: raise AnsibleActionFail( "could not find template=%s, %s" % (source, to_text(e)) ) b_tmp_source = to_bytes(tmp_source, errors="surrogate_or_strict") try: with open(b_tmp_source, "rb") as f: try: template_data = to_text(f.read(), errors="surrogate_or_strict") except UnicodeError: raise AnsibleActionFail( "Template source files must be utf-8 encoded" ) yield template_data except AnsibleAction: raise except Exception as e: raise AnsibleActionFail("%s: %s" % (type(e).__name__, to_text(e))) finally: self._loader.cleanup_tmp_file(b_tmp_source) def get_template_args(self, template): template_param = { "newline_sequence": self.DEFAULT_NEWLINE_SEQUENCE, "variable_start_string": None, "variable_end_string": None, "block_start_string": None, "block_end_string": None, "trim_blocks": True, "lstrip_blocks": False, } if isinstance(template, string_types): # treat this as raw_params template_param["path"] = template elif isinstance(template, dict): template_args = template template_path = template_args.get("path", None) if not template_path: raise AnsibleActionFail("Please specify path for template.") template_param["path"] = template_path # Options type validation strings for s_type in ( "newline_sequence", "variable_start_string", "variable_end_string", "block_start_string", "block_end_string", ): if s_type in template_args: value = ensure_type(template_args[s_type], "string") if value is not None and not isinstance(value, string_types): raise AnsibleActionFail( "%s is expected to be a string, but got %s instead" % (s_type, type(value)) ) try: template_param.update( { "trim_blocks": boolean( template_args.get("trim_blocks", True), strict=False ), "lstrip_blocks": boolean( template_args.get("lstrip_blocks", False), strict=False ), } ) except TypeError as e: raise AnsibleActionFail(to_native(e)) template_param.update( { "newline_sequence": template_args.get( "newline_sequence", self.DEFAULT_NEWLINE_SEQUENCE ), "variable_start_string": template_args.get( "variable_start_string", None ), "variable_end_string": template_args.get( "variable_end_string", None ), "block_start_string": template_args.get("block_start_string", None), "block_end_string": template_args.get("block_end_string", None), } ) else: raise AnsibleActionFail( "Error while reading template file - " "a string or dict for template expected, but got %s instead" % type(template) ) return template_param def import_jinja2_lstrip(self, templates): # Option `lstrip_blocks' was added in Jinja2 version 2.7. if any(tmp["lstrip_blocks"] for tmp in templates): try: import jinja2.defaults except ImportError: raise AnsibleError( "Unable to import Jinja2 defaults for determining Jinja2 features." ) try: jinja2.defaults.LSTRIP_BLOCKS except AttributeError: raise AnsibleError( "Option `lstrip_blocks' is only available in Jinja2 versions >=2.7" ) def load_template(self, template, new_module_args, task_vars): # template is only supported by k8s module. if self._task.action not in ( "k8s", "kubernetes.core.k8s", "community.okd.k8s", "redhat.openshift.k8s", "community.kubernetes.k8s", "openshift_adm_groups_sync", "community.okd.openshift_adm_groups_sync", "redhat.openshift.openshift_adm_groups_sync", ): raise AnsibleActionFail( "'template' is only a supported parameter for the 'k8s' module." ) omit_value = task_vars.get("omit") template_params = [] if isinstance(template, string_types) or isinstance(template, dict): template_params.append(self.get_template_args(template)) elif isinstance(template, list): for element in template: template_params.append(self.get_template_args(element)) else: raise AnsibleActionFail( "Error while reading template file - " "a string or dict for template expected, but got %s instead" % type(template) ) self.import_jinja2_lstrip(template_params) wrong_sequences = ["\\n", "\\r", "\\r\\n"] allowed_sequences = ["\n", "\r", "\r\n"] result_template = [] old_vars = self._templar.available_variables default_environment = {} for key in ( "newline_sequence", "variable_start_string", "variable_end_string", "block_start_string", "block_end_string", "trim_blocks", "lstrip_blocks", ): if hasattr(self._templar.environment, key): default_environment[key] = getattr(self._templar.environment, key) for template_item in template_params: # We need to convert unescaped sequences to proper escaped sequences for Jinja2 newline_sequence = template_item["newline_sequence"] if newline_sequence in wrong_sequences: template_item["newline_sequence"] = allowed_sequences[ wrong_sequences.index(newline_sequence) ] elif newline_sequence not in allowed_sequences: raise AnsibleActionFail( "newline_sequence needs to be one of: \n, \r or \r\n" ) # template the source data locally & get ready to transfer with self.get_template_data(template_item["path"]) as template_data: # add ansible 'template' vars temp_vars = copy.deepcopy(task_vars) for key, value in iteritems(template_item): if hasattr(self._templar.environment, key): if value is not None: setattr(self._templar.environment, key, value) else: setattr( self._templar.environment, key, default_environment.get(key), ) self._templar.available_variables = temp_vars result = self._templar.do_template( template_data, preserve_trailing_newlines=True, escape_backslashes=False, ) if omit_value is not None: result_template.extend(RemoveOmit(result, omit_value).output()) else: result_template.append(result) self._templar.available_variables = old_vars resource_definition = self._task.args.get("definition", None) if not resource_definition: new_module_args.pop("template") new_module_args["definition"] = result_template def get_file_realpath(self, local_path): # local_path is only supported by k8s_cp module. if self._task.action not in ( "k8s_cp", "kubernetes.core.k8s_cp", "community.kubernetes.k8s_cp", ): raise AnsibleActionFail( "'local_path' is only supported parameter for 'k8s_cp' module." ) if os.path.exists(local_path): return local_path try: # find in expected paths return self._find_needle("files", local_path) except AnsibleError: raise AnsibleActionFail( "%s does not exist in local filesystem" % local_path ) def get_kubeconfig(self, kubeconfig, remote_transport, new_module_args): if isinstance(kubeconfig, string_types): # find the kubeconfig in the expected search path if not remote_transport: # kubeconfig is local # find in expected paths configs = [] for config in kubeconfig.split(ENV_KUBECONFIG_PATH_SEPARATOR): config = self._find_needle("files", config) # decrypt kubeconfig found configs.append(self._loader.get_real_file(config, decrypt=True)) new_module_args["kubeconfig"] = ENV_KUBECONFIG_PATH_SEPARATOR.join( configs ) elif isinstance(kubeconfig, dict): new_module_args["kubeconfig"] = kubeconfig else: raise AnsibleActionFail( "Error while reading kubeconfig parameter - " "a string or dict expected, but got %s instead" % type(kubeconfig) ) def run(self, tmp=None, task_vars=None): """handler for k8s options""" if task_vars is None: task_vars = dict() result = super(ActionModule, self).run(tmp, task_vars) del tmp # tmp no longer has any effect # Check current transport connection and depending upon # look for kubeconfig and src # 'local' => look files on Ansible Controller # Transport other than 'local' => look files on remote node remote_transport = self._connection.transport != "local" new_module_args = copy.deepcopy(self._task.args) kubeconfig = self._task.args.get("kubeconfig", None) if kubeconfig: try: self.get_kubeconfig(kubeconfig, remote_transport, new_module_args) except AnsibleError as e: result["failed"] = True result["msg"] = to_text(e) result["exception"] = traceback.format_exc() return result # find the file in the expected search path src = self._task.args.get("src", None) if src and not src.startswith(("http://", "https://", "ftp://")): if remote_transport: # src is on remote node result.update( self._execute_module( module_name=self._task.action, task_vars=task_vars ) ) return self._ensure_invocation(result) # src is local try: # find in expected paths src = self._find_needle("files", src) except AnsibleError as e: result["failed"] = True result["msg"] = to_text(e) result["exception"] = traceback.format_exc() return result if src: new_module_args["src"] = src template = self._task.args.get("template", None) if template: self.load_template(template, new_module_args, task_vars) local_path = self._task.args.get("local_path") state = self._task.args.get("state", None) if local_path and state == "to_pod" and not remote_transport: new_module_args["local_path"] = self.get_file_realpath(local_path) # Execute the k8s_* module. module_return = self._execute_module( module_name=self._task.action, module_args=new_module_args, task_vars=task_vars, ) # Delete tmp path self._remove_tmp_path(self._connection._shell.tmpdir) result.update(module_return) return self._ensure_invocation(result)