Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.144.117.164
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/2/task/2/cwd/proc/2/cwd/srv/radicale/env/lib64/python3.5/site-packages/radicale/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/2/task/2/cwd/proc/2/cwd/srv/radicale/env/lib64/python3.5/site-packages/radicale//config.py
# This file is part of Radicale Server - Calendar Server
# Copyright © 2008-2017 Guillaume Ayoub
# Copyright © 2008 Nicolas Kandel
# Copyright © 2008 Pascal Halter
# Copyright © 2017-2019 Unrud <unrud@outlook.com>
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Radicale.  If not, see <http://www.gnu.org/licenses/>.

"""
Configuration module

Use ``load()`` to obtain an instance of ``Configuration`` for use with
``radicale.app.Application``.

"""

import contextlib
import math
import os
import string
from collections import OrderedDict
from configparser import RawConfigParser

from radicale import auth, rights, storage, web

# Default compound path of the configuration files. The entries are joined
# by ``os.pathsep``; the leading "?" marks a file that may be missing
# (see ``parse_compound_paths()``).
DEFAULT_CONFIG_PATH = os.pathsep.join((
    "?/etc/radicale/config",
    "?~/.config/radicale/config"))


def positive_int(value):
    """Convert ``value`` to ``int`` and reject negative numbers.

    Zero is accepted. Raises ``ValueError`` if the conversion fails or the
    result is negative.

    """
    number = int(value)
    if number >= 0:
        return number
    raise ValueError("value is negative: %d" % number)


def positive_float(value):
    """Convert ``value`` to ``float`` and reject unusable numbers.

    Zero is accepted. Raises ``ValueError`` if the conversion fails or the
    result is NaN, infinite or negative.

    """
    value = float(value)
    # NaN must be tested before infinity: ``math.isfinite()`` is false for
    # NaN as well, so testing it first would report NaN as "infinite".
    if math.isnan(value):
        raise ValueError("value is not a number")
    if math.isinf(value):
        raise ValueError("value is infinite")
    if value < 0:
        raise ValueError("value is negative: %f" % value)
    return value


def logging_level(value):
    """Return ``value`` if it names a supported logging level.

    Raises ``ValueError`` for any other value.

    """
    supported = ("debug", "info", "warning", "error", "critical")
    if value in supported:
        return value
    raise ValueError("unsupported level: %r" % value)


def filepath(value):
    """Return ``value`` as an absolute path, or ``""`` if it is empty.

    A leading ``~`` is expanded. Environment variables are expanded only
    when ``os.name`` is ``"nt"``.

    """
    if value:
        path = os.path.expanduser(value)
        path = os.path.expandvars(path) if os.name == "nt" else path
        return os.path.abspath(path)
    return ""


def list_of_ip_address(value):
    """Parse a comma separated list of ``ADDRESS:PORT`` entries.

    Returns a list of ``(ADDRESS, PORT)`` tuples with ``PORT`` converted to
    ``int``. Whitespace and square brackets around the address are removed.
    Raises ``ValueError`` for an entry without a port or with a port that is
    not an integer.

    """
    strip_chars = string.whitespace + "[]"
    addresses = []
    for entry in value.split(","):
        try:
            # Split at the last colon, the address itself may contain colons.
            host, port = entry.rsplit(":", 1)
            addresses.append((host.strip(strip_chars), int(port)))
        except ValueError:
            raise ValueError("malformed IP address: %r" % entry)
    return addresses


def str_or_callable(value):
    """Return callables unchanged and convert everything else to ``str``."""
    return value if callable(value) else str(value)


def unspecified_type(value):
    """Return ``value`` unchanged (converter for options of unknown type)."""
    return value


def _convert_to_bool(value):
    """Convert a string to ``bool`` with the rules of ``configparser``.

    The comparison is case-insensitive. Raises ``ValueError`` if ``value``
    is not listed in ``RawConfigParser.BOOLEAN_STATES``.

    """
    key = value.lower()
    states = RawConfigParser.BOOLEAN_STATES
    if key in states:
        return states[key]
    raise ValueError("Not a boolean: %r" % value)


# Schema keys that control the schema itself instead of describing an option.
# They are skipped when ``Configuration`` collects the default values and are
# rejected as option names by ``Configuration.update()``.
INTERNAL_OPTIONS = ("_allow_extra",)
# Default configuration
#
# Format: {SECTION: {OPTION: DESCRIPTION, ...}, ...} where DESCRIPTION is a
# dict with the keys:
#   "value"    default value in its raw (string) form
#   "help"     short description of the option
#   "type"     callable that converts and validates the raw value
#   "aliases", "opposite"
#              look like command line flags; presumably consumed by the
#              command line parser outside of this module (TODO confirm)
#   "internal" (only for the option "type") the plugin types that are
#              built in; if another type is configured,
#              ``Configuration.update()`` accepts options that are not
#              listed in the section
# The special key "_allow_extra" maps directly to a type: options that are
# not listed in the section are accepted and converted with it.
# Sections and options starting with "_" can only be changed by
# ``Configuration.update()`` when ``privileged`` is set.
DEFAULT_CONFIG_SCHEMA = OrderedDict([
    ("server", OrderedDict([
        ("hosts", {
            "value": "localhost:5232",
            "help": "set server hostnames including ports",
            "aliases": ["-H", "--hosts"],
            "type": list_of_ip_address}),
        ("max_connections", {
            "value": "8",
            "help": "maximum number of parallel connections",
            "type": positive_int}),
        ("max_content_length", {
            "value": "100000000",
            "help": "maximum size of request body in bytes",
            "type": positive_int}),
        ("timeout", {
            "value": "30",
            "help": "socket timeout",
            "type": positive_int}),
        ("ssl", {
            "value": "False",
            "help": "use SSL connection",
            "aliases": ["-s", "--ssl"],
            "opposite": ["-S", "--no-ssl"],
            "type": bool}),
        ("certificate", {
            "value": "/etc/ssl/radicale.cert.pem",
            "help": "set certificate file",
            "aliases": ["-c", "--certificate"],
            "type": filepath}),
        ("key", {
            "value": "/etc/ssl/radicale.key.pem",
            "help": "set private key file",
            "aliases": ["-k", "--key"],
            "type": filepath}),
        ("certificate_authority", {
            "value": "",
            "help": "set CA certificate for validating clients",
            "aliases": ["--certificate-authority"],
            "type": filepath}),
        ("_internal_server", {
            "value": "False",
            "help": "the internal server is used",
            "type": bool})])),
    ("encoding", OrderedDict([
        ("request", {
            "value": "utf-8",
            "help": "encoding for responding requests",
            "type": str}),
        ("stock", {
            "value": "utf-8",
            "help": "encoding for storing local collections",
            "type": str})])),
    ("auth", OrderedDict([
        ("type", {
            "value": "none",
            "help": "authentication method",
            "type": str_or_callable,
            "internal": auth.INTERNAL_TYPES}),
        ("htpasswd_filename", {
            "value": "/etc/radicale/users",
            "help": "htpasswd filename",
            "type": filepath}),
        ("htpasswd_encryption", {
            "value": "md5",
            "help": "htpasswd encryption method",
            "type": str}),
        ("realm", {
            "value": "Radicale - Password Required",
            "help": "message displayed when a password is needed",
            "type": str}),
        ("delay", {
            "value": "1",
            "help": "incorrect authentication delay",
            "type": positive_float})])),
    ("rights", OrderedDict([
        ("type", {
            "value": "owner_only",
            "help": "rights backend",
            "type": str_or_callable,
            "internal": rights.INTERNAL_TYPES}),
        ("file", {
            "value": "/etc/radicale/rights",
            "help": "file for rights management from_file",
            "type": filepath})])),
    ("storage", OrderedDict([
        ("type", {
            "value": "multifilesystem",
            "help": "storage backend",
            "type": str_or_callable,
            "internal": storage.INTERNAL_TYPES}),
        ("filesystem_folder", {
            "value": "/var/lib/radicale/collections",
            "help": "path where collections are stored",
            "type": filepath}),
        ("max_sync_token_age", {
            "value": "2592000",  # 30 days
            "help": "delete sync token that are older",
            "type": positive_int}),
        ("hook", {
            "value": "",
            "help": "command that is run after changes to storage",
            "type": str}),
        ("_filesystem_fsync", {
            "value": "True",
            "help": "sync all changes to filesystem during requests",
            "type": bool})])),
    ("web", OrderedDict([
        ("type", {
            "value": "internal",
            "help": "web interface backend",
            "type": str_or_callable,
            "internal": web.INTERNAL_TYPES})])),
    ("logging", OrderedDict([
        ("level", {
            "value": "warning",
            "help": "threshold for the logger",
            "type": logging_level}),
        ("mask_passwords", {
            "value": "True",
            "help": "mask passwords in logs",
            "type": bool})])),
    # Arbitrary option names are allowed in this section, the values are
    # kept as strings.
    ("headers", OrderedDict([
        ("_allow_extra", str)]))])


def parse_compound_paths(*compound_paths):
    """Split a compound path into its individual paths.

    A compound path consists of paths joined by ``os.pathsep``. A path that
    starts with ``?`` has ``IGNORE_IF_MISSING`` set in the return value, the
    ``?`` itself is removed. Empty paths are dropped and the remaining paths
    are normalized with ``filepath()``.

    When multiple ``compound_paths`` are passed, the last argument that is
    not ``None`` is used. Without such an argument the result is empty.

    Returns a list of the format ``[(PATH, IGNORE_IF_MISSING), ...]``

    """
    compound_path = ""
    # Search from the end, the last argument that is not None wins.
    for candidate in reversed(compound_paths):
        if candidate is not None:
            compound_path = candidate
            break
    parsed = []
    for entry in compound_path.split(os.pathsep):
        optional = entry.startswith("?")
        resolved = filepath(entry[1:] if optional else entry)
        if resolved:
            parsed.append((resolved, optional))
    return parsed


def load(paths=()):
    """
    Build the ``Configuration`` instance that is passed to
    ``radicale.app.Application``.

    ``paths`` a list of configuration files with the format
    ``[(PATH, IGNORE_IF_MISSING), ...]``. The files are applied in the given
    order on top of the default configuration.
    If a configuration file is missing and IGNORE_IF_MISSING is set, the
    config is set to ``Configuration.SOURCE_MISSING``.

    Raises ``RuntimeError`` if a file cannot be read or parsed, or if it is
    missing and IGNORE_IF_MISSING is not set.

    The configuration can later be changed with ``Configuration.update()``.

    """
    configuration = Configuration(DEFAULT_CONFIG_SCHEMA)
    for path, ignore_if_missing in paths:
        config_source = "config file %r" % path
        parser = RawConfigParser()
        try:
            # ``read()`` returns the list of files that were parsed, it is
            # empty if the file could not be opened.
            if parser.read(path):
                config = {}
                for section in parser.sections():
                    config[section] = {
                        option: parser[section][option]
                        for option in parser.options(section)}
            elif ignore_if_missing:
                config = Configuration.SOURCE_MISSING
            else:
                raise RuntimeError("No such file: %r" % path)
        except Exception as e:
            raise RuntimeError(
                "Failed to load %s: %s" % (config_source, e)) from e
        configuration.update(config, config_source)
    return configuration


class Configuration:
    """Layered configuration that is validated against a schema.

    Every call of ``update()`` adds one layer. The converted values of later
    layers replace those of earlier layers, the raw configs and their
    sources are remembered for ``get_raw()``, ``get_source()`` and
    ``sources()``.

    """

    # Marker that is passed to ``update()`` for a configuration file that
    # does not exist. ``sources()`` recognizes it by identity, it must not
    # be replaced by another empty dict.
    SOURCE_MISSING = {}

    def __init__(self, schema):
        """Initialize configuration.

        ``schema`` a dict that describes the configuration format.
        See ``DEFAULT_CONFIG_SCHEMA``.
        The content of ``schema`` must not change afterwards, it is kept
        as an internal reference.

        Use ``load()`` to create an instance for use with
        ``radicale.app.Application``.

        """
        self._schema = schema
        # Converted values: {SECTION: {OPTION: VALUE, ...}, ...}
        self._values = {}
        # All applied layers in order: [(CONFIG, SOURCE, PRIVILEGED), ...]
        self._configs = []
        # The default values of the schema are the first layer.
        default = {section: {option: self._schema[section][option]["value"]
                             for option in self._schema[section]
                             if option not in INTERNAL_OPTIONS}
                   for section in self._schema}
        self.update(default, "default config", privileged=True)

    def update(self, config, source=None, privileged=False):
        """Update the configuration.

        ``config`` a dict of the format {SECTION: {OPTION: VALUE, ...}, ...}.
        The configuration is checked for errors according to the config schema.
        The content of ``config`` must not change afterwards, it is kept
        as an internal reference.

        ``source`` a description of the configuration source (used in error
        messages).

        ``privileged`` allows updating sections and options starting with "_".

        Raises ``ValueError`` for an invalid section and ``RuntimeError`` for
        an invalid option or a value that cannot be converted. Nothing is
        stored unless the whole ``config`` is valid.

        """
        source = source or "unspecified config"
        new_values = {}
        for section in config:
            if (section not in self._schema or
                    section.startswith("_") and not privileged):
                raise ValueError(
                    "Invalid section %r in %s" % (section, source))
            new_values[section] = {}
            # Type for options that are not listed in the schema. ``None``
            # means that such options are rejected.
            extra_type = self._schema[section].get("_allow_extra")
            if "type" in self._schema[section]:
                if "type" in config[section]:
                    plugin = config[section]["type"]
                else:
                    plugin = self.get(section, "type")
                # The options of a plugin that is not built in are unknown,
                # accept them without conversion.
                if plugin not in self._schema[section]["type"]["internal"]:
                    extra_type = unspecified_type
            for option in config[section]:
                type_ = extra_type
                if option in INTERNAL_OPTIONS:
                    # The schema entry of an internal option is not a dict
                    # with a "type" key (e.g. "_allow_extra" maps to a bare
                    # type). Never look it up, the option is always invalid.
                    type_ = None
                elif option in self._schema[section]:
                    type_ = self._schema[section][option]["type"]
                if (not type_ or
                        option.startswith("_") and not privileged):
                    raise RuntimeError("Invalid option %r in section %r in "
                                       "%s" % (option, section, source))
                raw_value = config[section][option]
                try:
                    # ``bool("False")`` is true, strings need the
                    # configparser rules for booleans.
                    if type_ is bool and not isinstance(raw_value, bool):
                        raw_value = _convert_to_bool(raw_value)
                    new_values[section][option] = type_(raw_value)
                except Exception as e:
                    raise RuntimeError(
                        "Invalid %s value for option %r in section %r in %s: "
                        "%r" % (type_.__name__, option, section, source,
                                raw_value)) from e
        # Everything is valid, store the layer and merge the new values.
        self._configs.append((config, source, bool(privileged)))
        for section in new_values:
            self._values[section] = self._values.get(section, {})
            self._values[section].update(new_values[section])

    def get(self, section, option):
        """Get the value of ``option`` in ``section``.

        Raises ``KeyError(section, option)`` if the option is not set.

        """
        with contextlib.suppress(KeyError):
            return self._values[section][option]
        raise KeyError(section, option)

    def get_raw(self, section, option):
        """Get the raw value of ``option`` in ``section``.

        The value is taken from the most recent config that contains the
        option. Raises ``KeyError(section, option)`` if there is none.

        """
        for config, _, _ in reversed(self._configs):
            if option in config.get(section, {}):
                return config[section][option]
        raise KeyError(section, option)

    def get_source(self, section, option):
        """Get the source that provides ``option`` in ``section``.

        The most recent config that contains the option is used. Raises
        ``KeyError(section, option)`` if there is none.

        """
        for config, source, _ in reversed(self._configs):
            if option in config.get(section, {}):
                return source
        raise KeyError(section, option)

    def sections(self):
        """List all sections."""
        return self._values.keys()

    def options(self, section):
        """List all options in ``section``"""
        return self._values[section].keys()

    def sources(self):
        """List all config sources.

        Returns a list of the format ``[(SOURCE, MISSING), ...]`` where
        MISSING is set if the config is ``SOURCE_MISSING``.

        """
        return [(source, config is self.SOURCE_MISSING) for
                config, source, _ in self._configs]

    def copy(self, plugin_schema=None):
        """Create a copy of the configuration

        ``plugin_schema`` is a optional dict that contains additional options
        for usage with a plugin. See ``DEFAULT_CONFIG_SCHEMA``.

        Raises ``ValueError`` if ``plugin_schema`` contains a section that
        has no plugin type or an option that already exists.

        """
        if plugin_schema is None:
            schema = self._schema
        else:
            # Copy every level that is modified, the schema of this
            # instance must stay unchanged.
            schema = self._schema.copy()
            for section, options in plugin_schema.items():
                if (section not in schema or "type" not in schema[section] or
                        "internal" not in schema[section]["type"]):
                    raise ValueError("not a plugin section: %r" % section)
                schema[section] = schema[section].copy()
                schema[section]["type"] = schema[section]["type"].copy()
                # Only the currently configured type counts as internal in
                # the copy.
                schema[section]["type"]["internal"] = [
                    self.get(section, "type")]
                for option, value in options.items():
                    if option in schema[section]:
                        raise ValueError("option already exists in %r: %r" % (
                            section, option))
                    schema[section][option] = value
        copy = type(self)(schema)
        # Replay all layers (including the defaults) against the new schema.
        for config, source, privileged in self._configs:
            copy.update(config, source, privileged)
        return copy

Anon7 - 2022
AnonSec Team