Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.144.252.243
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/general/plugins/module_utils/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/general/plugins/module_utils/deps.py
# -*- coding: utf-8 -*-
# (c) 2022, Alexei Znamensky <russoz@gmail.com>
# Copyright (c) 2022, Ansible Project
# Simplified BSD License (see LICENSES/BSD-2-Clause.txt or https://opensource.org/licenses/BSD-2-Clause)
# SPDX-License-Identifier: BSD-2-Clause

from __future__ import absolute_import, division, print_function
__metaclass__ = type


import traceback
from contextlib import contextmanager

from ansible.module_utils.common.text.converters import to_native
from ansible.module_utils.basic import missing_required_lib


# Module-level registry of declared dependencies, keyed by dependency name.
# Entries are written by declare() and read by _select_names(), validate()
# and failed().
_deps = dict()


class _Dependency(object):
    """Record of one declared dependency and the outcome of loading it.

    ``state`` is an integer index into ``_states``: 0 (pending) right after
    construction, 1 (failure) once ``fail()`` has been called and
    2 (success) once ``succeed()`` has been called.
    """

    # Labels used by __str__, indexed by the integer held in ``self.state``.
    _states = ["pending", "failure", "success"]

    def __init__(self, name, reason=None, url=None, msg=None):
        """Store the descriptive fields and start in the pending state.

        :param name: dependency name, passed to ``missing_required_lib``.
        :param reason: forwarded as ``reason=`` to ``missing_required_lib``.
        :param url: forwarded as ``url=`` to ``missing_required_lib``.
        :param msg: when truthy, used as the failure message instead of the
            one generated by ``missing_required_lib``.
        """
        self.name, self.reason, self.url, self.msg = name, reason, url, msg

        self.state = 0
        self.trace = None
        self.exc = None

    def succeed(self):
        """Flag the dependency as successfully loaded."""
        self.state = 2

    def fail(self, exc, trace):
        """Flag the dependency as failed, keeping the exception and its trace."""
        self.exc, self.trace = exc, trace
        self.state = 1

    @property
    def message(self):
        """Failure message: the custom ``msg`` if set, else a generated one."""
        if not self.msg:
            return missing_required_lib(self.name, reason=self.reason, url=self.url)
        return to_native(self.msg)

    @property
    def failed(self):
        """``True`` only when the dependency is in the failure state."""
        return self.state == 1

    def validate(self, module):
        """Call ``module.fail_json`` if this dependency failed; otherwise do nothing."""
        if not self.failed:
            return
        module.fail_json(msg=self.message, exception=self.trace)

    def __str__(self):
        label = self._states[self.state]
        return "<dependency: {0} [{1}]>".format(self.name, label)


@contextmanager
def declare(name, *args, **kwargs):
    """Context manager that records whether the wrapped block raised.

    A ``_Dependency`` is built from ``name`` and the extra arguments and
    yielded to the ``with`` body. Any ``Exception`` raised by the body is
    caught and stored on the dependency together with its formatted
    traceback; if the body finishes without raising, the dependency is
    marked successful. The dependency is stored in ``_deps`` under ``name``
    on every exit path.
    """
    dependency = _Dependency(name, *args, **kwargs)
    try:
        try:
            yield dependency
        except Exception as err:
            # Swallowed on purpose: the failure is kept on the dependency
            # object instead of propagating to the caller.
            dependency.fail(err, traceback.format_exc())
        else:
            dependency.succeed()
    finally:
        # Register even when something other than Exception escapes the body.
        _deps[name] = dependency


def _select_names(spec):
    """Turn a selection spec into a list of dependency names.

    * Falsy ``spec``: every registered name, sorted.
    * ``spec`` starting with ``"-"``: every registered name, sorted, minus
      the colon-separated names after the dash. ``list.remove`` raises
      ``ValueError`` for a name that is not registered.
    * Any other ``spec``: the colon-separated names, in the order given.
      The ``_deps`` lookup raises ``KeyError`` for a name that is not
      registered.
    """
    every_name = sorted(_deps)

    if not spec:
        return every_name

    if spec.startswith("-"):
        # Exclusion mode: begin with everything and drop the listed names.
        for excluded in spec[1:].split(":"):
            every_name.remove(excluded)
        return every_name

    # Inclusion mode: keep only the listed names, preserving their order.
    chosen = []
    for wanted in spec.split(":"):
        _deps[wanted]  # lookup only, so an unknown name raises KeyError
        chosen.append(wanted)
    return chosen


def validate(module, spec=None):
    """Call ``validate(module)`` on each dependency selected by ``spec``.

    :param module: object handed to each dependency's ``validate`` method.
    :param spec: selection spec passed to ``_select_names``; ``None``
        selects every registered dependency.
    """
    for name in _select_names(spec):
        dependency = _deps[name]
        dependency.validate(module)


def failed(spec=None):
    """Return ``True`` if any dependency selected by ``spec`` has failed.

    :param spec: selection spec passed to ``_select_names``; ``None``
        selects every registered dependency.
    """
    for name in _select_names(spec):
        if _deps[name].failed:
            return True
    return False

Anon7 - 2022
AnonSec Team