Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.224.62.198
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/community/general/plugins/callback/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/community/general/plugins/callback/elastic.py
# Copyright (c) 2021, Victor Martinez <VictorMartinezRubio@gmail.com>
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

# Plugin documentation string. The option names declared under `options`
# below are the ones CallbackModule.set_options() reads back via get_option().
# NOTE(review): `hide_task_arguments` is bound to the environment variable
# ANSIBLE_OPENTELEMETRY_HIDE_TASK_ARGUMENTS, which looks inherited from a
# different plugin -- confirm before renaming, since existing setups may
# already rely on that name.
DOCUMENTATION = '''
    author: Victor Martinez (@v1v)  <VictorMartinezRubio@gmail.com>
    name: elastic
    type: notification
    short_description: Create distributed traces for each Ansible task in Elastic APM
    version_added: 3.8.0
    description:
      - This callback creates distributed traces for each Ansible task in Elastic APM.
      - You can configure the plugin with environment variables.
      - See U(https://www.elastic.co/guide/en/apm/agent/python/current/configuration.html).
    options:
      hide_task_arguments:
        default: false
        type: bool
        description:
          - Hide the arguments for a task.
        env:
          - name: ANSIBLE_OPENTELEMETRY_HIDE_TASK_ARGUMENTS
      apm_service_name:
        default: ansible
        type: str
        description:
          - The service name resource attribute.
        env:
          - name: ELASTIC_APM_SERVICE_NAME
      apm_server_url:
        type: str
        description:
          - Use the APM server and its environment variables.
        env:
          - name: ELASTIC_APM_SERVER_URL
      apm_secret_token:
        type: str
        description:
          - Use the APM server token
        env:
          - name: ELASTIC_APM_SECRET_TOKEN
      apm_api_key:
        type: str
        description:
          - Use the APM API key
        env:
          - name: ELASTIC_APM_API_KEY
      apm_verify_server_cert:
        default: true
        type: bool
        description:
          - Verifies the SSL certificate if an HTTPS connection.
        env:
          - name: ELASTIC_APM_VERIFY_SERVER_CERT
      traceparent:
        type: str
        description:
          - The L(W3C Trace Context header traceparent,https://www.w3.org/TR/trace-context-1/#traceparent-header).
        env:
          - name: TRACEPARENT
    requirements:
      - elastic-apm (Python library)
'''


# Usage examples: how to enable the callback in ansible.cfg and which
# environment variables configure the APM connection.
EXAMPLES = '''
examples: |
  Enable the plugin in ansible.cfg:
    [defaults]
    callbacks_enabled = community.general.elastic

  Set the environment variables:
    export ELASTIC_APM_SERVER_URL=<your APM server URL>
    export ELASTIC_APM_SERVICE_NAME=your_service_name
    export ELASTIC_APM_API_KEY=your_APM_API_KEY
'''

import getpass
import socket
import time
import uuid

from collections import OrderedDict
from os.path import basename

from ansible.errors import AnsibleError, AnsibleRuntimeError
from ansible.module_utils.six import raise_from
from ansible.plugins.callback import CallbackBase

try:
    from elasticapm import Client, capture_span, trace_parent_from_string, instrument, label
except ImportError as imp_exc:
    ELASTIC_LIBRARY_IMPORT_ERROR = imp_exc
else:
    ELASTIC_LIBRARY_IMPORT_ERROR = None


class TaskData:
    """
    Record of one task: its identity, when it was first seen, and the
    per-host outcomes collected for it.
    """

    def __init__(self, uuid, name, path, play, action, args):
        self.uuid = uuid
        self.name = name
        self.path = path
        self.play = play
        # Host outcomes keyed by host uuid, kept in arrival order.
        self.host_data = OrderedDict()
        # Creation time doubles as the task start timestamp.
        self.start = time.time()
        self.action = action
        self.args = args

    def add_host(self, host):
        """
        Store the outcome for ``host``.

        The first outcome recorded for a host uuid wins, except for
        'included' outcomes: those replace the stored entry and carry the
        previous result followed by the new one, separated by a newline.
        """
        already_recorded = host.uuid in self.host_data

        # A repeated, non-include outcome is ignored.
        if already_recorded and host.status != 'included':
            return

        if already_recorded:
            # concatenate task include output from multiple items
            earlier = self.host_data[host.uuid]
            host.result = '%s\n%s' % (earlier.result, host.result)

        self.host_data[host.uuid] = host


class HostData:
    """
    Outcome of one task on one host, stamped with the time it was recorded.
    """

    def __init__(self, uuid, name, status, result):
        self.uuid, self.name = uuid, name
        self.status, self.result = status, result
        # Construction time is used as the moment the host finished the task.
        self.finish = time.time()


class ElasticSource(object):
    """
    Collect task/host results during a run and emit them to Elastic APM as a
    single "Session" transaction with one span per (task, host) pair.
    """

    def __init__(self, display):
        """
        Capture session metadata (random session id, host name, host IP and
        current user) that is later attached to the transaction as labels.

        :param display: display object kept on the instance as ``_display``.
        """
        self.ansible_playbook = ""
        self.ansible_version = None
        self.session = str(uuid.uuid4())
        self.host = socket.gethostname()
        try:
            self.ip_address = socket.gethostbyname(self.host)
        except Exception:
            # Best effort: the IP is optional metadata, so a host name that
            # cannot be resolved must not prevent the plugin from loading.
            self.ip_address = None
        self.user = getpass.getuser()

        self._display = display

    def start_task(self, tasks_data, hide_task_arguments, play_name, task):
        """
        Record the start of a task for one or more hosts.

        Does nothing when the task uuid is already present in ``tasks_data``.

        :param tasks_data: mapping of task uuid -> TaskData, updated in place.
        :param hide_task_arguments: when true, task arguments are not stored.
        :param play_name: name of the play the task belongs to.
        :param task: task object providing ``_uuid``, ``get_name()``,
            ``get_path()``, ``action``, ``no_log`` and ``args``.
        """
        task_uuid = task._uuid

        if task_uuid in tasks_data:
            return

        name = task.get_name().strip()
        path = task.get_path()
        action = task.action
        args = None

        # Arguments are only kept when neither the task (no_log) nor the
        # plugin configuration asks for them to be hidden.
        if not task.no_log and not hide_task_arguments:
            args = ', '.join('%s=%s' % item for item in task.args.items())

        tasks_data[task_uuid] = TaskData(task_uuid, name, path, play_name, action, args)

    def finish_task(self, tasks_data, status, result):
        """
        Record the result of a task for a single host.

        :param tasks_data: mapping of task uuid -> TaskData.
        :param status: outcome label stored on the HostData
            ('ok', 'failed', 'skipped' or 'included' in this file).
        :param result: object exposing ``_task``; ``_host`` and
            ``_task_fields`` are used when present.
        :raises KeyError: if the task was never registered with start_task.
        """
        task_uuid = result._task._uuid

        # Objects without a usable ``_host`` are filed under the synthetic
        # 'include' host.
        host = getattr(result, '_host', None)
        if host is not None:
            host_uuid = host._uuid
            host_name = host.name
        else:
            host_uuid = 'include'
            host_name = 'include'

        task = tasks_data[task_uuid]

        if self.ansible_version is None:
            # ``_host`` is already treated as optional above, so treat
            # ``_task_fields`` the same way instead of raising when it is
            # missing (assumed to be the case for include objects -- TODO
            # confirm against the Ansible version in use).
            task_fields = getattr(result, '_task_fields', None) or {}
            version = (task_fields.get('args') or {}).get('_ansible_version')
            if version:
                self.ansible_version = version

        task.add_host(HostData(host_uuid, host_name, status, result))

    def generate_distributed_traces(self, tasks_data, status, end_time, traceparent, apm_service_name,
                                    apm_server_url, apm_verify_server_cert, apm_secret_token, apm_api_key):
        """
        Generate distributed traces from the collected TaskData and HostData.

        Nothing is sent when no APM client can be created (no server URL).

        :param tasks_data: mapping of task uuid -> TaskData, in start order.
        :param status: overall result reported on the transaction.
        :param end_time: timestamp marking the end of the session.
        :param traceparent: optional W3C traceparent string to continue a trace.
        :param apm_service_name: service name passed to the APM client.
        :param apm_server_url: APM server URL; falsy disables reporting.
        :param apm_verify_server_cert: whether to verify the server certificate.
        :param apm_secret_token: secret token passed to the APM client.
        :param apm_api_key: API key passed to the APM client.
        """
        apm_cli = self.init_apm_client(apm_server_url, apm_service_name, apm_verify_server_cert,
                                       apm_secret_token, apm_api_key)
        if not apm_cli:
            return

        tasks = list(tasks_data.values())
        # The session starts with the first recorded task. With no tasks at
        # all, fall back to end_time so the duration below is 0 instead of
        # failing on ``end_time - None``.
        parent_start_time = tasks[0].start if tasks else end_time

        instrument()  # Only call this once, as early as possible.
        if traceparent:
            parent = trace_parent_from_string(traceparent)
            apm_cli.begin_transaction("Session", trace_parent=parent, start=parent_start_time)
        else:
            apm_cli.begin_transaction("Session", start=parent_start_time)

        # Populate trace metadata attributes
        if self.ansible_version is not None:
            label(ansible_version=self.ansible_version)
        label(ansible_session=self.session, ansible_host_name=self.host, ansible_host_user=self.user)
        if self.ip_address is not None:
            label(ansible_host_ip=self.ip_address)

        for task_data in tasks:
            for host_data in task_data.host_data.values():
                self.create_span_data(apm_cli, task_data, host_data)

        apm_cli.end_transaction(name=__name__, result=status, duration=end_time - parent_start_time)

    def create_span_data(self, apm_cli, task_data, host_data):
        """
        Create the span for the given TaskData and HostData.

        Failed hosts additionally get an AnsibleRuntimeError reported through
        ``apm_cli.capture_exception``.
        """
        name = '[%s] %s: %s' % (host_data.name, task_data.play, task_data.name)

        message = "success"
        status = "success"
        enriched_error_message = None
        if host_data.status == 'included':
            # Include entries carry no module result to inspect.
            rc = 0
        else:
            res = host_data.result._result
            rc = res.get('rc', 0)
            if host_data.status == 'failed':
                message = self.get_error_message(res)
                enriched_error_message = self.enrich_error_message(res)
                status = "failure"
            elif host_data.status == 'skipped':
                message = res.get('skip_reason', 'skipped')
                status = "unknown"

        with capture_span(task_data.name,
                          start=task_data.start,
                          span_type="ansible.task.run",
                          duration=host_data.finish - task_data.start,
                          labels={"ansible.task.args": task_data.args,
                                  "ansible.task.message": message,
                                  "ansible.task.module": task_data.action,
                                  "ansible.task.name": name,
                                  "ansible.task.result": rc,
                                  "ansible.task.host.name": host_data.name,
                                  "ansible.task.host.status": host_data.status}) as span:
            span.outcome = status
            if status == "failure":
                # The exception is built only to be reported; it is never raised.
                exception = AnsibleRuntimeError(
                    message="{0}: {1} failed with error message {2}".format(
                        task_data.action, name, enriched_error_message))
                apm_cli.capture_exception(exc_info=(type(exception), exception, exception.__traceback__),
                                          handled=True)

    def init_apm_client(self, apm_server_url, apm_service_name, apm_verify_server_cert, apm_secret_token, apm_api_key):
        """
        Build the APM client, or return None when no server URL is configured.

        :returns: a ``Client`` instance, or ``None`` if ``apm_server_url`` is falsy.
        """
        if not apm_server_url:
            return None

        return Client(service_name=apm_service_name,
                      server_url=apm_server_url,
                      # Honour the configured option; it used to be forced to
                      # False, which silently disabled certificate checks.
                      verify_server_cert=apm_verify_server_cert,
                      secret_token=apm_secret_token,
                      api_key=apm_api_key,
                      use_elastic_traceparent_header=True,
                      debug=True)

    @staticmethod
    def get_error_message(result):
        """
        Return a one-line error summary: the last line of ``exception`` when
        present, otherwise ``msg``, otherwise the literal 'failed'.
        """
        if result.get('exception') is not None:
            return ElasticSource._last_line(result['exception'])
        return result.get('msg', 'failed')

    @staticmethod
    def _last_line(text):
        """ Return the last line of ``text`` after stripping outer whitespace. """
        lines = text.strip().split('\n')
        return lines[-1]

    @staticmethod
    def enrich_error_message(result):
        """
        Return a multi-line description combining ``msg`` (default 'failed'),
        ``exception`` and ``stderr`` from the result mapping.
        """
        message = result.get('msg', 'failed')
        exception = result.get('exception')
        stderr = result.get('stderr')
        return ('message: "{0}"\nexception: "{1}"\nstderr: "{2}"').format(message, exception, stderr)


class CallbackModule(CallbackBase):
    """
    This callback creates distributed traces with Elastic APM.

    Task start/finish events are accumulated in ``tasks_data`` and handed to
    ElasticSource when the playbook statistics are reported.
    """

    CALLBACK_VERSION = 2.0
    CALLBACK_TYPE = 'notification'
    CALLBACK_NAME = 'community.general.elastic'
    CALLBACK_NEEDS_ENABLED = True

    def __init__(self, display=None):
        """
        Initialise plugin state.

        :raises AnsibleError: if the ``elasticapm`` import failed at module load.
        """
        super(CallbackModule, self).__init__(display=display)
        self.hide_task_arguments = None
        self.apm_service_name = None
        # Initialised here so v2_playbook_on_stats cannot hit an
        # AttributeError if it runs before set_options has populated them.
        self.apm_server_url = None
        self.apm_secret_token = None
        self.apm_api_key = None
        self.apm_verify_server_cert = None
        self.ansible_playbook = None
        self.traceparent = False
        self.play_name = None
        self.tasks_data = None
        self.errors = 0
        self.disabled = False

        if ELASTIC_LIBRARY_IMPORT_ERROR:
            raise_from(
                AnsibleError('The `elastic-apm` must be installed to use this plugin'),
                ELASTIC_LIBRARY_IMPORT_ERROR)

        self.tasks_data = OrderedDict()

        self.elastic = ElasticSource(display=self._display)

    def set_options(self, task_keys=None, var_options=None, direct=None):
        """ Load the plugin options and cache them on the instance. """
        super(CallbackModule, self).set_options(task_keys=task_keys,
                                                var_options=var_options,
                                                direct=direct)

        self.hide_task_arguments = self.get_option('hide_task_arguments')

        # Fall back to 'ansible' when the option resolves to an empty value.
        self.apm_service_name = self.get_option('apm_service_name') or 'ansible'

        self.apm_server_url = self.get_option('apm_server_url')
        self.apm_secret_token = self.get_option('apm_secret_token')
        self.apm_api_key = self.get_option('apm_api_key')
        self.apm_verify_server_cert = self.get_option('apm_verify_server_cert')
        self.traceparent = self.get_option('traceparent')

    def _record_task_start(self, task):
        """ Register ``task`` under the current play (shared by all start hooks). """
        self.elastic.start_task(
            self.tasks_data,
            self.hide_task_arguments,
            self.play_name,
            task
        )

    def _record_task_result(self, status, result):
        """ Store ``result`` with the given status label (shared by all result hooks). """
        self.elastic.finish_task(
            self.tasks_data,
            status,
            result
        )

    def v2_playbook_on_start(self, playbook):
        self.ansible_playbook = basename(playbook._file_name)

    def v2_playbook_on_play_start(self, play):
        self.play_name = play.get_name()

    def v2_runner_on_no_hosts(self, task):
        self._record_task_start(task)

    def v2_playbook_on_task_start(self, task, is_conditional):
        self._record_task_start(task)

    def v2_playbook_on_cleanup_task_start(self, task):
        self._record_task_start(task)

    def v2_playbook_on_handler_task_start(self, task):
        self._record_task_start(task)

    def v2_runner_on_failed(self, result, ignore_errors=False):
        # NOTE(review): ignore_errors is not consulted, so ignored failures
        # still count towards the overall "failure" status -- confirm whether
        # that is intended.
        self.errors += 1
        self._record_task_result('failed', result)

    def v2_runner_on_ok(self, result):
        self._record_task_result('ok', result)

    def v2_runner_on_skipped(self, result):
        self._record_task_result('skipped', result)

    def v2_playbook_on_include(self, included_file):
        self._record_task_result('included', included_file)

    def v2_playbook_on_stats(self, stats):
        """ Send the collected data, marking the session failed if any error was counted. """
        status = "success" if self.errors == 0 else "failure"
        self.elastic.generate_distributed_traces(
            self.tasks_data,
            status,
            time.time(),
            self.traceparent,
            self.apm_service_name,
            self.apm_server_url,
            self.apm_verify_server_cert,
            self.apm_secret_token,
            self.apm_api_key
        )

    def v2_runner_on_async_failed(self, result, **kwargs):
        # Only the error counter is updated; no host result is recorded here.
        self.errors += 1

Anon7 - 2022
AnonSec Team