Server IP : 85.214.239.14 / Your IP : 13.59.130.154 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /srv/radicale/env/lib64/python3.5/site-packages/radicale/ |
Upload File : |
# This file is part of Radicale Server - Calendar Server
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale.  If not, see <http://www.gnu.org/licenses/>.

"""
Configuration module

Use ``load()`` to obtain an instance of ``Configuration`` for use with
``radicale.app.Application``.

"""

import contextlib
import math
import os
import string
from collections import OrderedDict
from configparser import RawConfigParser

from radicale import auth, rights, storage, web

# Compound path of the default config files. The leading "?" marks a file
# that may be missing (see ``parse_compound_paths``).
DEFAULT_CONFIG_PATH = os.pathsep.join([
    "?/etc/radicale/config",
    "?~/.config/radicale/config"])


def positive_int(value):
    """Convert ``value`` to ``int`` and reject negative numbers.

    Zero is accepted. Raises ``ValueError`` if the conversion fails or the
    result is below zero.

    """
    value = int(value)
    if value < 0:
        raise ValueError("value is negative: %d" % value)
    return value


def positive_float(value):
    """Convert ``value`` to ``float`` and reject NaN, infinity and negatives.

    Zero is accepted. Raises ``ValueError`` if the conversion fails or the
    result is NaN, infinite or below zero.

    """
    value = float(value)
    # NaN has to be tested first: ``math.isfinite`` is also false for NaN,
    # so testing it first would report NaN as "infinite" and leave the
    # NaN branch unreachable.
    if math.isnan(value):
        raise ValueError("value is not a number")
    if not math.isfinite(value):
        raise ValueError("value is infinite")
    if value < 0:
        raise ValueError("value is negative: %f" % value)
    return value


def logging_level(value):
    """Return ``value`` unchanged if it is a supported logging level name.

    Raises ``ValueError`` for any other value. The comparison is
    case-sensitive.

    """
    if value not in ("debug", "info", "warning", "error", "critical"):
        raise ValueError("unsupported level: %r" % value)
    return value


def filepath(value):
    """Return ``value`` as an absolute path with ``~`` expanded.

    An empty (falsy) ``value`` yields the empty string. Environment
    variables are only expanded when ``os.name`` is ``"nt"``.

    """
    if not value:
        return ""
    value = os.path.expanduser(value)
    if os.name == "nt":
        value = os.path.expandvars(value)
    return os.path.abspath(value)


def list_of_ip_address(value):
    """Parse a comma separated list of ``ADDRESS:PORT`` entries.

    Returns a list of ``(ADDRESS, PORT)`` tuples where ``PORT`` is an
    ``int``. Raises ``ValueError`` for an entry without a port separator
    or with a non-integer port.

    """
    def ip_address(value):
        try:
            # Split at the last colon so that addresses which contain
            # colons themselves keep them in the address part.
            address, port = value.rsplit(":", 1)
            # Surrounding whitespace and square brackets are removed from
            # the address.
            return address.strip(string.whitespace + "[]"), int(port)
        except ValueError:
            raise ValueError("malformed IP address: %r" % value)
    return [ip_address(s) for s in value.split(",")]


def str_or_callable(value):
    """Return callables unchanged and convert everything else to ``str``."""
    if callable(value):
        return value
    return str(value)


def unspecified_type(value):
    """Return ``value`` unchanged.

    Used by ``Configuration.update`` as the type of options that are not
    part of the schema when a plugin that is not listed as internal is
    selected.

    """
    return value


def _convert_to_bool(value):
    """Convert a string to ``bool`` using ``RawConfigParser.BOOLEAN_STATES``.

    The lookup is case-insensitive. Raises ``ValueError`` if the lowercased
    value is not a known boolean state.

    """
    if value.lower() not in RawConfigParser.BOOLEAN_STATES:
        raise ValueError("Not a boolean: %r" % value)
    return RawConfigParser.BOOLEAN_STATES[value.lower()]


# Keys of a schema section that describe the section itself and are never
# accepted as option names.
INTERNAL_OPTIONS = ("_allow_extra",)
# Default configuration
DEFAULT_CONFIG_SCHEMA = OrderedDict([
    ("server", OrderedDict([
        ("hosts", {
            "value": "localhost:5232",
            "help": "set server hostnames including ports",
            "aliases": ["-H", "--hosts"],
            "type": list_of_ip_address}),
        ("max_connections", {
            "value": "8",
            "help": "maximum number of parallel connections",
            "type": positive_int}),
        ("max_content_length", {
            "value": "100000000",
            "help": "maximum size of request body in bytes",
            "type": positive_int}),
        ("timeout", {
            "value": "30",
            "help": "socket timeout",
            "type": positive_int}),
        ("ssl", {
            "value": "False",
            "help": "use SSL connection",
            "aliases": ["-s", "--ssl"],
            "opposite": ["-S", "--no-ssl"],
            "type": bool}),
        ("certificate", {
            "value": "/etc/ssl/radicale.cert.pem",
            "help": "set certificate file",
            "aliases": ["-c", "--certificate"],
            "type": filepath}),
        ("key", {
            "value": "/etc/ssl/radicale.key.pem",
            "help": "set private key file",
            "aliases": ["-k", "--key"],
            "type": filepath}),
        ("certificate_authority", {
            "value": "",
            "help": "set CA certificate for validating clients",
            "aliases": ["--certificate-authority"],
            "type": filepath}),
        ("_internal_server", {
            "value": "False",
            "help": "the internal server is used",
            "type": bool})])),
    ("encoding", OrderedDict([
        ("request", {
            "value": "utf-8",
            "help": "encoding for responding requests",
            "type": str}),
        ("stock", {
            "value": "utf-8",
            "help": "encoding for storing local collections",
            "type": str})])),
    ("auth", OrderedDict([
        ("type", {
            "value": "none",
            "help": "authentication method",
            "type": str_or_callable,
            "internal": auth.INTERNAL_TYPES}),
        ("htpasswd_filename", {
            "value": "/etc/radicale/users",
            "help": "htpasswd filename",
            "type": filepath}),
        ("htpasswd_encryption", {
            "value": "md5",
            "help": "htpasswd encryption method",
            "type": str}),
        ("realm", {
            "value": "Radicale - Password Required",
            "help": "message displayed when a password is needed",
            "type": str}),
        ("delay", {
            "value": "1",
            "help": "incorrect authentication delay",
            "type": positive_float})])),
    ("rights", OrderedDict([
        ("type", {
            "value": "owner_only",
            "help": "rights backend",
            "type": str_or_callable,
            "internal": rights.INTERNAL_TYPES}),
        ("file", {
            "value": "/etc/radicale/rights",
            "help": "file for rights management from_file",
            "type": filepath})])),
    ("storage", OrderedDict([
        ("type", {
            "value": "multifilesystem",
            "help": "storage backend",
            "type": str_or_callable,
            "internal": storage.INTERNAL_TYPES}),
        ("filesystem_folder", {
            "value": "/var/lib/radicale/collections",
            "help": "path where collections are stored",
            "type": filepath}),
        ("max_sync_token_age", {
            "value": "2592000",  # 30 days
            "help": "delete sync token that are older",
            "type": positive_int}),
        ("hook", {
            "value": "",
            "help": "command that is run after changes to storage",
            "type": str}),
        ("_filesystem_fsync", {
            "value": "True",
            "help": "sync all changes to filesystem during requests",
            "type": bool})])),
    ("web", OrderedDict([
        ("type", {
            "value": "internal",
            "help": "web interface backend",
            "type": str_or_callable,
            "internal": web.INTERNAL_TYPES})])),
    ("logging", OrderedDict([
        ("level", {
            "value": "warning",
            "help": "threshold for the logger",
            "type": logging_level}),
        ("mask_passwords", {
            "value": "True",
            "help": "mask passwords in logs",
            "type": bool})])),
    ("headers", OrderedDict([
        ("_allow_extra", str)]))])


def parse_compound_paths(*compound_paths):
    """Parse a compound path and return the individual paths.
    Paths in a compound path are joined by ``os.pathsep``. If a path starts
    with ``?`` the return value ``IGNORE_IF_MISSING`` is set.

    When multiple ``compound_paths`` are passed, the last argument that is
    not ``None`` is used.

    Returns a list of the format ``[(PATH, IGNORE_IF_MISSING), ...]``

    """
    compound_path = ""
    for p in compound_paths:
        if p is not None:
            compound_path = p
    paths = []
    for path in compound_path.split(os.pathsep):
        ignore_if_missing = path.startswith("?")
        if ignore_if_missing:
            path = path[1:]
        path = filepath(path)
        # ``filepath`` returns "" for empty entries; those are skipped.
        if path:
            paths.append((path, ignore_if_missing))
    return paths


def load(paths=()):
    """
    Create instance of ``Configuration`` for use with
    ``radicale.app.Application``.

    ``paths`` a list of configuration files with the format
    ``[(PATH, IGNORE_IF_MISSING), ...]``.
    If a configuration file is missing and IGNORE_IF_MISSING is set, the
    config is set to ``Configuration.SOURCE_MISSING``.

    The configuration can later be changed with ``Configuration.update()``.

    Raises ``RuntimeError`` if a file cannot be loaded, including a missing
    file for which IGNORE_IF_MISSING is not set.

    """
    configuration = Configuration(DEFAULT_CONFIG_SCHEMA)
    for path, ignore_if_missing in paths:
        parser = RawConfigParser()
        config_source = "config file %r" % path
        try:
            # ``parser.read`` returns the list of files it could read; an
            # empty list means that ``path`` was not read.
            if not parser.read(path):
                config = Configuration.SOURCE_MISSING
                if not ignore_if_missing:
                    raise RuntimeError("No such file: %r" % path)
            else:
                config = {s: {o: parser[s][o] for o in parser.options(s)}
                          for s in parser.sections()}
        except Exception as e:
            raise RuntimeError(
                "Failed to load %s: %s" % (config_source, e)) from e
        configuration.update(config, config_source)
    return configuration


class Configuration:
    """Validated configuration values built from layered config sources."""

    # Marker for a config source whose file was missing. ``sources()``
    # recognizes it by identity, so it must not be replaced by another
    # empty dict.
    SOURCE_MISSING = {}

    def __init__(self, schema):
        """Initialize configuration.

        ``schema`` a dict that describes the configuration format.
        See ``DEFAULT_CONFIG_SCHEMA``.
        The content of ``schema`` must not change afterwards, it is kept
        as an internal reference.

        Use ``load()`` to create an instance for use with
        ``radicale.app.Application``.

        """
        self._schema = schema
        self._values = {}
        self._configs = []
        # The default values of the schema become the first config source.
        # It is applied as privileged because the schema contains options
        # that start with "_".
        default = {section: {option: self._schema[section][option]["value"]
                             for option in self._schema[section]
                             if option not in INTERNAL_OPTIONS}
                   for section in self._schema}
        self.update(default, "default config", privileged=True)

    def update(self, config, source=None, privileged=False):
        """Update the configuration.

        ``config`` a dict of the format {SECTION: {OPTION: VALUE, ...}, ...}.
        The configuration is checked for errors according to the config
        schema.
        The content of ``config`` must not change afterwards, it is kept
        as an internal reference.

        ``source`` a description of the configuration source (used in error
        messages).

        ``privileged`` allows updating sections and options starting with
        "_".

        Raises ``ValueError`` for an invalid section and ``RuntimeError``
        for an invalid option or value. Nothing is stored when an error is
        raised.

        """
        source = source or "unspecified config"
        new_values = {}
        for section in config:
            if (section not in self._schema or
                    (section.startswith("_") and not privileged)):
                raise ValueError(
                    "Invalid section %r in %s" % (section, source))
            new_values[section] = {}
            # Type for options that are not in the schema; ``None`` means
            # that such options are rejected.
            extra_type = self._schema[section].get("_allow_extra")
            if "type" in self._schema[section]:
                if "type" in config[section]:
                    plugin = config[section]["type"]
                else:
                    plugin = self.get(section, "type")
                # A plugin that is not listed as internal may use options
                # the schema does not know, so they are accepted as-is.
                if plugin not in self._schema[section]["type"]["internal"]:
                    extra_type = unspecified_type
            for option in config[section]:
                type_ = extra_type
                if option in self._schema[section]:
                    type_ = self._schema[section][option]["type"]
                if (not type_ or option in INTERNAL_OPTIONS or
                        (option.startswith("_") and not privileged)):
                    raise RuntimeError("Invalid option %r in section %r in "
                                       "%s" % (option, section, source))
                raw_value = config[section][option]
                try:
                    # ``bool("False")`` would be true, so strings are
                    # converted with the config parser's boolean states.
                    if type_ == bool and not isinstance(raw_value, bool):
                        raw_value = _convert_to_bool(raw_value)
                    new_values[section][option] = type_(raw_value)
                except Exception as e:
                    raise RuntimeError(
                        "Invalid %s value for option %r in section %r in %s: "
                        "%r" % (type_.__name__, option, section, source,
                                raw_value)) from e
        # The source and its values are only recorded once the whole config
        # has been validated.
        self._configs.append((config, source, bool(privileged)))
        for section in new_values:
            self._values[section] = self._values.get(section, {})
            self._values[section].update(new_values[section])

    def get(self, section, option):
        """Get the value of ``option`` in ``section``.

        Raises ``KeyError(section, option)`` if the section or the option
        is unknown.

        """
        with contextlib.suppress(KeyError):
            return self._values[section][option]
        raise KeyError(section, option)

    def get_raw(self, section, option):
        """Get the raw value of ``option`` in ``section``.

        The value is taken unconverted from the most recently added config
        source that contains the option. Raises ``KeyError`` if no source
        contains it.

        """
        for config, _, _ in reversed(self._configs):
            if option in config.get(section, {}):
                return config[section][option]
        raise KeyError(section, option)

    def get_source(self, section, option):
        """Get the source that provides ``option`` in ``section``.

        The most recently added config source that contains the option
        wins. Raises ``KeyError`` if no source contains it.

        """
        for config, source, _ in reversed(self._configs):
            if option in config.get(section, {}):
                return source
        raise KeyError(section, option)

    def sections(self):
        """List all sections."""
        return self._values.keys()

    def options(self, section):
        """List all options in ``section``"""
        return self._values[section].keys()

    def sources(self):
        """List all config sources.

        Returns a list of the format ``[(SOURCE, MISSING), ...]`` where
        ``MISSING`` is true if the config is ``SOURCE_MISSING``.

        """
        return [(source, config is self.SOURCE_MISSING) for
                config, source, _ in self._configs]

    def copy(self, plugin_schema=None):
        """Create a copy of the configuration

        ``plugin_schema`` is a optional dict that contains additional options
        for usage with a plugin. See ``DEFAULT_CONFIG_SCHEMA``.

        Raises ``ValueError`` if ``plugin_schema`` contains a section that
        has no ``type`` option with an ``internal`` list or an option that
        already exists in the schema.

        """
        if plugin_schema is None:
            schema = self._schema
        else:
            # Every level that is modified below is copied first, so the
            # schema of this instance stays unchanged.
            schema = self._schema.copy()
            for section, options in plugin_schema.items():
                if (section not in schema or "type" not in schema[section] or
                        "internal" not in schema[section]["type"]):
                    raise ValueError("not a plugin section: %r" % section)
                schema[section] = schema[section].copy()
                schema[section]["type"] = schema[section]["type"].copy()
                # Only the currently selected plugin counts as internal in
                # the copy, so options outside the extended schema are
                # rejected for it.
                schema[section]["type"]["internal"] = [
                    self.get(section, "type")]
                for option, value in options.items():
                    if option in schema[section]:
                        raise ValueError("option already exists in %r: %r" % (
                            section, option))
                    schema[section][option] = value
        copy = type(self)(schema)
        # Replay all recorded sources so the copy is validated against the
        # (possibly extended) schema.
        for config, source, privileged in self._configs:
            copy.update(config, source, privileged)
        return copy