Server IP : 85.214.239.14 / Your IP : 3.133.114.90 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/ansible_collections/community/general/plugins/module_utils/ |
Upload File : |
# -*- coding: utf-8 -*- # Copyright (c) 2016, Peter Sagerson <psagers@ignorare.net> # Copyright (c) 2016, Jiri Tyr <jiri.tyr@gmail.com> # Copyright (c) 2017-2018 Keller Fuchs (@KellerFuchs) <kellerfuchs@hashbang.sh> # # GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt) # SPDX-License-Identifier: GPL-3.0-or-later from __future__ import absolute_import, division, print_function __metaclass__ = type import re import traceback from ansible.module_utils.common.text.converters import to_native try: import ldap import ldap.dn import ldap.filter import ldap.sasl HAS_LDAP = True SASCL_CLASS = { 'gssapi': ldap.sasl.gssapi, 'external': ldap.sasl.external, } except ImportError: HAS_LDAP = False def gen_specs(**specs): specs.update({ 'bind_dn': dict(), 'bind_pw': dict(default='', no_log=True), 'ca_path': dict(type='path'), 'dn': dict(required=True), 'referrals_chasing': dict(type='str', default='anonymous', choices=['disabled', 'anonymous']), 'server_uri': dict(default='ldapi:///'), 'start_tls': dict(default=False, type='bool'), 'validate_certs': dict(default=True, type='bool'), 'sasl_class': dict(choices=['external', 'gssapi'], default='external', type='str'), 'xorder_discovery': dict(choices=['enable', 'auto', 'disable'], default='auto', type='str'), }) return specs class LdapGeneric(object): def __init__(self, module): # Shortcuts self.module = module self.bind_dn = self.module.params['bind_dn'] self.bind_pw = self.module.params['bind_pw'] self.ca_path = self.module.params['ca_path'] self.referrals_chasing = self.module.params['referrals_chasing'] self.server_uri = self.module.params['server_uri'] self.start_tls = self.module.params['start_tls'] self.verify_cert = self.module.params['validate_certs'] self.sasl_class = self.module.params['sasl_class'] self.xorder_discovery = self.module.params['xorder_discovery'] # Establish connection self.connection = self._connect_to_ldap() if self.xorder_discovery == "enable" or (self.xorder_discovery == "auto" and not self._xorder_dn()): # Try to find the X_ORDERed version of the DN self.dn = self._find_dn() else: self.dn = self.module.params['dn'] def fail(self, msg, exn): self.module.fail_json( msg=msg, details=to_native(exn), exception=traceback.format_exc() ) def _find_dn(self): dn = self.module.params['dn'] explode_dn = ldap.dn.explode_dn(dn) if len(explode_dn) > 1: try: escaped_value = ldap.filter.escape_filter_chars(explode_dn[0]) filterstr = "(%s)" % escaped_value dns = self.connection.search_s(','.join(explode_dn[1:]), ldap.SCOPE_ONELEVEL, filterstr) if len(dns) == 1: dn, dummy = dns[0] except Exception: pass return dn def _connect_to_ldap(self): if not self.verify_cert: ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) if self.ca_path: ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, self.ca_path) connection = ldap.initialize(self.server_uri) if self.referrals_chasing == 'disabled': # Switch off chasing of referrals (https://github.com/ansible-collections/community.general/issues/1067) connection.set_option(ldap.OPT_REFERRALS, 0) if self.start_tls: try: connection.start_tls_s() except ldap.LDAPError as e: self.fail("Cannot start TLS.", e) try: if self.bind_dn is not None: connection.simple_bind_s(self.bind_dn, self.bind_pw) else: klass = SASCL_CLASS.get(self.sasl_class, ldap.sasl.external) connection.sasl_interactive_bind_s('', klass()) except ldap.LDAPError as e: self.fail("Cannot bind to the server.", e) return connection def _xorder_dn(self): # match X_ORDERed DNs regex = r"\w+=\{\d+\}.+" return re.match(regex, self.module.params['dn']) is not None