Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.149.234.78
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /usr/lib/python3/dist-packages/ansible_collections/community/dns/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /usr/lib/python3/dist-packages/ansible_collections/community/dns//update-docs-fragments.py
#!/usr/bin/env python
# Copyright (c) Ansible Project
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type


import importlib
import os
import re
import sys
import textwrap

import yaml


PROVIDERS = ['hetzner', 'hosttech']

DEPENDENT_FRAGMENTS = [
    ('RECORD_TYPE_CHOICES', [
        ('record_type', 'options.type', [
            'module_record',
            'module_record_info',
            'module_record_set',
            'module_record_set_info',
        ]),
    ]),
    ('RECORD_DEFAULT_TTL', [
        ('record_default_ttl', 'options.ttl', [
            'module_record',
            'module_record_set',
        ]),
    ]),
    ('RECORD_TYPE_CHOICES_RECORD_SETS_MODULE', [
        ('record_type', 'options.record_sets.suboptions.type', [
            'module_record_sets',
        ]),
        ('record_default_ttl', 'options.record_sets.suboptions.ttl', [
            'module_record_sets',
        ]),
    ]),
    ('RECORD_TYPE_CHOICES_RECORDS_INVENTORY', [
        ('record_type', 'options.filters.suboptions.type', [
            'inventory_records',
        ]),
    ]),
    ('ZONE_ID_TYPE', [
        ('zone_id_type', 'options.zone_id', [
            'module_record',
            'module_record_info',
            'module_record_set',
            'module_record_set_info',
            'module_record_sets',
            'module_zone_info',
            'inventory_records',
        ]),
    ]),
]


def get_provider_informations(providers):
    files_to_remove = []

    def add_init_py(path):
        path = os.path.join(path, '__init__.py')
        if os.path.exists(path):
            return
        with open(path, 'wb') as f:
            f.write(b'')
        files_to_remove.append(path)

    try:
        sys.path.append(os.path.join('..', '..', '..'))

        add_init_py(os.path.join('..', '..'))
        add_init_py(os.path.join('..'))
        add_init_py(os.path.join('.'))
        add_init_py(os.path.join('plugins'))
        add_init_py(os.path.join('plugins', 'module_utils'))

        provider_infos = {}
        errors = []

        for provider in providers:
            add_init_py(os.path.join('plugins', 'module_utils', provider))
            full_py_path = 'ansible_collections.community.dns.plugins.module_utils.{0}.api'.format(provider)
            full_pathname = os.path.join('plugins', 'module_utils', provider, 'api.py')
            try:
                loader = importlib.machinery.SourceFileLoader(full_py_path, full_pathname)
                spec = importlib.util.spec_from_loader(full_py_path, loader)
                the_module = importlib.util.module_from_spec(spec)
                loader.exec_module(the_module)
            except Exception as e:
                errors.append('{0}: Error while importing module {1}: {2}'.format(full_pathname, full_py_path, e))
                continue

            create_provider_info_fn_name = 'create_{0}_provider_information'.format(provider)
            try:
                create_provider_info_fn = provider_information = the_module.__dict__[create_provider_info_fn_name]
                provider_infos[provider] = create_provider_info_fn()
            except KeyError as e:
                errors.append('{0}: Cannot find function {1}'.format(full_pathname, create_provider_info_fn))
            except Exception as e:
                errors.append('{0}: Error while invoking function {1}: {2}'.format(full_pathname, create_provider_info_fn, e))

        return provider_infos, errors
    finally:
        for path in files_to_remove:
            os.remove(path)


class DocFragmentParseError(Exception):
    def __init__(self, path, error_message):
        self.path = path
        self.error_message = error_message
        super(DocFragmentParseError, self).__init__('Error while parsing {0}: {1}'.format(path, error_message))


DOC_FRAGMENT_START_MATCHER = re.compile(r"^    ([A-Z_]+) = r'''$")


class Dumper(yaml.SafeDumper):
    def ignore_aliases(self, *args):
        return True

    def increase_indent(self, flow=False, *args, **kwargs):
        self.best_indent = kwargs.pop('ident_override', 4)
        return super().increase_indent(*args, **kwargs)

    def expect_block_sequence(self):
        self.increase_indent(flow=False, indentless=False, ident_override=2)
        self.state = self.expect_first_block_sequence_item


class DocFragment:
    def __init__(self, path, prefix_lines, name, lines):
        self.prefix_lines = prefix_lines
        self.name = name
        self.lines = lines

        try:
            self.data = yaml.safe_load('\n'.join(self.lines))
        except Exception as e:
            raise DocFragmentParseError(path, 'Error while parsing part {0}: {1}'.format(name, e))

    def recreate_lines(self):
        data = yaml.dump(self.data, default_flow_style=False, indent=4, Dumper=Dumper, sort_keys=False)
        self.lines = data.splitlines()

    def serialize_lines(self):
        return self.prefix_lines + ["    {0} = r'''".format(self.name)] + self.lines + ["'''"]


class DocFragmentFile:
    def __init__(self, path):
        with open(path, 'r', encoding='utf-8') as f:
            lines = f.read().splitlines()

        self.prefix = []
        self.fragments = []
        self.fragments_by_name = {}

        where = 'prefix'
        for line in lines:
            if where == 'prefix':
                self.prefix.append(line)
                if line == 'class ModuleDocFragment(object):':
                    where = 'body'
                    body_prefix = []
                    body_name = None
                    body_lines = []
            elif where == 'body':
                if body_name is None:
                    m = DOC_FRAGMENT_START_MATCHER.match(line)
                    if m:
                        body_name = m.group(1)
                    else:
                        body_prefix.append(line)
                elif line == "'''":
                    fragment = DocFragment(path, body_prefix, body_name, body_lines)
                    self.fragments.append(fragment)
                    self.fragments_by_name[body_name] = fragment
                    body_prefix = []
                    body_name = None
                    body_lines = []
                else:
                    body_lines.append(line)

        if where == 'prefix':
            raise DocFragmentParseError(path, 'Cannot find body')

    def serialize_to_string(self):
        lines = []
        lines.extend(self.prefix)
        for fragment in self.fragments:
            lines.extend(fragment.serialize_lines())
        lines.append('')
        return '\n'.join(lines)


def doc_fragment_fn(name):
    return os.path.join('plugins', 'doc_fragments', '{0}.py'.format(name))


def load_doc_fragment(name):
    fn = doc_fragment_fn(name)
    return DocFragmentFile(fn)


def load_single_doc_fragment(name):
    fragment = 'DOCUMENTATION'
    if '.' in name:
        name, fragment = name.split('.', 1)
        fragment = fragment.upper()
    doc_fragment = load_doc_fragment(name)
    return doc_fragment.fragments_by_name[fragment]


def write_doc_fragment(name, doc_fragment):
    fn = doc_fragment_fn(name)
    data = doc_fragment.serialize_to_string()
    with open(fn, 'w', encoding='utf-8') as f:
        f.write(data)


def compare_doc_fragment(name, doc_fragment):
    fn = doc_fragment_fn(name)
    data = doc_fragment.serialize_to_string()
    with open(fn, 'r', encoding='utf-8') as f:
        compare_data = f.read()
    return data == compare_data


def augment_fragment(provider_fragment, provider_info):
    data = {
        'record_type': {'choices': sorted(provider_info.get_supported_record_types())},
        'zone_id_type': {'type': provider_info.get_zone_id_type()},
        'record_id_type': {'type': provider_info.get_record_id_type()},
        'record_default_ttl': {'default': provider_info.get_record_default_ttl()}
    }

    for fragment_name, fragment_insertion_data in DEPENDENT_FRAGMENTS:
        insertion_fragment = provider_fragment.fragments_by_name.get(fragment_name)
        if insertion_fragment is None:
            insertion_fragment = DocFragment('', [], fragment_name, [])
            provider_fragment.fragments.append(insertion_fragment)
            provider_fragment.fragments_by_name[fragment_name] = insertion_fragment

        insertion_fragment.data = {}
        all_doc_fragment_names = set()
        for what, insertion_point, doc_fragment_names in fragment_insertion_data:
            all_doc_fragment_names.update(doc_fragment_names)
            doc_fragments = [load_single_doc_fragment(doc_fragment) for doc_fragment in doc_fragment_names]
            insertion_point = insertion_point.split('.')

            insertion_pos = insertion_fragment.data
            original_pos = doc_fragments[0].data  # FIXME
            for depth, part in enumerate(insertion_point):
                if part not in insertion_pos:
                    insertion_pos[part] = {}
                insertion_pos = insertion_pos[part]
                original_pos = original_pos[part]
                if depth >= 2:
                    for x in original_pos:
                        if x not in insertion_pos:
                            insertion_pos[x] = original_pos[x]
            insertion_pos.update(data[what])

        insertion_fragment.prefix_lines = [
            '',
            '    # WARNING: This section is automatically generated by update-docs-fragments.py.',
        ]
        insertion_fragment.prefix_lines.extend([
            '    #          {0}'.format(line)
            for line in textwrap.wrap(
                'It is used to augment the docs fragment{0} {1}.'.format(
                    's' if len(all_doc_fragment_names) != 1 else '',
                    ', '.join(sorted(all_doc_fragment_names))),
                width=80)
        ])
        insertion_fragment.prefix_lines.append('    #          DO NOT EDIT MANUALLY!')
        insertion_fragment.recreate_lines()


def main(program, arguments):
    lint = '--lint' in arguments
    provider_infos, errors = get_provider_informations(PROVIDERS)
    try:
        for provider, provider_info in sorted(provider_infos.items()):
            try:
                doc_fragment = load_doc_fragment(provider)

                augment_fragment(doc_fragment, provider_info)

                if not compare_doc_fragment(provider, doc_fragment):
                    path = doc_fragment_fn(provider)
                    if lint:
                        errors.append('{0}: Needs to be updated by update-docs-fragments.py'.format(path))
                    else:
                        print('Writing {0}...'.format(path))
                        write_doc_fragment(provider, doc_fragment)

            except DocFragmentParseError as e:
                errors.append('{0}: Error while parsing docs fragment: {1}'.format(e.path, e.error_message))

    except Exception as e:
        errors.append('{0}: Unexpected error: {1}'.format(program, e))

    for error in errors:
        print(error)
    return 5 if errors else 0


if __name__ == '__main__':
    sys.exit(main(sys.argv[0], sys.argv[1:]))

Anon7 - 2022
AnonSec Team