Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.133.117.95
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/general/plugins/callback/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/general/plugins/callback/opentelemetry.py
# -*- coding: utf-8 -*-
# Copyright (c) 2021, Victor Martinez <VictorMartinezRubio@gmail.com>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

DOCUMENTATION = '''
    author: Victor Martinez (@v1v)  <VictorMartinezRubio@gmail.com>
    name: opentelemetry
    type: notification
    short_description: Create distributed traces with OpenTelemetry
    version_added: 3.7.0
    description:
      - This callback creates distributed traces for each Ansible task with OpenTelemetry.
      - You can configure the OpenTelemetry exporter and SDK with environment variables.
      - See U(https://opentelemetry-python.readthedocs.io/en/latest/exporter/otlp/otlp.html).
      - See U(https://opentelemetry-python.readthedocs.io/en/latest/sdk/environment_variables.html#opentelemetry-sdk-environment-variables).
    options:
      hide_task_arguments:
        default: false
        type: bool
        description:
          - Hide the arguments for a task.
        env:
          - name: ANSIBLE_OPENTELEMETRY_HIDE_TASK_ARGUMENTS
        ini:
          - section: callback_opentelemetry
            key: hide_task_arguments
            version_added: 5.3.0
      enable_from_environment:
        type: str
        description:
          - Whether to enable this callback only if the given environment variable exists and it is set to C(true).
          - This is handy when you use Configuration as Code and want to send distributed traces
            if running in the CI rather when running Ansible locally.
          - For such, it evaluates the given I(enable_from_environment) value as environment variable
            and if set to true this plugin will be enabled.
        env:
          - name: ANSIBLE_OPENTELEMETRY_ENABLE_FROM_ENVIRONMENT
        ini:
          - section: callback_opentelemetry
            key: enable_from_environment
            version_added: 5.3.0
        version_added: 3.8.0
      otel_service_name:
        default: ansible
        type: str
        description:
          - The service name resource attribute.
        env:
          - name: OTEL_SERVICE_NAME
        ini:
          - section: callback_opentelemetry
            key: otel_service_name
            version_added: 5.3.0
      traceparent:
        default: None
        type: str
        description:
          - The L(W3C Trace Context header traceparent,https://www.w3.org/TR/trace-context-1/#traceparent-header).
        env:
          - name: TRACEPARENT
      disable_logs:
        default: false
        type: bool
        description:
          - Disable sending logs.
        env:
          - name: ANSIBLE_OPENTELEMETRY_DISABLE_LOGS
        ini:
          - section: callback_opentelemetry
            key: disable_logs
        version_added: 5.8.0
    requirements:
      - opentelemetry-api (Python library)
      - opentelemetry-exporter-otlp (Python library)
      - opentelemetry-sdk (Python library)
'''


EXAMPLES = '''
examples: |
  Enable the plugin in ansible.cfg:
    [defaults]
    callbacks_enabled = community.general.opentelemetry
    [callback_opentelemetry]
    enable_from_environment = ANSIBLE_OPENTELEMETRY_ENABLED

  Set the environment variable:
    export OTEL_EXPORTER_OTLP_ENDPOINT=<your endpoint (OTLP/HTTP)>
    export OTEL_EXPORTER_OTLP_HEADERS="authorization=Bearer your_otel_token"
    export OTEL_SERVICE_NAME=your_service_name
    export ANSIBLE_OPENTELEMETRY_ENABLED=true
'''

import getpass
import os
import socket
import sys
import time
import uuid

from collections import OrderedDict
from os.path import basename

from ansible.errors import AnsibleError
from ansible.module_utils.six import raise_from
from ansible.module_utils.six.moves.urllib.parse import urlparse
from ansible.plugins.callback import CallbackBase

try:
    from opentelemetry import trace
    from opentelemetry.trace import SpanKind
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
    from opentelemetry.sdk.resources import SERVICE_NAME, Resource
    from opentelemetry.trace.status import Status, StatusCode
    from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import (
        BatchSpanProcessor
    )

    # Support for opentelemetry-api <= 1.12
    try:
        from opentelemetry.util._time import _time_ns
    except ImportError as imp_exc:
        OTEL_LIBRARY_TIME_NS_ERROR = imp_exc
    else:
        OTEL_LIBRARY_TIME_NS_ERROR = None

except ImportError as imp_exc:
    OTEL_LIBRARY_IMPORT_ERROR = imp_exc
    OTEL_LIBRARY_TIME_NS_ERROR = imp_exc
else:
    OTEL_LIBRARY_IMPORT_ERROR = None


if sys.version_info >= (3, 7):
    time_ns = time.time_ns
elif not OTEL_LIBRARY_TIME_NS_ERROR:
    time_ns = _time_ns
else:
    def time_ns():
        # Support versions older than 3.7 with opentelemetry-api > 1.12
        return int(time.time() * 1e9)


class TaskData:
    """
    Data about an individual task.
    """

    def __init__(self, uuid, name, path, play, action, args):
        self.uuid = uuid
        self.name = name
        self.path = path
        self.play = play
        self.host_data = OrderedDict()
        self.start = time_ns()
        self.action = action
        self.args = args
        self.dump = None

    def add_host(self, host):
        if host.uuid in self.host_data:
            if host.status == 'included':
                # concatenate task include output from multiple items
                host.result = '%s\n%s' % (self.host_data[host.uuid].result, host.result)
            else:
                return

        self.host_data[host.uuid] = host


class HostData:
    """
    Data about an individual host.
    """

    def __init__(self, uuid, name, status, result):
        self.uuid = uuid
        self.name = name
        self.status = status
        self.result = result
        self.finish = time_ns()


class OpenTelemetrySource(object):
    def __init__(self, display):
        self.ansible_playbook = ""
        self.ansible_version = None
        self.session = str(uuid.uuid4())
        self.host = socket.gethostname()
        try:
            self.ip_address = socket.gethostbyname(socket.gethostname())
        except Exception as e:
            self.ip_address = None
        self.user = getpass.getuser()

        self._display = display

    def traceparent_context(self, traceparent):
        carrier = dict()
        carrier['traceparent'] = traceparent
        return TraceContextTextMapPropagator().extract(carrier=carrier)

    def start_task(self, tasks_data, hide_task_arguments, play_name, task):
        """ record the start of a task for one or more hosts """

        uuid = task._uuid

        if uuid in tasks_data:
            return

        name = task.get_name().strip()
        path = task.get_path()
        action = task.action
        args = None

        if not task.no_log and not hide_task_arguments:
            args = task.args

        tasks_data[uuid] = TaskData(uuid, name, path, play_name, action, args)

    def finish_task(self, tasks_data, status, result, dump):
        """ record the results of a task for a single host """

        task_uuid = result._task._uuid

        if hasattr(result, '_host') and result._host is not None:
            host_uuid = result._host._uuid
            host_name = result._host.name
        else:
            host_uuid = 'include'
            host_name = 'include'

        task = tasks_data[task_uuid]

        if self.ansible_version is None and hasattr(result, '_task_fields') and result._task_fields['args'].get('_ansible_version'):
            self.ansible_version = result._task_fields['args'].get('_ansible_version')

        task.dump = dump
        task.add_host(HostData(host_uuid, host_name, status, result))

    def generate_distributed_traces(self, otel_service_name, ansible_playbook, tasks_data, status, traceparent, disable_logs):
        """ generate distributed traces from the collected TaskData and HostData """

        tasks = []
        parent_start_time = None
        for task_uuid, task in tasks_data.items():
            if parent_start_time is None:
                parent_start_time = task.start
            tasks.append(task)

        trace.set_tracer_provider(
            TracerProvider(
                resource=Resource.create({SERVICE_NAME: otel_service_name})
            )
        )

        processor = BatchSpanProcessor(OTLPSpanExporter())

        trace.get_tracer_provider().add_span_processor(processor)

        tracer = trace.get_tracer(__name__)

        with tracer.start_as_current_span(ansible_playbook, context=self.traceparent_context(traceparent),
                                          start_time=parent_start_time, kind=SpanKind.SERVER) as parent:
            parent.set_status(status)
            # Populate trace metadata attributes
            if self.ansible_version is not None:
                parent.set_attribute("ansible.version", self.ansible_version)
            parent.set_attribute("ansible.session", self.session)
            parent.set_attribute("ansible.host.name", self.host)
            if self.ip_address is not None:
                parent.set_attribute("ansible.host.ip", self.ip_address)
            parent.set_attribute("ansible.host.user", self.user)
            for task in tasks:
                for host_uuid, host_data in task.host_data.items():
                    with tracer.start_as_current_span(task.name, start_time=task.start, end_on_exit=False) as span:
                        self.update_span_data(task, host_data, span, disable_logs)

    def update_span_data(self, task_data, host_data, span, disable_logs):
        """ update the span with the given TaskData and HostData """

        name = '[%s] %s: %s' % (host_data.name, task_data.play, task_data.name)

        message = 'success'
        res = {}
        rc = 0
        status = Status(status_code=StatusCode.OK)
        if host_data.status != 'included':
            # Support loops
            if 'results' in host_data.result._result:
                if host_data.status == 'failed':
                    message = self.get_error_message_from_results(host_data.result._result['results'], task_data.action)
                    enriched_error_message = self.enrich_error_message_from_results(host_data.result._result['results'], task_data.action)
            else:
                res = host_data.result._result
                rc = res.get('rc', 0)
                if host_data.status == 'failed':
                    message = self.get_error_message(res)
                    enriched_error_message = self.enrich_error_message(res)

            if host_data.status == 'failed':
                status = Status(status_code=StatusCode.ERROR, description=message)
                # Record an exception with the task message
                span.record_exception(BaseException(enriched_error_message))
            elif host_data.status == 'skipped':
                message = res['skip_reason'] if 'skip_reason' in res else 'skipped'
                status = Status(status_code=StatusCode.UNSET)
            elif host_data.status == 'ignored':
                status = Status(status_code=StatusCode.UNSET)

        span.set_status(status)
        if isinstance(task_data.args, dict) and "gather_facts" not in task_data.action:
            names = tuple(self.transform_ansible_unicode_to_str(k) for k in task_data.args.keys())
            values = tuple(self.transform_ansible_unicode_to_str(k) for k in task_data.args.values())
            self.set_span_attribute(span, ("ansible.task.args.name"), names)
            self.set_span_attribute(span, ("ansible.task.args.value"), values)
        self.set_span_attribute(span, "ansible.task.module", task_data.action)
        self.set_span_attribute(span, "ansible.task.message", message)
        self.set_span_attribute(span, "ansible.task.name", name)
        self.set_span_attribute(span, "ansible.task.result", rc)
        self.set_span_attribute(span, "ansible.task.host.name", host_data.name)
        self.set_span_attribute(span, "ansible.task.host.status", host_data.status)
        # This will allow to enrich the service map
        self.add_attributes_for_service_map_if_possible(span, task_data)
        # Send logs
        if not disable_logs:
            span.add_event(task_data.dump)
        span.end(end_time=host_data.finish)

    def set_span_attribute(self, span, attributeName, attributeValue):
        """ update the span attribute with the given attribute and value if not None """

        if span is None and self._display is not None:
            self._display.warning('span object is None. Please double check if that is expected.')
        else:
            if attributeValue is not None:
                span.set_attribute(attributeName, attributeValue)

    def add_attributes_for_service_map_if_possible(self, span, task_data):
        """Update the span attributes with the service that the task interacted with, if possible."""

        redacted_url = self.parse_and_redact_url_if_possible(task_data.args)
        if redacted_url:
            self.set_span_attribute(span, "http.url", redacted_url.geturl())

    @staticmethod
    def parse_and_redact_url_if_possible(args):
        """Parse and redact the url, if possible."""

        try:
            parsed_url = urlparse(OpenTelemetrySource.url_from_args(args))
        except ValueError:
            return None

        if OpenTelemetrySource.is_valid_url(parsed_url):
            return OpenTelemetrySource.redact_user_password(parsed_url)
        return None

    @staticmethod
    def url_from_args(args):
        # the order matters
        url_args = ("url", "api_url", "baseurl", "repo", "server_url", "chart_repo_url", "registry_url", "endpoint", "uri", "updates_url")
        for arg in url_args:
            if args is not None and args.get(arg):
                return args.get(arg)
        return ""

    @staticmethod
    def redact_user_password(url):
        return url._replace(netloc=url.hostname) if url.password else url

    @staticmethod
    def is_valid_url(url):
        if all([url.scheme, url.netloc, url.hostname]):
            return "{{" not in url.hostname
        return False

    @staticmethod
    def transform_ansible_unicode_to_str(value):
        parsed_url = urlparse(str(value))
        if OpenTelemetrySource.is_valid_url(parsed_url):
            return OpenTelemetrySource.redact_user_password(parsed_url).geturl()
        return str(value)

    @staticmethod
    def get_error_message(result):
        if result.get('exception') is not None:
            return OpenTelemetrySource._last_line(result['exception'])
        return result.get('msg', 'failed')

    @staticmethod
    def get_error_message_from_results(results, action):
        for result in results:
            if result.get('failed', False):
                return ('{0}({1}) - {2}').format(action, result.get('item', 'none'), OpenTelemetrySource.get_error_message(result))

    @staticmethod
    def _last_line(text):
        lines = text.strip().split('\n')
        return lines[-1]

    @staticmethod
    def enrich_error_message(result):
        message = result.get('msg', 'failed')
        exception = result.get('exception')
        stderr = result.get('stderr')
        return ('message: "{0}"\nexception: "{1}"\nstderr: "{2}"').format(message, exception, stderr)

    @staticmethod
    def enrich_error_message_from_results(results, action):
        message = ""
        for result in results:
            if result.get('failed', False):
                message = ('{0}({1}) - {2}\n{3}').format(action, result.get('item', 'none'), OpenTelemetrySource.enrich_error_message(result), message)
        return message


class CallbackModule(CallbackBase):
    """
    This callback creates distributed traces.
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'community.general.opentelemetry'
    CALLBACK_NEEDS_ENABLED = True

    def __init__(self, display=None):
        super(CallbackModule, self).__init__(display=display)
        self.hide_task_arguments = None
        self.disable_logs = None
        self.otel_service_name = None
        self.ansible_playbook = None
        self.play_name = None
        self.tasks_data = None
        self.errors = 0
        self.disabled = False
        self.traceparent = False

        if OTEL_LIBRARY_IMPORT_ERROR:
            raise_from(
                AnsibleError('The `opentelemetry-api`, `opentelemetry-exporter-otlp` or `opentelemetry-sdk` must be installed to use this plugin'),
                OTEL_LIBRARY_IMPORT_ERROR)

        self.tasks_data = OrderedDict()

        self.opentelemetry = OpenTelemetrySource(display=self._display)

    def set_options(self, task_keys=None, var_options=None, direct=None):
        super(CallbackModule, self).set_options(task_keys=task_keys,
                                                var_options=var_options,
                                                direct=direct)

        environment_variable = self.get_option('enable_from_environment')
        if environment_variable is not None and os.environ.get(environment_variable, 'false').lower() != 'true':
            self.disabled = True
            self._display.warning("The `enable_from_environment` option has been set and {0} is not enabled. "
                                  "Disabling the `opentelemetry` callback plugin.".format(environment_variable))

        self.hide_task_arguments = self.get_option('hide_task_arguments')

        self.disable_logs = self.get_option('disable_logs')

        self.otel_service_name = self.get_option('otel_service_name')

        if not self.otel_service_name:
            self.otel_service_name = 'ansible'

        # See https://github.com/open-telemetry/opentelemetry-specification/issues/740
        self.traceparent = self.get_option('traceparent')

    def v2_playbook_on_start(self, playbook):
        self.ansible_playbook = basename(playbook._file_name)

    def v2_playbook_on_play_start(self, play):
        self.play_name = play.get_name()

    def v2_runner_on_no_hosts(self, task):
        self.opentelemetry.start_task(
            self.tasks_data,
            self.hide_task_arguments,
            self.play_name,
            task
        )

    def v2_playbook_on_task_start(self, task, is_conditional):
        self.opentelemetry.start_task(
            self.tasks_data,
            self.hide_task_arguments,
            self.play_name,
            task
        )

    def v2_playbook_on_cleanup_task_start(self, task):
        self.opentelemetry.start_task(
            self.tasks_data,
            self.hide_task_arguments,
            self.play_name,
            task
        )

    def v2_playbook_on_handler_task_start(self, task):
        self.opentelemetry.start_task(
            self.tasks_data,
            self.hide_task_arguments,
            self.play_name,
            task
        )

    def v2_runner_on_failed(self, result, ignore_errors=False):
        if ignore_errors:
            status = 'ignored'
        else:
            status = 'failed'
            self.errors += 1

        self.opentelemetry.finish_task(
            self.tasks_data,
            status,
            result,
            self._dump_results(result._result)
        )

    def v2_runner_on_ok(self, result):
        self.opentelemetry.finish_task(
            self.tasks_data,
            'ok',
            result,
            self._dump_results(result._result)
        )

    def v2_runner_on_skipped(self, result):
        self.opentelemetry.finish_task(
            self.tasks_data,
            'skipped',
            result,
            self._dump_results(result._result)
        )

    def v2_playbook_on_include(self, included_file):
        self.opentelemetry.finish_task(
            self.tasks_data,
            'included',
            included_file,
            ""
        )

    def v2_playbook_on_stats(self, stats):
        if self.errors == 0:
            status = Status(status_code=StatusCode.OK)
        else:
            status = Status(status_code=StatusCode.ERROR)
        self.opentelemetry.generate_distributed_traces(
            self.otel_service_name,
            self.ansible_playbook,
            self.tasks_data,
            status,
            self.traceparent,
            self.disable_logs
        )

    def v2_runner_on_async_failed(self, result, **kwargs):
        self.errors += 1

Anon7 - 2022
AnonSec Team