Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.144.235.195
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/ansible_collections/microsoft/ad/plugins/action/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/ansible_collections/microsoft/ad/plugins/action//debug_ldap_client.py
# Copyright (c) 2023 Ansible Project
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

import importlib
import importlib.metadata
import typing as t
import traceback


from ansible.plugins.action import ActionBase
from ansible.utils.display import Display

display = Display()

try:
    import dns.resolver

    HAS_DNSRESOLVER = True
except Exception:
    HAS_DNSRESOLVER = False


try:
    import krb5

    HAS_KRB5 = True
except Exception:
    HAS_KRB5 = False


class ActionModule(ActionBase):
    def run(
        self,
        tmp: t.Optional[str] = None,
        task_vars: t.Optional[t.Dict[str, t.Any]] = None,
    ) -> t.Dict[str, t.Any]:
        self._supports_check_mode = True
        self._supports_async = True

        result = super().run(tmp=tmp, task_vars=task_vars)
        del tmp

        kerb_info = self._get_kerberos_info()

        dns_info: t.Dict[str, t.Any] = {}
        default_realm = kerb_info.get("default_realm", None)
        if default_realm:
            dns_info = self._get_server_lookup_info(default_realm)

        result.update(
            {
                "dns": dns_info,
                "kerberos": kerb_info,
                "packages": {
                    "dnspython": self._import_lib("dns.resolver", package_name="dnspython"),
                    "dpapi_ng": self._import_lib("dpapi_ng", package_name="dpapi-ng"),
                    "krb5": self._import_lib("krb5"),
                    "pyspnego": self._import_lib("spnego", package_name="pyspnego"),
                    "sansldap": self._import_lib("sansldap"),
                },
            }
        )

        return result

    def _get_kerberos_info(self) -> t.Dict[str, t.Any]:
        if not HAS_KRB5:
            return {}

        res: t.Dict[str, t.Any] = {
            "exception": None,
            "default_realm": None,
            "default_cc": {},
        }

        try:
            ctx = krb5.init_context()
        except Exception:
            res["exception"] = traceback.format_exc()

        if not ctx:
            return res

        try:
            res["default_realm"] = krb5.get_default_realm(ctx).decode("utf-8")
        except Exception:
            res["exception"] = traceback.format_exc()

        res["default_cc"] = self._get_kerberos_cc_info(ctx)

        return res

    def _get_kerberos_cc_info(
        self,
        ctx: "krb5.Context",
    ) -> t.Dict[str, t.Any]:
        creds: t.List[t.Dict[str, t.Any]] = []
        res: t.Dict[str, t.Any] = {
            "exception": None,
            "name": None,
            "principal": None,
            "creds": creds,
        }

        try:
            default_cc = krb5.cc_default(ctx)
        except Exception:
            res["exception"] = traceback.format_exc()

        if not default_cc:
            return res

        try:
            res["name"] = str(default_cc)
            res["principal"] = str(default_cc.principal)
            for cred in default_cc:
                # cred attrs added in krb5 0.5.0
                creds.append(
                    {
                        "client": str(getattr(cred, "client", "krb5 too old")),
                        "server": str(getattr(cred, "server", "krb5 too old")),
                    }
                )
        except Exception:
            res["exception"] = traceback.format_exc()

        return res

    def _get_server_lookup_info(
        self,
        default_realm: str,
    ) -> t.Dict[str, t.Any]:
        if not HAS_DNSRESOLVER:
            return {}

        records: t.List[t.Dict[str, t.Any]] = []
        res: t.Dict[str, t.Any] = {
            "exception": None,
            "default_server": None,
            "default_port": None,
            "records": records,
        }

        try:
            srv_record = f"_ldap._tcp.dc._msdcs.{default_realm}"

            for rec in dns.resolver.resolve(srv_record, "SRV"):
                records.append(
                    {
                        "target": str(rec.target),
                        "port": rec.port,
                        "weight": rec.weight,
                        "priority": rec.priority,
                    }
                )

            highest_record = next(iter(sorted(records, key=lambda k: (k["priority"], -k["weight"]))), None)
            if highest_record:
                res["default_server"] = highest_record["target"].rstrip(".")
                res["default_port"] = highest_record["port"]

        except Exception:
            res["exception"] = traceback.format_exc()

        return res

    def _import_lib(
        self,
        name: str,
        package_name: t.Optional[str] = None,
    ) -> str:
        try:
            importlib.import_module(name)
            return importlib.metadata.version(package_name or name)
        except Exception:
            return traceback.format_exc()

Anon7 - 2022
AnonSec Team