Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.146.37.217
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/ansible/utils/plugins/plugin_utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/ansible/utils/plugins/plugin_utils/consolidate.py
#
# -*- coding: utf-8 -*-
# Copyright 2022 Red Hat
# GNU General Public License v3.0+
# (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
#

"""
The consolidate plugin code
"""
from __future__ import absolute_import, division, print_function


__metaclass__ = type

import itertools

from ansible.errors import AnsibleFilterError


def _raise_error(err):
    """Raise an error message, prepend with filter name

    Args:
        filter (str): Filter name
        msg (str): Message specific to filter supplied

    Raises:
        AnsibleFilterError: AnsibleError with filter name and message
    """
    tmp_err = []
    tmplt_err = "Error when using plugin 'consolidate': '{filter}' reported {msg}"
    for filter in list(err.keys()):
        if err.get(filter):
            msg = ", ".join(err.get(filter))
            tmp_err.append(tmplt_err.format(filter=filter, msg=msg))
    error = "; ".join(tmp_err)
    raise AnsibleFilterError(error)


def fail_on_filter(validator_func):
    """Decorator to fail on supplied filters

    Args:
        validator_func (func): Function that generates failure messages

    Returns:
        raw: Value without errors if generated and not failed
    """

    def update_err(*args, **kwargs):
        """Filters return value or raises error as per supplied parameters

        Returns:
            any: Return value to the function call
        """
        res, err = validator_func(*args, **kwargs)
        if any(
            [
                err.get("fail_missing_match_key"),
                err.get("fail_duplicate"),
                err.get("fail_missing_match_value"),
            ],
        ):
            _raise_error(err)
        return res

    return update_err


@fail_on_filter
def check_missing_match_key_duplicate(data_sources, fail_missing_match_key, fail_duplicate):
    """Check if the match_key specified is present in all the supplied data,
    also check for duplicate data accross all the data sources

    Args:
        data_sources (list): list of dicts as data sources
        fail_missing_match_key (bool): Fails if match_keys not present in data set
        fail_duplicate (bool): Fails if duplicate data present in a data
    Returns:
        list: list of unique keys based on specified match_keys
    """
    results, errors_match_key, errors_duplicate = [], [], []
    for ds_idx, data_source in enumerate(data_sources, start=1):
        match_key = data_source["match_key"]
        ds_values = []

        for dd_idx, data_dict in enumerate(data_source["data"], start=1):
            try:
                ds_values.append(data_dict[match_key])
            except KeyError:
                if fail_missing_match_key:
                    errors_match_key.append(
                        "missing match key '{match_key}' in data source {ds_idx} in list entry {dd_idx}".format(
                            match_key=match_key,
                            ds_idx=ds_idx,
                            dd_idx=dd_idx,
                        ),
                    )
                continue

        if sorted(set(ds_values)) != sorted(ds_values) and fail_duplicate:
            errors_duplicate.append(
                "duplicate values in data source {ds_idx}".format(ds_idx=ds_idx),
            )
        results.append(set(ds_values))
    return results, {
        "fail_missing_match_key": errors_match_key,
        "fail_duplicate": errors_duplicate,
    }


@fail_on_filter
def check_missing_match_values(matched_keys, fail_missing_match_value):
    """Checks values to match be consistent over all the whole data source

    Args:
        matched_keys (list): list of unique keys based on specified match_keys
        fail_missing_match_value (bool): Fail if match_key value is missing in a data set
    Returns:
        set: set of unique values
    """
    all_values = set(itertools.chain.from_iterable(matched_keys))
    if not fail_missing_match_value:
        return all_values, {}
    errors_match_values = []
    for ds_idx, ds_values in enumerate(matched_keys, start=1):
        missing_match = all_values - ds_values
        if missing_match:
            m_matches = ", ".join(missing_match)
            errors_match_values.append(
                "missing match value {m_matches} in data source {ds_idx}".format(
                    ds_idx=ds_idx,
                    m_matches=m_matches,
                ),
            )
    return all_values, {"fail_missing_match_value": errors_match_values}


def consolidate_facts(data_sources, all_values):
    """Iterate over all the data sources and consolidate the data

    Args:
        data_sources (list): supplied data sources
        all_values (set): a set of keys to iterate over

    Returns:
        list: list of consolidated data
    """

    consolidated_facts = {}
    for data_source in data_sources:
        match_key = data_source["match_key"]
        source = data_source["name"]
        data_dict = {d[match_key]: d for d in data_source["data"] if match_key in d}
        for value in sorted(all_values):
            if value not in consolidated_facts:
                consolidated_facts[value] = {}
            consolidated_facts[value][source] = data_dict.get(value, {})
    return consolidated_facts


def consolidate(
    data_sources,
    fail_missing_match_key,
    fail_missing_match_value,
    fail_duplicate,
):
    """Calls data validation and consolidation functions

    Args:
        data_source (list): list of dicts as data sources
        fail_missing_match_key (bool, optional): Fails if match_keys not present in data set. Defaults to False.
        fail_missing_match_value (bool, optional): Fails if matching attribute missing in a data. Defaults to False.
        fail_duplicate (bool, optional): Fails if duplicate data present in a data. Defaults to False.

    Returns:
        list: list of dicts of validated and consolidated data
    """

    key_sets = check_missing_match_key_duplicate(
        data_sources,
        fail_missing_match_key,
        fail_duplicate,
    )
    key_vals = check_missing_match_values(key_sets, fail_missing_match_value)
    consolidated_facts = consolidate_facts(data_sources, key_vals)
    return consolidated_facts

Anon7 - 2022
AnonSec Team