Server IP : 85.214.239.14 / Your IP : 3.16.212.203 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/community/okd/plugins/module_utils/ |
Upload File : |
#!/usr/bin/env python from __future__ import (absolute_import, division, print_function) __metaclass__ = type import re import operator from functools import reduce from ansible_collections.community.okd.plugins.module_utils.openshift_common import AnsibleOpenshiftModule try: from ansible_collections.kubernetes.core.plugins.module_utils.k8s.resource import create_definitions from ansible_collections.kubernetes.core.plugins.module_utils.k8s.exceptions import CoreException except ImportError: pass from ansible.module_utils._text import to_native try: from kubernetes.dynamic.exceptions import DynamicApiError, NotFoundError, ForbiddenError except ImportError as e: pass TRIGGER_ANNOTATION = 'image.openshift.io/triggers' TRIGGER_CONTAINER = re.compile(r"(?P<path>.*)\[((?P<index>[0-9]+)|\?\(@\.name==[\"'\\]*(?P<name>[a-z0-9]([-a-z0-9]*[a-z0-9])?))") class OKDRawModule(AnsibleOpenshiftModule): def __init__(self, **kwargs): super(OKDRawModule, self).__init__(**kwargs) @property def module(self): return self._module def execute_module(self): results = [] changed = False try: definitions = create_definitions(self.params) except Exception as e: msg = "Failed to load resource definition: {0}".format(e) raise CoreException(msg) from e for definition in definitions: result = {"changed": False, "result": {}} warnings = [] if self.params.get("state") != 'absent': existing = None name = definition.get("metadata", {}).get("name") namespace = definition.get("metadata", {}).get("namespace") if definition.get("kind") in ['Project', 'ProjectRequest']: try: resource = self.svc.find_resource(kind=definition.get("kind"), api_version=definition.get("apiVersion", "v1")) existing = resource.get(name=name, namespace=namespace).to_dict() except (NotFoundError, ForbiddenError): result = self.create_project_request(definition) changed |= result["changed"] results.append(result) continue except DynamicApiError as exc: self.fail_json(msg='Failed to retrieve requested object: {0}'.format(exc.body), error=exc.status, status=exc.status, reason=exc.reason) if definition.get("kind") not in ['Project', 'ProjectRequest']: try: resource = self.svc.find_resource(kind=definition.get("kind"), api_version=definition.get("apiVersion", "v1")) existing = resource.get(name=name, namespace=namespace).to_dict() except Exception: existing = None if existing: if resource.kind == 'DeploymentConfig': if definition.get('spec', {}).get('triggers'): definition = self.resolve_imagestream_triggers(existing, definition) elif existing['metadata'].get('annotations', {}).get(TRIGGER_ANNOTATION): definition = self.resolve_imagestream_trigger_annotation(existing, definition) if self.params.get("validate") is not None: warnings = self.validate(definition) try: result = self.perform_action(definition, self.params) except Exception as e: try: error = e.result except AttributeError: error = {} try: error["reason"] = e.__cause__.reason except AttributeError: pass error["msg"] = to_native(e) if warnings: error.setdefault("warnings", []).extend(warnings) if self.params.get("continue_on_error"): result["error"] = error else: self.fail_json(**error) if warnings: result.setdefault("warnings", []).extend(warnings) changed |= result["changed"] results.append(result) if len(results) == 1: self.exit_json(**results[0]) self.exit_json(**{"changed": changed, "result": {"results": results}}) @staticmethod def get_index(desired, objects, keys): """ Iterates over keys, returns the first object from objects where the value of the key matches the value in desired """ # pylint: disable=use-a-generator # Use a generator instead 'all(desired.get(key, True) == item.get(key, False) for key in keys)' for i, item in enumerate(objects): if item and all([desired.get(key, True) == item.get(key, False) for key in keys]): return i def resolve_imagestream_trigger_annotation(self, existing, definition): import yaml def get_from_fields(d, fields): try: return reduce(operator.getitem, fields, d) except Exception: return None def set_from_fields(d, fields, value): get_from_fields(d, fields[:-1])[fields[-1]] = value if TRIGGER_ANNOTATION in definition['metadata'].get('annotations', {}).keys(): triggers = yaml.safe_load(definition['metadata']['annotations'][TRIGGER_ANNOTATION] or '[]') else: triggers = yaml.safe_load(existing['metadata'].get('annotations', '{}').get(TRIGGER_ANNOTATION, '[]')) if not isinstance(triggers, list): return definition for trigger in triggers: if trigger.get('fieldPath'): parsed = self.parse_trigger_fieldpath(trigger['fieldPath']) path = parsed.get('path', '').split('.') if path: existing_containers = get_from_fields(existing, path) new_containers = get_from_fields(definition, path) if parsed.get('name'): existing_index = self.get_index({'name': parsed['name']}, existing_containers, ['name']) new_index = self.get_index({'name': parsed['name']}, new_containers, ['name']) elif parsed.get('index') is not None: existing_index = new_index = int(parsed['index']) else: existing_index = new_index = None if existing_index is not None and new_index is not None: if existing_index < len(existing_containers) and new_index < len(new_containers): set_from_fields(definition, path + [new_index, 'image'], get_from_fields(existing, path + [existing_index, 'image'])) return definition def resolve_imagestream_triggers(self, existing, definition): existing_triggers = existing.get('spec', {}).get('triggers') new_triggers = definition['spec']['triggers'] existing_containers = existing.get('spec', {}).get('template', {}).get('spec', {}).get('containers', []) new_containers = definition.get('spec', {}).get('template', {}).get('spec', {}).get('containers', []) for i, trigger in enumerate(new_triggers): if trigger.get('type') == 'ImageChange' and trigger.get('imageChangeParams'): names = trigger['imageChangeParams'].get('containerNames', []) for name in names: old_container_index = self.get_index({'name': name}, existing_containers, ['name']) new_container_index = self.get_index({'name': name}, new_containers, ['name']) if old_container_index is not None and new_container_index is not None: image = existing['spec']['template']['spec']['containers'][old_container_index]['image'] definition['spec']['template']['spec']['containers'][new_container_index]['image'] = image existing_index = self.get_index(trigger['imageChangeParams'], [x.get('imageChangeParams') for x in existing_triggers], ['containerNames']) if existing_index is not None: existing_image = existing_triggers[existing_index].get('imageChangeParams', {}).get('lastTriggeredImage') if existing_image: definition['spec']['triggers'][i]['imageChangeParams']['lastTriggeredImage'] = existing_image existing_from = existing_triggers[existing_index].get('imageChangeParams', {}).get('from', {}) new_from = trigger['imageChangeParams'].get('from', {}) existing_namespace = existing_from.get('namespace') existing_name = existing_from.get('name', False) new_name = new_from.get('name', True) add_namespace = existing_namespace and 'namespace' not in new_from.keys() and existing_name == new_name if add_namespace: definition['spec']['triggers'][i]['imageChangeParams']['from']['namespace'] = existing_from['namespace'] return definition def parse_trigger_fieldpath(self, expression): parsed = TRIGGER_CONTAINER.search(expression).groupdict() if parsed.get('index'): parsed['index'] = int(parsed['index']) return parsed def create_project_request(self, definition): definition['kind'] = 'ProjectRequest' result = {'changed': False, 'result': {}} resource = self.svc.find_resource(kind='ProjectRequest', api_version=definition['apiVersion'], fail=True) if not self.check_mode: try: k8s_obj = resource.create(definition) result['result'] = k8s_obj.to_dict() except DynamicApiError as exc: self.fail_json(msg="Failed to create object: {0}".format(exc.body), error=exc.status, status=exc.status, reason=exc.reason) result['changed'] = True result['method'] = 'create' return result