"""Combine code coverage files."""
from __future__ import annotations

import collections.abc as c
import os
import json
import typing as t

from ...target import (
    walk_compile_targets,
    walk_powershell_targets,
)

from ...io import (
    read_text_file,
)

from ...util import (
    ANSIBLE_TEST_TOOLS_ROOT,
    display,
    ApplicationError,
    raw_command,
)

from ...util_common import (
    ResultType,
    write_json_file,
    write_json_test_results,
)

from ...executor import (
    Delegate,
)

from ...data import (
    data_context,
    PayloadConfig,
)

from ...host_configs import (
    DockerConfig,
    RemoteConfig,
)

from ...provisioning import (
    HostState,
    prepare_profiles,
)

from . import (
    enumerate_python_arcs,
    enumerate_powershell_lines,
    get_collection_path_regexes,
    get_all_coverage_files,
    get_python_coverage_files,
    get_python_modules,
    get_powershell_coverage_files,
    initialize_coverage,
    COVERAGE_OUTPUT_FILE_NAME,
    COVERAGE_GROUPS,
    CoverageConfig,
    PathChecker,
)

TValue = t.TypeVar('TValue')


def command_coverage_combine(args: CoverageCombineConfig) -> None:
    """Patch paths in coverage files and merge into a single file."""
    host_state = prepare_profiles(args)  # coverage combine
    combine_coverage_files(args, host_state)


def combine_coverage_files(args: CoverageCombineConfig, host_state: HostState) -> list[str]:
    """Combine coverage and return a list of the resulting files."""
    if args.delegate:
        if isinstance(args.controller, (DockerConfig, RemoteConfig)):
            paths = get_all_coverage_files()
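            # keep only files previously exported with --export (the last '='-separated part of the name starts with 'coverage.combined')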
            exported_paths = [path for path in paths if os.path.basename(path).split('=')[-1].split('.')[:2] == ['coverage', 'combined']]

            if not exported_paths:
                raise ExportedCoverageDataNotFound()

            pairs = [(path, os.path.relpath(path, data_context().content.root)) for path in exported_paths]

            def coverage_callback(payload_config: PayloadConfig) -> None:
                """Add the coverage files to the payload file list."""
                display.info('Including %d exported coverage file(s) in payload.' % len(pairs), verbosity=1)
                files = payload_config.files
                files.extend(pairs)

            data_context().register_payload_callback(coverage_callback)

        raise Delegate(host_state=host_state)

    paths = _command_coverage_combine_powershell(args) + _command_coverage_combine_python(args, host_state)

    for path in paths:
        display.info('Generated combined output: %s' % path, verbosity=1)

    return paths


class ExportedCoverageDataNotFound(ApplicationError):
    """Exception when no combined coverage data is present yet is required."""

    def __init__(self) -> None:
        super().__init__(
            'Coverage data must be exported before processing with the `--docker` or `--remote` option.\n'
            'Export coverage with `ansible-test coverage combine` using the `--export` option.\n'
            'The exported files must be in the directory: %s/' % ResultType.COVERAGE.relative_path
        )


def _command_coverage_combine_python(args: CoverageCombineConfig, host_state: HostState) -> list[str]:
    """Combine Python coverage files and return a list of the output files."""
    coverage = initialize_coverage(args, host_state)

    modules = get_python_modules()

    coverage_files = get_python_coverage_files()

    def _default_stub_value(source_paths: list[str]) -> dict[str, set[tuple[int, int]]]:
        return {path: set() for path in source_paths}

    counter = 0
    sources = _get_coverage_targets(args, walk_compile_targets)
    groups = _build_stub_groups(args, sources, _default_stub_value)

    collection_search_re, collection_sub_re = get_collection_path_regexes()

    for coverage_file in coverage_files:
        counter += 1
        display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2)

        group = get_coverage_group(args, coverage_file)

        if group is None:
            display.warning('Unexpected name for coverage file: %s' % coverage_file)
            continue

        for filename, arcs in enumerate_python_arcs(coverage_file, coverage, modules, collection_search_re, collection_sub_re):
            if args.export:
                filename = os.path.relpath(filename)  # exported paths must be relative since absolute paths may differ between systems

            if group not in groups:
                groups[group] = {}

            arc_data = groups[group]

            if filename not in arc_data:
                arc_data[filename] = set()

            arc_data[filename].update(arcs)

    output_files = []

    if args.export:
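        # joining with '' adds a trailing path separator, so the group name becomes the start of the output file name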
        coverage_file = os.path.join(args.export, '')
        suffix = '=coverage.combined'
    else:
        coverage_file = os.path.join(ResultType.COVERAGE.path, COVERAGE_OUTPUT_FILE_NAME)
        suffix = ''

    path_checker = PathChecker(args, collection_search_re)

    for group in sorted(groups):
        arc_data = groups[group]
        output_file = coverage_file + group + suffix

        if args.explain:
            continue

        updated = coverage.CoverageData(output_file)

        for filename in arc_data:
            if not path_checker.check_path(filename):
                continue

            updated.add_arcs({filename: list(arc_data[filename])})

        if args.all:
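            # record every target with an empty arc list so files without any coverage still appear in the output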
            updated.add_arcs(dict((source[0], []) for source in sources))

        updated.write()  # always write files to make sure stale files do not exist

        if updated:
            # only report files which are non-empty to prevent coverage from reporting errors
            output_files.append(output_file)

    path_checker.report()

    return sorted(output_files)


def _command_coverage_combine_powershell(args: CoverageCombineConfig) -> list[str]:
    """Combine PowerShell coverage files and return a list of the output files."""
    coverage_files = get_powershell_coverage_files()

    def _default_stub_value(source_paths: list[str]) -> dict[str, dict[int, int]]:
        cmd = ['pwsh', os.path.join(ANSIBLE_TEST_TOOLS_ROOT, 'coverage_stub.ps1')]
        cmd.extend(source_paths)
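        # run the PowerShell stub helper; it prints JSON as a list of objects, each with a file 'Path' and its 'Lines'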

        stubs = json.loads(raw_command(cmd, capture=True)[0])

        return dict((d['Path'], dict((line, 0) for line in d['Lines'])) for d in stubs)

    counter = 0
    sources = _get_coverage_targets(args, walk_powershell_targets)
    groups = _build_stub_groups(args, sources, _default_stub_value)

    collection_search_re, collection_sub_re = get_collection_path_regexes()

    for coverage_file in coverage_files:
        counter += 1
        display.info('[%4d/%4d] %s' % (counter, len(coverage_files), coverage_file), verbosity=2)

        group = get_coverage_group(args, coverage_file)

        if group is None:
            display.warning('Unexpected name for coverage file: %s' % coverage_file)
            continue

        for filename, hits in enumerate_powershell_lines(coverage_file, collection_search_re, collection_sub_re):
            if args.export:
                filename = os.path.relpath(filename)  # exported paths must be relative since absolute paths may differ between systems

            if group not in groups:
                groups[group] = {}

            coverage_data = groups[group]

            if filename not in coverage_data:
                coverage_data[filename] = {}

            file_coverage = coverage_data[filename]

            for line_no, hit_count in hits.items():
                file_coverage[line_no] = file_coverage.get(line_no, 0) + hit_count

    output_files = []

    path_checker = PathChecker(args)

    for group in sorted(groups):
        coverage_data = dict((filename, data) for filename, data in groups[group].items() if path_checker.check_path(filename))

        if args.all:
            missing_sources = [source for source, _source_line_count in sources if source not in coverage_data]
            coverage_data.update(_default_stub_value(missing_sources))

        if not args.explain:
            if args.export:
                output_file = os.path.join(args.export, group + '=coverage.combined')
                write_json_file(output_file, coverage_data, formatted=False)
                output_files.append(output_file)
                continue

            output_file = COVERAGE_OUTPUT_FILE_NAME + group + '-powershell'

            write_json_test_results(ResultType.COVERAGE, output_file, coverage_data, formatted=False)

            output_files.append(os.path.join(ResultType.COVERAGE.path, output_file))

    path_checker.report()

    return sorted(output_files)


def _get_coverage_targets(args: CoverageCombineConfig, walk_func: c.Callable) -> list[tuple[str, int]]:
    """Return a list of files to cover and the number of lines in each file, using the given function as the source of the files."""
    sources = []

    if args.all or args.stub:
        # excludes symlinks of regular files to avoid reporting on the same file multiple times
        # in the future it would be nice to merge any coverage for symlinks into the real files
        for target in walk_func(include_symlinks=False):
            target_path = os.path.abspath(target.path)

            target_lines = len(read_text_file(target_path).splitlines())

            sources.append((target_path, target_lines))

        sources.sort()

    return sources


def _build_stub_groups(
    args: CoverageCombineConfig,
    sources: list[tuple[str, int]],
    default_stub_value: c.Callable[[list[str]], dict[str, TValue]],
) -> dict[str, dict[str, TValue]]:
    """
    Split the given list of sources with line counts into groups, maintaining a maximum line count for each group.
    Each group consists of a dictionary of sources and default coverage stubs generated by the provided default_stub_value function.
    """
    groups = {}

    if args.stub:
        stub_group: list[str] = []
        stub_groups = [stub_group]
        stub_line_limit = 500000
        stub_line_count = 0

        for source, source_line_count in sources:
            stub_group.append(source)
            stub_line_count += source_line_count

            if stub_line_count > stub_line_limit:
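                # start a fresh group; the source that pushed the count over the limit stays in the group it was just added to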
                stub_line_count = 0
                stub_group = []
                stub_groups.append(stub_group)

        for stub_index, stub_group in enumerate(stub_groups):
            if not stub_group:
                continue

            groups['=stub-%02d' % (stub_index + 1)] = default_stub_value(stub_group)

    return groups


def get_coverage_group(args: CoverageCombineConfig, coverage_file: str) -> t.Optional[str]:
    """Return the name of the coverage group for the specified coverage file, or None if no group was found."""
    parts = os.path.basename(coverage_file).split('=', 4)

    if len(parts) != 5 or not parts[4].startswith('coverage.'):
        return None

    names = dict(
        command=parts[0],
        target=parts[1],
        environment=parts[2],
        version=parts[3],
    )

    export_names = dict(
        version=parts[3],
    )

    group = ''

    for part in COVERAGE_GROUPS:
        if part in args.group_by:
            group += '=%s' % names[part]
        elif args.export:
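            # exported names keep only the version; parts not grouped by collapse to 'various'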
            group += '=%s' % export_names.get(part, 'various')

    if args.export:
        group = group.lstrip('=')

    return group


class CoverageCombineConfig(CoverageConfig):
    """Configuration for the coverage combine command."""

    def __init__(self, args: t.Any) -> None:
        super().__init__(args)

        self.group_by: frozenset[str] = frozenset(args.group_by) if args.group_by else frozenset()
        self.all: bool = args.all
        self.stub: bool = args.stub

        # only available to coverage combine
        self.export: t.Optional[str] = args.export if 'export' in args else None
