Server IP : 85.214.239.14 / Your IP : 3.137.166.174 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/task/3/root/srv/automx/automx-master/src/automx/ |
Upload File : |
""" automx - auto configuration service Copyright (c) 2011-2013 [*] sys4 AG This program is free software: you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program. If not, see <http://www.gnu.org/licenses/>. """ from __future__ import absolute_import from __future__ import division from __future__ import print_function from __future__ import unicode_literals import os import sys import shlex import re import logging # noinspection PyCompatibility import ipaddress try: import configparser except ImportError: # noinspection PyPep8Naming import ConfigParser as configparser try: # noinspection PyUnresolvedReferences import memcache use_memcache = True except ImportError: use_memcache = False # noinspection PyCompatibility from configparser import NoOptionError, NoSectionError from dateutil import parser from collections import OrderedDict # noinspection PyCompatibility from builtins import dict, int, str __version__ = '1.1.1' __author__ = "Christian Roessner, Patrick Ben Koetter" __copyright__ = "Copyright (c) 2011-2015 [*] sys4 AG" # List of boolean words that have the meaning "true" TRUE = ('1', 'y', 'yes', 't', 'true', 'on') class DataNotFoundException(Exception): pass class ConfigNotFoundException(Exception): pass class Config(configparser.RawConfigParser): """ This class creates the internal data structure that is completely independend from the view. It may query different backends to gather all required information needed to generate XML output later on in the view class. It uses a OrderdDict to guarentee the correct service order that is needed in the XML output. This said means that it is a difference, if a service like IMAP is configured before POP3 or upside down, because a MUA follows this order. The class currently support smtp, pop, imap, carddav, caldav and ox services. The class currently supports the following backends: -> global - This backend tells automx to use the global section -> static - all kind of service information that can be sent directly to the MUA -> filter - This backend can execute commands and collects results from stdout. The result may be "", which means we skip further searching. It may return data, which should point to a section that we try to follow. -> ldap - Read all kind of information from LDAP servers. The result attributes are stored in an internal dictionary and if options later on in this backend section (is read as static backend) do contain variables in the form ${attributename}, these are expanded to the collected data. -> sql - Read all kind of information from SQL servers. The result attributes are stored in an internal dictionary. See ldpa -> script - Execute a script and split a result into attributes, which are stored in an internal dictionary, See ldap -> file - Provide static files. If present, all collected data are discarded and only the static file is sent to the remote client. This may change in future releases. Note: There may exist a DEFAULT section that is appended to _all_ sections in the configuration file. That said you can do really complex configurations that on the other hand make life easier. This section also may contain variables, which, if found in the vars-dictionary, are used. """ def __init__(self, environ): # noinspection PyCallByClass,PyTypeChecker configparser.RawConfigParser.__init__(self, defaults=None, dict_type=OrderedDict) found_conf = False conf_files = list(["/usr/local/etc/automx.conf", "/etc/automx.conf"]) conf = None for conf in iter(conf_files): if os.path.exists(conf): found_conf = True break if not found_conf: raise ConfigNotFoundException("No configuration files found:" "%s, %s" % (conf_files[0], conf_files[1])) self.read(conf) if not self.has_section("automx"): raise Exception("Missing section 'automx'") if self.has_option("automx", "logfile"): self.logfile = self.get("automx", "logfile") else: self.logfile = None if self.has_option("automx", "debug"): self.debug = self.getboolean("automx", "debug") else: self.debug = False # We need a home directory for the OpenSSL-rand file if self.has_option("automx", "homedir"): os.environ["HOME"] = self.get("automx", "homedir") else: os.environ["HOME"] = "/var/automx" self.memcache = Memcache(self, environ) # defaults self.__emailaddress = "" self.__cn = "" self.__password = "" self.__search_domain = "" self.__automx = dict() # domain individual settings (overwrites some or all defaults) self.__domain = OrderedDict() # if we use dynamic backends, we might earn variables self.__vars = dict() def configure(self, emailaddress, cn=None, password=None): if emailaddress is None: return OrderedDict() # Full email address containing local part _and_ domain self.__emailaddress = emailaddress # Mobileconfig if cn is not None: self.__cn = cn if password is not None: self.__password = password # The domain that is searched in the config file domain = emailaddress.split("@")[1] self.__search_domain = domain try: provider = self.get("automx", "provider") # provider must be a domainname pattern = "^[0-9a-zA-Z.-]+[a-zA-Z]{2,9}$" prog = re.compile(pattern) result = prog.match(provider) if result is not None: self.__automx["provider"] = result.group(0) else: logging.error("<provider> setting broken!") self.__automx["provider"] = "provider.broken" tmp = self.create_list(self.get("automx", "domains")) self.__automx["domains"] = tmp except TypeError: raise Exception("Missing options in section automx") try: self.__automx["openssl"] = self.get("automx", "openssl") except (NoSectionError, NoOptionError): self.__automx["openssl"] = "/usr/bin/openssl" # if a domain has its own section, use settings from it cmp_domains = [dom.lower() for dom in self.__automx["domains"]] if (domain.lower() in iter(cmp_domains) or self.__automx["domains"][0] == "*"): cmp_sections = [dom.lower() for dom in self.sections()] if domain.lower() in iter(cmp_sections): self.__eval_options(domain) else: if self.has_section("global"): self.__eval_options("global") else: raise Exception("Missing section 'global'") # we need to use default values from config file self.__domain = self.__replace_makro(self.__domain) def __eval_options(self, section, backend=None): settings = self.__domain settings["domain"] = self.__search_domain settings["emailaddress"] = self.__emailaddress section = self.__find_section(section) if self.has_option(section, "backend"): if backend is None: try: backend = self.get(section, "backend") except NoOptionError: raise Exception("Missing option <backend>") if backend in ("static", "static_append"): for opt in iter(self.options(section)): if opt in ("action", "account_type", "account_name", "account_name_short", "display_name", "server_url", "server_name"): tmp = self.get(section, opt) result = self.__expand_vars(tmp) result = self.__replace_makro(result) settings[opt] = result elif opt == "smtp": service = self.__service(section, "smtp") elif opt == "imap": service = self.__service(section, "imap") elif opt == "pop": service = self.__service(section, "pop") elif opt == "carddav": service = self.__service(section, "carddav") elif opt == "caldav": service = self.__service(section, "caldav") elif opt == "ox": service = self.__service(section, "ox") elif opt == "sign_mobileconfig": try: settings[opt] = self.getboolean(section, opt) except (NoSectionError, NoOptionError, ValueError): logging.error("%s is not boolean!" % opt) settings[opt] = False elif opt in ("sign_cert", "sign_key", "sign_more_certs"): result = self.get(section, opt) if os.path.exists(result): settings[opt] = result else: logging.error("%s cannot read %s" % (opt, result)) else: pass if opt in ("smtp", "imap", "pop", "caldav", "carddav", "ox"): if backend == "static_append": if opt in settings: if self.debug: logging.debug("APPEND %s" % service) settings[opt].append(service) else: if self.debug: logging.debug("APPEND NEW %s" % service) settings[opt] = [service] else: # do not include empty services if len(service) != 0: if self.debug: logging.debug("STATIC %s" % service) service_category = OrderedDict() service_category[opt] = [service] settings.update(service_category) # always follow at the end! if "follow" in self.options(section): tmp = self.get(section, "follow") result = self.__expand_vars(tmp) result = self.__replace_makro(result) self.__eval_options(result) elif backend in ("ldap", "ldap_append"): try: import ldap import ldap.sasl except: raise Exception("python ldap missing") ldap_cfg = dict(host="ldap://127.0.0.1/", base="", bindmethod="simple", binddn=None, bindpw=None, saslmech=None, authzid="", filter="(objectClass=*)", result_attrs=[], scope="sub", usetls="no", cipher="TLSv1", reqcert="never", cert=None, key=None, cacert=None) tls = False sasl = False for opt in iter(self.options(section)): if opt in ("host", "base", "bindmethod", "binddn", "bindpw", "saslmech", "authzid", "filter", "result_attrs", "scope", "usetls", "cipher", "reqcert", "cert", "key", "cacert"): result = self.get(section, opt) if opt in ("host", "result_attrs"): result = self.create_list(result) ldap_cfg[opt] = result # Do we connect with TLS? reqcert = None if ldap_cfg["usetls"].strip().lower() in TRUE: if ldap_cfg["reqcert"] in ("never", "allow", "try", "demand"): rc = ldap_cfg["reqcert"] if rc == "never": reqcert = ldap.OPT_X_TLS_NEVER elif rc == "allow": reqcert = ldap.OPT_X_TLS_ALLOW elif rc == "try": reqcert = ldap.OPT_X_TLS_TRY elif rc == "demand": reqcert = ldap.OPT_X_TLS_DEMAND ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, reqcert) ldap.set_option(ldap.OPT_X_TLS_CIPHER_SUITE, ldap_cfg["cipher"]) if ldap_cfg["cacert"] is not None: ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, ldap_cfg["cacert"]) if ldap_cfg["cert"] is not None: ldap.set_option(ldap.OPT_X_TLS_CERTFILE, ldap_cfg["cert"]) if ldap_cfg["key"] is not None: ldap.set_option(ldap.OPT_X_TLS_KEYFILE, ldap_cfg["key"]) tls = True # Are we SASL binding to our servers? auth_tokens = None if ldap_cfg["bindmethod"] == "sasl": mech = ldap_cfg["saslmech"] if mech is not None: if mech.lower() == "digest-md5": auth_tokens = ldap.sasl.digest_md5( ldap_cfg["binddn"], ldap_cfg["bindpw"]) elif mech.lower() == "cram-md5": auth_tokens = ldap.sasl.cram_md5( ldap_cfg["binddn"], ldap_cfg["bindpw"]) elif mech.lower() == "external": auth_tokens = ldap.sasl.external( ldap_cfg["authzid"]) elif mech.lower() == "gssapi": auth_tokens = ldap.sasl.gssapi(ldap_cfg["authzid"]) sasl = True con = None for server in iter(ldap_cfg["host"]): try: con = ldap.initialize(server) if tls: con.start_tls_s() if sasl: con.sasl_interactive_bind_s("", auth_tokens) else: con.simple_bind_s(ldap_cfg["binddn"], ldap_cfg["bindpw"]) except Exception as e: logging.error("LDAP: %s" % e) continue break scope = None if con is not None: if ldap_cfg["scope"] in ("sub", "subtree"): scope = ldap.SCOPE_SUBTREE elif ldap_cfg["scope"] in ("one", "onelevel"): scope = ldap.SCOPE_ONELEVEL elif ldap_cfg["scope"] in ("base", "exact"): scope = ldap.SCOPE_BASE s_filter = self.__replace_makro(ldap_cfg["filter"]) rid = con.search(ldap_cfg["base"], scope, s_filter, ldap_cfg["result_attrs"]) raw_res = con.result(rid, True, 60) if raw_res[0] is None: con.abandon(rid) raise Exception("LDAP server timeout reached") # connection established, we have results self.__vars = dict() # we did not receive data from LDAP if raw_res[1]: for entry in raw_res[1]: for key, value in list(entry[1].items()): # result attributes might be multi values, but # we only accept the first value. self.__vars[key] = str(value[0].decode("utf-8")) else: logging.warning("No LDAP result from server!") raise DataNotFoundException try: con.unbind() except ldap.LDAPError: pass if backend == "ldap": self.__eval_options(section, backend="static") else: self.__eval_options(section, backend="static_append") elif backend in ("sql", "sql_append"): try: # noinspection PyPackageRequirements from sqlalchemy.engine import create_engine except: raise Exception("python sqlalchemy missing") sql_cfg = dict(host=None, query="", result_attrs=[]) for opt in iter(self.options(section)): if opt in ("host", "result_attrs"): result = self.create_list(self.get(section, opt)) sql_cfg[opt] = result if self.has_option(section, "query"): query = self.get(section, "query") sql_cfg["query"] = self.__replace_makro(query) else: raise Exception("Missing option <query>") for con in iter(sql_cfg["host"]): try: engine = create_engine(con) connection = engine.connect() except Exception as e: logging.error("SQL: %s" % e) continue result = connection.execute(sql_cfg["query"]) for row in result: keys = list(row.keys()) for key in iter(keys): if key in iter(sql_cfg["result_attrs"]): self.__vars[key] = row[key] # Implicit LIMIT 1 here break else: logging.warning("No SQL result from server!") connection.close() raise DataNotFoundException connection.close() break if backend == "sql": self.__eval_options(section, backend="static") else: self.__eval_options(section, backend="static_append") elif backend in ("file", "file_append"): for opt in iter(self.options(section)): if opt in ("autoconfig", "autodiscover", "mobileconfig"): tmp = self.get(section, opt) result = self.__expand_vars(tmp) if os.path.exists(result): settings[opt] = result if backend == "file": self.__eval_options(section, backend="static") else: self.__eval_options(section, backend="static_append") elif backend in ("script", "script_append"): if self.has_option(section, "script"): script_args = self.get(section, "script") else: raise Exception("Missing option <script>") if self.has_option(section, "result_attrs"): result_attrs = self.create_list(self.get(section, "result_attrs")) else: raise Exception("Missing option <result_attrs>") separator = None if self.has_option(section, "separator"): separator = self.get(section, "separator") cmd = shlex.split(script_args) for i, item in enumerate(cmd): cmd[i] = self.__replace_makro(item) stdout_fd = sys.__stdout__.fileno() pipe_in, pipe_out = os.pipe() pid = os.fork() recv = None result = None if pid == 0: # child os.close(pipe_in) os.dup2(pipe_out, stdout_fd) os.execvp(cmd[0], cmd) raise Exception("ERROR in execvp()") elif pid > 0: # parent os.close(pipe_out) recv = os.read(pipe_in, 1024) result = os.waitpid(pid, 0) # check return code if result[1]: raise Exception("ERROR while calling script", result, recv.strip()) if len(recv) == 0: logging.warning("No result from script!") raise DataNotFoundException result = recv.strip().split(separator, len(result_attrs)) for i in range(min(len(result_attrs), len(result))): self.__vars[result_attrs[i]] = result[i].strip() if backend == "script": self.__eval_options(section, backend="static") else: self.__eval_options(section, backend="static_append") # backends beyond this line do not have a follow option # elif backend == "filter": if self.has_option(section, "section_filter"): tmp = self.create_list(self.get(section, "section_filter")) special_opts = tmp got_data = False for special_opt in iter(special_opts): if self.has_option(section, special_opt): cmd = shlex.split(self.get(section, special_opt)) for i, item in enumerate(cmd): cmd[i] = self.__replace_makro(item) stdout_fd = sys.__stdout__.fileno() pipe_in, pipe_out = os.pipe() pid = os.fork() if pid == 0: # child os.close(pipe_in) os.dup2(pipe_out, stdout_fd) os.execvp(cmd[0], cmd) raise Exception("ERROR in execvp()") elif pid > 0: # parent os.close(pipe_out) recv = os.read(pipe_in, 1024) result = os.waitpid(pid, 0) else: continue # check return code if result[1] != 0: raise Exception("ERROR while calling filter", result, recv.strip()) else: new_emailaddress = recv.strip() # The result seems not to be an email address if '@' not in new_emailaddress: continue if self.debug: logging.debug("Email address from filter: %s" % new_emailaddress) got_data = True # we replace our search_domain self.__search_domain = special_opt self.__emailaddress = new_emailaddress # recurse again, because we now have a new section # that we need to scan self.__eval_options(special_opt) # we already got a result. Do not continue! break if not got_data: raise DataNotFoundException elif backend == "global": if self.has_section("global"): self.__eval_options("global") self.__replace_makro(settings) else: raise Exception("Missing section 'global'") else: raise Exception("Unknown backend specified") def __service(self, section, service): # This method only stores meta information. The results depend on # the MUA xml schema specification and is defined in the Viewer-class proto_settings = OrderedDict() if (self.__expand_vars(self.get(section, service)).strip().lower() in TRUE): if self.has_option(section, service + "_server"): opt = service + "_server" result = self.__expand_vars(self.get(section, opt)) proto_settings[opt] = self.__replace_makro(result) if self.has_option(section, service + "_port"): opt = service + "_port" result = self.__expand_vars(self.get(section, opt)) proto_settings[opt] = result if self.has_option(section, service + "_encryption"): opt = service + "_encryption" result = self.__expand_vars(self.get(section, opt)) if result.lower() == "none": result = "none" elif result.lower() == "ssl": result = "ssl" elif result.lower() == "starttls": result = "starttls" elif result.lower() == "auto": result = "auto" proto_settings[opt] = result if self.has_option(section, service + "_auth"): opt = service + "_auth" result = self.__expand_vars(self.get(section, opt)) if result.lower() == "plaintext": result = "cleartext" elif result.lower() == "encrypted": result = "encrypted" elif result.lower() == "ntlm": result = "ntlm" elif result.lower() == "gssapi": result = "gssapi" elif result.lower() == "client-ip-address": result = "client-ip-address" elif result.lower() == "tls-client-cert": result = "tls-client-cert" elif result.lower() == "none": result = "none" elif result.lower() == "smtp-after-pop": if service == "smtp": result = "smtp-after-pop" # TODO: we allow bogus keys/values. proto_settings[opt] = result if self.has_option(section, service + "_auth_identity"): opt = service + "_auth_identity" result = self.__expand_vars(self.get(section, opt)) proto_settings[opt] = self.__replace_makro(result) else: emaillocalpart = self.__replace_makro("%u") proto_settings[service + "_auth_identity"] = emaillocalpart if self.has_option(section, service + "_expiration_date"): opt = service + "_expiration_date" result = self.__expand_vars(self.get(section, opt)) dt = parser.parse(result, fuzzy=True) proto_settings[opt] = dt.strftime("%Y%m%d") if self.has_option(section, service + "_refresh_ttl"): opt = service + "_refresh_ttl" result = self.get(section, opt) proto_settings[opt] = result if self.has_option(section, service + "_domain_required"): opt = service + "_domain_required" result = self.get(section, opt) if result.lower() in TRUE: proto_settings[opt] = "on" else: proto_settings[opt] = "off" if self.has_option(section, service + "_domain_name"): opt = service + "_domain_name" result = self.__expand_vars(self.get(section, opt)) result = self.__replace_makro(result) proto_settings[opt] = result if service == "smtp": if self.has_option(section, service + "_author"): opt = service + "_author" author = self.__expand_vars(self.get(section, opt)) if author == "%s": proto_settings[opt] = self.__emailaddress if self.has_option(section, service + "_default"): try: opt = service + "_default" tmp = self.__expand_vars(self.get(section, opt)) if tmp.strip().lower() in TRUE: result = "Yes" else: result = "No" proto_settings[opt] = result except ValueError: pass return proto_settings @staticmethod def create_list(value): result = value.split() if len(result) > 1: for i, item in enumerate(result): result[i] = item.split(",")[0] return result def __replace_makro(self, expression): if "%u" in expression: user = self.__emailaddress.split("@")[0] expression = expression.replace("%u", user) if "%d" in expression: domain = self.__search_domain expression = expression.replace("%d", domain) if "%s" in expression: email = self.__emailaddress expression = expression.replace("%s", email) return expression def __expand_vars(self, expression): # do we have some dynamic variables? if len(self.__vars) == 0: return expression def repl(mobj): if mobj.group(1) in self.__vars: _result = self.__vars[mobj.group(1)] if mobj.group(2) is not None: macro = mobj.group(2)[1:] if self.debug: logging.debug("__expand_vars()->macro=%s" % macro) if "@" in _result: if macro == "%u": return _result.split("@")[0] if macro == "%d": return _result.split("@")[1] if macro == "%s": return _result _result = _result.split("@")[1] # now the macro may only be part of a FQDN hostname if "." in _result: dcs = _result.split(".") if macro in ("%1", "%2", "%3", "%4", "%5", "%6", "%7", "%8", "%9"): i = int(macro[1]) if len(dcs) < i: return "" return dcs[-i] return _result else: # we always must expand variables. Even if it is the empty # string return "" result = re.sub(r"\$\{(\w+)(:%[sud1-9])?\}", repl, expression, re.UNICODE) if self.debug: logging.debug("__expand_vars()->result=%s" % result) return result def __find_section(self, domain): l = self.sections() for section in iter(l): if section.lower() == domain.lower(): return section raise NoSectionError(domain) @property def provider(self): return self.__automx["provider"] @property def openssl(self): return self.__automx['openssl'] @property def domain(self): return self.__domain @property def cn(self): return self.__cn @property def password(self): return self.__password @property def emailaddress(self): return self.__emailaddress class Memcache(object): def __init__(self, config, environ): self.__config = config self.__environ = environ # Memcache usage is optional self.__has_memcache = use_memcache self.__found = False self.__client = None self.__current = 0 try: if use_memcache: self.__mc = memcache.Client([config.get("automx", "memcache")]) except ValueError as e: logging.warning("Memcache misconfigured: ", e) self.__has_memcache = False except NoOptionError: logging.warning("Not using Memcache") self.__has_memcache = False def counter(self): return self.__current def set_client(self): if not self.__has_memcache: return if self.__is_trusted_network(): return if self.__config.has_option("automx", "memcache_ttl"): try: ttl = self.__config.getint("automx", "memcache_ttl") except ValueError as e: logging.warning("Memcache <memcache_ttl>, using default: ", e) ttl = 600 else: ttl = 600 if self.__found: self.__current += 1 self.__mc.set(self.__client, self.__current, time=ttl) def allow_client(self): if not self.__has_memcache: return True self.__client = self.__environ["REMOTE_ADDR"] if self.__is_trusted_network(): if self.__config.debug: logging.debug("TRUSTED %s" % self.__client) return True else: if self.__config.debug: logging.debug("NOT TRUSTED %s" % self.__client) if self.__config.has_option("automx", "client_error_limit"): try: limit = self.__config.getint("automx", "client_error_limit") except ValueError as e: logging.warning("Memcache <client_error_limit>, " "using default: ", e) limit = 20 else: limit = 20 result = self.__mc.get(self.__client) if result is not None: self.__found = True self.__current = result if self.__current < limit: return True else: self.set_client() return False def __is_trusted_network(self): if self.__config.has_option("automx", "rate_limit_exception_networks"): networks = self.__config.get("automx", "rate_limit_exception_networks") networks = self.__config.create_list(networks) else: networks = ("127.0.0.1", "::1/128") if sys.version_info < (3,): a = ipaddress.ip_address(self.__client.decode("utf-8")) else: a = ipaddress.ip_address(self.__client) for network in iter(networks): n = ipaddress.ip_network(network) if a in n: if self.__config.debug: logging.debug("FOUND %s, %s" % (a, n)) return True else: if self.__config.debug: logging.debug("NOT FOUND %s, %s" % (a, n)) return False # vim: expandtab ts=4 sw=4