Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.14.83.146
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/cloud/common/plugins/module_utils/turbo/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/cloud/common/plugins/module_utils/turbo/server.py
# Copyright (c) 2021 Red Hat
#
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
#    * Redistributions of source code must retain the above copyright
#      notice, this list of conditions and the following disclaimer.
#    * Redistributions in binary form must reproduce the above copyright notice,
#      this list of conditions and the following disclaimer in the documentation
#      and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
import argparse
import asyncio
from datetime import datetime
import importlib

# py38 only, See: https://github.com/PyCQA/pylint/issues/2976
import inspect  # pylint: disable=syntax-error
import io
import json

# py38 only, See: https://github.com/PyCQA/pylint/issues/2976
import collections  # pylint: disable=syntax-error
import os
import signal
import sys
import traceback
import zipfile
from zipimport import zipimporter
import pickle
import uuid

sys_path_lock = None
env_lock = None

import ansible.module_utils.basic

please_include_me = "bar"


def fork_process():
    """
    This function performs the double fork process to detach from the
    parent process and execute.
    """
    pid = os.fork()

    if pid == 0:
        fd = os.open(os.devnull, os.O_RDWR)

        # clone stdin/out/err
        for num in range(3):
            if fd != num:
                os.dup2(fd, num)

        if fd not in range(3):
            os.close(fd)

        pid = os.fork()
        if pid > 0:
            os._exit(0)

        # get new process session and detach
        sid = os.setsid()
        if sid == -1:
            raise Exception("Unable to detach session while daemonizing")

        # avoid possible problems with cwd being removed
        os.chdir("/")

        pid = os.fork()
        if pid > 0:
            sys.exit(0)  # pylint: disable=ansible-bad-function
    else:
        sys.exit(0)  # pylint: disable=ansible-bad-function
    return pid


class EmbeddedModule:
    def __init__(self, ansiblez_path, params):
        self.ansiblez_path = ansiblez_path
        self.collection_name, self.module_name = self.find_module_name()
        self.params = params
        self.module_class = None
        self.debug_mode = False
        self.module_path = (
            "ansible_collections.{collection_name}." "plugins.modules.{module_name}"
        ).format(collection_name=self.collection_name, module_name=self.module_name)

    def find_module_name(self):
        with zipfile.ZipFile(self.ansiblez_path) as zip:
            for path in zip.namelist():
                if not path.startswith("ansible_collections"):
                    continue
                if not path.endswith(".py"):
                    continue
                if path.endswith("__init__.py"):
                    continue
                splitted = path.split("/")
                if len(splitted) != 6:
                    continue
                if splitted[-3:-1] != ["plugins", "modules"]:
                    continue
                collection = ".".join(splitted[1:3])
                name = splitted[-1][:-3]
                return collection, name
        raise Exception("Cannot find module name")

    async def load(self):
        async with sys_path_lock:
            # Add the Ansiblez_path in sys.path
            sys.path.insert(0, self.ansiblez_path)

            # resettle the loaded modules that were associated
            # with a different Ansiblez.
            for path, module in sorted(tuple(sys.modules.items())):
                if path and module and path.startswith("ansible_collections"):
                    try:
                        prefix = sys.modules[path].__loader__.prefix
                    except AttributeError:
                        # Not from a zipimporter loader, skipping
                        continue
                    # Reload package modules only, to pick up new modules from
                    # packages that have been previously loaded.
                    if hasattr(sys.modules[path], "__path__"):
                        py_path = self.ansiblez_path + os.sep + prefix
                        my_loader = zipimporter(py_path)
                        sys.modules[path].__loader__ = my_loader
                        try:
                            importlib.reload(sys.modules[path])
                        except ModuleNotFoundError:
                            pass
            # Finally, load the plugin class.
            self.module_class = importlib.import_module(self.module_path)

    async def unload(self):
        async with sys_path_lock:
            sys.path = [i for i in sys.path if i != self.ansiblez_path]

    def create_profiler(self):
        if self.debug_mode:
            import cProfile

            pr = cProfile.Profile()
            pr.enable()
            return pr

    def print_profiling_info(self, pr):
        if self.debug_mode:
            import pstats

            sortby = pstats.SortKey.CUMULATIVE
            ps = pstats.Stats(pr).sort_stats(sortby)
            ps.print_stats(20)

    def print_backtrace(self, backtrace):
        if self.debug_mode:
            print(backtrace)  # pylint: disable=ansible-bad-function

    async def run(self):
        class FakeStdin:
            buffer = None

        from .exceptions import (
            EmbeddedModuleFailure,
            EmbeddedModuleUnexpectedFailure,
            EmbeddedModuleSuccess,
        )

        # monkeypatching to pass the argument to the module, this is not
        # really safe, and in the future, this will prevent us to run several
        # modules in parallel. We can maybe use a scoped monkeypatch instead
        _fake_stdin = FakeStdin()
        _fake_stdin.buffer = io.BytesIO(self.params.encode())
        sys.stdin = _fake_stdin
        # Trick to be sure ansible.module_utils.basic._load_params() won't
        # try to build the module parameters from the daemon arguments
        sys.argv = sys.argv[:1]
        ansible.module_utils.basic._ANSIBLE_ARGS = None
        pr = self.create_profiler()
        if not hasattr(self.module_class, "main"):
            raise EmbeddedModuleFailure("No main() found!")
        try:
            if inspect.iscoroutinefunction(self.module_class.main):
                await self.module_class.main()
            elif pr:
                pr.runcall(self.module_class.main)
            else:
                self.module_class.main()
        except EmbeddedModuleSuccess as e:
            self.print_profiling_info(pr)
            return e.kwargs
        except EmbeddedModuleFailure as e:
            backtrace = traceback.format_exc()
            self.print_backtrace(backtrace)
            raise
        except Exception as e:
            backtrace = traceback.format_exc()
            self.print_backtrace(backtrace)
            raise EmbeddedModuleUnexpectedFailure(str(backtrace))
        else:
            raise EmbeddedModuleUnexpectedFailure(
                "Likely a bug: exit_json() or fail_json() should be called during the module excution"
            )


async def run_as_lookup_plugin(data):
    errors = None
    from ansible.module_utils._text import to_native

    try:
        import ansible.plugins.loader as plugin_loader
        from ansible.parsing.dataloader import DataLoader
        from ansible.template import Templar

        (
            lookup_name,
            terms,
            variables,
            kwargs,
        ) = data

        # load lookup plugin
        templar = Templar(loader=DataLoader(), variables=None)
        ansible_collections = "ansible_collections."
        if lookup_name.startswith(ansible_collections):
            lookup_name = lookup_name.replace(ansible_collections, "", 1)
        ansible_plugins_lookup = ".plugins.lookup."
        if ansible_plugins_lookup in lookup_name:
            lookup_name = lookup_name.replace(ansible_plugins_lookup, ".", 1)

        instance = plugin_loader.lookup_loader.get(
            name=lookup_name, loader=templar._loader, templar=templar
        )

        if not hasattr(instance, "_run"):
            return [None, "No _run() found"]
        if inspect.iscoroutinefunction(instance._run):
            result = await instance._run(terms, variables=variables, **kwargs)
        else:
            result = instance._run(terms, variables=variables, **kwargs)
    except Exception as e:
        errors = to_native(e)
    return [result, errors]


async def run_as_module(content, debug_mode):
    from ansible_collections.cloud.common.plugins.module_utils.turbo.exceptions import (
        EmbeddedModuleFailure,
    )

    try:
        (
            ansiblez_path,
            params,
            env,
        ) = json.loads(content)
        if debug_mode:
            print(  # pylint: disable=ansible-bad-function
                f"-----\nrunning {ansiblez_path} with params: ¨{params}¨"
            )

        embedded_module = EmbeddedModule(ansiblez_path, params)
        if debug_mode:
            embedded_module.debug_mode = True

        await embedded_module.load()
        try:
            async with env_lock:
                os.environ.clear()
                os.environ.update(env)
                result = await embedded_module.run()
        except SystemExit:
            backtrace = traceback.format_exc()
            result = {"msg": str(backtrace), "failed": True}
        except EmbeddedModuleFailure as e:
            result = {"msg": str(e), "failed": True}
            if e.kwargs:
                result.update(e.kwargs)
        except Exception as e:
            result = {
                "msg": traceback.format_stack() + [str(e)],
                "failed": True,
            }
        await embedded_module.unload()
    except Exception as e:
        result = {"msg": traceback.format_stack() + [str(e)], "failed": True}
    return result


class AnsibleVMwareTurboMode:
    def __init__(self):
        self.sessions = collections.defaultdict(dict)
        self.socket_path = None
        self.ttl = None
        self.debug_mode = None
        self.jobs_ongoing = {}

    async def ghost_killer(self):
        while True:
            await asyncio.sleep(self.ttl)
            running_jobs = {
                job_id: start_date
                for job_id, start_date in self.jobs_ongoing.items()
                if (datetime.now() - start_date).total_seconds() < 3600
            }
            if running_jobs:
                continue
            self.stop()

    async def handle(self, reader, writer):
        self._watcher.cancel()
        self._watcher = self.loop.create_task(self.ghost_killer())
        job_id = str(uuid.uuid4())
        self.jobs_ongoing[job_id] = datetime.now()
        raw_data = await reader.read()
        if not raw_data:
            return

        (plugin_type, content) = pickle.loads(raw_data)

        def _terminate(result):
            writer.write(json.dumps(result).encode())
            writer.close()

        if plugin_type == "module":
            result = await run_as_module(content, debug_mode=self.debug_mode)
        elif plugin_type == "lookup":
            result = await run_as_lookup_plugin(content)
        _terminate(result)
        del self.jobs_ongoing[job_id]

    def handle_exception(self, loop, context):
        e = context.get("exception")
        traceback.print_exception(type(e), e, e.__traceback__)
        self.stop()

    def start(self):
        self.loop = asyncio.get_event_loop()
        self.loop.add_signal_handler(signal.SIGTERM, self.stop)
        self.loop.set_exception_handler(self.handle_exception)
        self._watcher = self.loop.create_task(self.ghost_killer())

        import sys

        if sys.hexversion >= 0x30A00B1:
            # py3.10 drops the loop argument of create_task.
            self.loop.create_task(
                asyncio.start_unix_server(self.handle, path=self.socket_path)
            )
        else:
            self.loop.create_task(
                asyncio.start_unix_server(
                    self.handle, path=self.socket_path, loop=self.loop
                )
            )
        self.loop.run_forever()

    def stop(self):
        os.unlink(self.socket_path)
        self.loop.stop()


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Start a background daemon.")
    parser.add_argument("--socket-path")
    parser.add_argument("--ttl", default=15, type=int)
    parser.add_argument("--fork", action="store_true")

    args = parser.parse_args()
    if args.fork:
        fork_process()
    sys_path_lock = asyncio.Lock()
    env_lock = asyncio.Lock()

    server = AnsibleVMwareTurboMode()
    server.socket_path = args.socket_path
    server.ttl = args.ttl
    server.debug_mode = not args.fork
    server.start()

Anon7 - 2022
AnonSec Team