Server IP : 85.214.239.14 / Your IP : 3.140.185.250 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /srv/modoboa/env/lib/python3.5/site-packages/modoboa/ldapsync/ |
Upload File : |
"""LDAP related functions.""" import ldap import ldap.modlist as modlist from django.conf import settings from django.utils.encoding import force_bytes, force_str from django.utils.translation import ugettext as _ from modoboa.core import models as core_models from modoboa.lib.email_utils import split_mailbox from modoboa.lib.exceptions import InternalError def create_connection(srv_address, srv_port, config, username, password): """Create a new connection with given server.""" uri = "{}:{}".format(srv_address, srv_port) uri = "{}://{}".format( "ldaps" if config["ldap_secured"] == "ssl" else "ldap", uri) conn = ldap.initialize(uri) conn.protocol_version = 3 conn.set_option(ldap.OPT_X_TLS_DEMAND, True) conn.set_option(ldap.OPT_DEBUG_LEVEL, 255) conn.set_option(ldap.OPT_REFERRALS, 0) conn.simple_bind_s( force_str(username if username else config["ldap_sync_bind_dn"]), force_str(password if password else config["ldap_sync_bind_password"]) ) return conn def get_connection(config, username=None, password=None): """Get a new connection to the LDAP directory.""" try: conn = create_connection( config["ldap_server_address"], config["ldap_server_port"], config, username, password ) except ldap.LDAPError: if not config["ldap_enable_secondary_server"]: raise conn = create_connection( config["ldap_secondary_server_address"], config["ldap_secondary_server_port"], config, username, password ) return conn def get_user_password(user, disable=False): """Return ready-to-use password from user instance.""" scheme, password = user.password.split("}") return ( force_bytes(scheme) + b"}" + b"#" if disable else b"" + force_bytes(password) ) def create_ldap_account(user, dn, conn): """Create new account.""" attrs = { "objectClass": [ force_bytes("inetOrgPerson"), force_bytes("organizationalPerson") ], "uid": [force_bytes(user.username)], "sn": [force_bytes(user.last_name)], "givenName": [force_bytes(user.first_name)], "cn": [force_bytes(user.username)], "displayName": [force_bytes(user.fullname)], "mail": [ force_bytes(user.email), force_bytes(user.secondary_email)], "homePhone": [force_bytes(user.phone_number)], } if user.password: scheme, password = user.password.split("}") attrs["userPassword"] = [get_user_password(user)] ldif = modlist.addModlist(attrs) try: conn.add_s(dn, ldif) except ldap.LDAPError as e: raise InternalError( _("Failed to create LDAP account: {}").format(e) ) def check_if_dn_exists(conn, dn): """Check if DN already exists in directory.""" try: res = conn.search_s( force_str(dn), ldap.SCOPE_SUBTREE, force_str("(&(objectClass=inetOrgPerson))") ) res = res[0][0] except ldap.LDAPError: return False return True def update_ldap_account(user, config): """Update existing account.""" dn = config["ldap_sync_account_dn_template"] % {"user": user.username} conn = get_connection(config) if not check_if_dn_exists(conn, dn): create_ldap_account(user, dn, conn) return ldif = [ (ldap.MOD_REPLACE, "uid", force_bytes(user.username)), (ldap.MOD_REPLACE, "sn", force_bytes(user.last_name)), (ldap.MOD_REPLACE, "givenName", force_bytes(user.first_name)), (ldap.MOD_REPLACE, "cn", force_bytes(user.username)), (ldap.MOD_REPLACE, "displayName", force_bytes(user.fullname)), (ldap.MOD_REPLACE, "mail", force_bytes(user.email)), (ldap.MOD_REPLACE, "homePhone", force_bytes(user.phone_number)), ] if user.password: password = get_user_password(user, not user.is_active) ldif.append((ldap.MOD_REPLACE, "userPassword", password)) try: conn.modify_s(dn, ldif) except ldap.LDAPError as e: raise InternalError( _("Failed to update LDAP account: {}").format(e)) def delete_ldap_account(user, config): """Delete remote LDAP account.""" dn = config["ldap_sync_account_dn_template"] % {"user": user.username} conn = get_connection(config) if not check_if_dn_exists(conn, dn): return if config["ldap_sync_delete_remote_account"]: try: conn.delete_s(dn) except ldap.LDAPError as e: raise InternalError( _("Failed to delete LDAP account: {}").format(e) ) else: password = get_user_password(user, True) ldif = [ (ldap.MOD_REPLACE, "userPassword", password) ] try: conn.modify_s(dn, ldif) except ldap.LDAPError as e: raise InternalError( _("Failed to disable LDAP account: {}").format(e)) def find_user_groups(conn, config, dn, entry): """Retrieve groups for given user.""" condition = ( config["ldap_is_active_directory"] or config["ldap_group_type"] == "groupofnames" ) if condition: flt = "(member={})".format(dn) elif config["ldap_group_type"] == "posixgroup": flt = "(memberUid={})".format(force_str(entry["uid"][0])) result = conn.search_s( config["ldap_groups_search_base"], ldap.SCOPE_SUBTREE, flt ) groups = [] for dn, entry in result: if not dn: continue groups.append(dn.split(',')[0].split('=')[1]) return groups def user_is_disabled(config, entry): """Check if LDAP user is disabled or not.""" if config["ldap_is_active_directory"]: if "userAccountControl" in entry: value = int(force_str(entry["userAccountControl"][0])) return value == 514 # FIXME: is there a way to detect a disabled user with OpenLDAP? return False def import_accounts_from_ldap(config): """Import user accounts from LDAP directory.""" conn = get_connection(config) result = conn.search_s( config["ldap_import_search_base"], ldap.SCOPE_SUBTREE, config["ldap_import_search_filter"] ) admin_groups = config["ldap_admin_groups"].split(";") for dn, entry in result: if dn is None: continue role = "SimpleUsers" groups = find_user_groups(conn, config, dn, entry) for grp in admin_groups: if grp.strip() in groups: role = "DomainAdmins" break username = force_str(entry[config["ldap_import_username_attr"]][0]) lpart, domain = split_mailbox(username) if domain is None: # Try to find associated email email = None for attr in ["mail", "userPrincipalName"]: if attr in entry: email = force_str(entry[attr][0]) break if email is None: if grp == "SimpleUsers": print("Skipping {} because no email found".format(dn)) continue else: username = email defaults = { "username": username.lower(), "is_local": False, "language": settings.LANGUAGE_CODE } user, created = core_models.User.objects.get_or_create( username__iexact=username, defaults=defaults ) if created: core_models.populate_callback(user, role) attr_map = { "first_name": "givenName", "email": "mail", "last_name": "sn" } for attr, ldap_attr in attr_map.items(): if ldap_attr in entry: setattr(user, attr, force_str(entry[ldap_attr][0])) user.is_active = not user_is_disabled(config, entry) user.save() # FIXME: handle delete and rename operations? def build_ldap_uri(config, node=""): """ Building LDAP uris for dovecot conf """ if node: node += "_" return "{}://{}:{}".format( "ldaps" if config["ldap_secured"] == "ssl" else "ldap", config["ldap_{}server_address".format(node)], config["ldap_{}server_port".format(node)] ) def update_dovecot_config_file(config): """Update dovecot configuration file from LDAP parameters.""" conf_file = config["ldap_dovecot_conf_file"] # Hosts conf uris = build_ldap_uri(config) if config["ldap_enable_secondary_server"]: uris += " " + build_ldap_uri(config, "secondary") # Auth conf bind_dn = config["ldap_bind_dn"] bind_pwd = config["ldap_bind_password"] # Search conf base = config["ldap_search_base"] user_filter = config["ldap_search_filter"].replace("(user)s", "u") with open(conf_file, "w") as fp: fp.write("""uris = {uris} dn = "{bind_dn}" dnpass = '{bind_pwd}' base = {base} user_filter = {user_filter} pass_filter = {pass_filter} """.format(uris=uris, bind_dn=bind_dn, bind_pwd=bind_pwd, base=base, user_filter=user_filter, pass_filter=user_filter) )