Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.144.224.105
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/3/task/3/cwd/srv/modoboa/env/lib64/python3.5/site-packages/caldav/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/3/task/3/cwd/srv/modoboa/env/lib64/python3.5/site-packages/caldav//davclient.py
#!/usr/bin/env python
# -*- encoding: utf-8 -*-

import logging
import re
import requests
import six
from caldav.lib.python_utilities import to_wire
from lxml import etree

from caldav.lib import error
from caldav.lib.url import URL
from caldav.objects import Calendar, Principal

if six.PY3:
    from urllib.parse import unquote
else:
    from urlparse import unquote

log = logging.getLogger('caldav')


class DAVResponse:
    """
    This class is a response from a DAV request.  It is instantiated from
    the DAVClient class.  End users of the library should not need to
    know anything about this class.  Since we often get XML responses,
    it tries to parse it into `self.tree`
    """
    raw = ""
    reason = ""
    tree = None
    headers = {}
    status = 0

    def __init__(self, response):
        self.raw = response.content
        self.headers = response.headers
        self.status = response.status_code
        self.reason = response.reason
        log.debug("response headers: " + str(self.headers))
        log.debug("response status: " + str(self.status))
        log.debug("raw response: " + str(self.raw))

        try:
            self.tree = etree.XML(self.raw)
        except:
            self.tree = None


class DAVClient:
    """
    Basic client for webdav, uses the requests lib; gives access to
    low-level operations towards the caldav server.

    Unless you have special needs, you should probably care most about
    the __init__ and principal methods.
    """
    proxy = None
    url = None

    def __init__(self, url, proxy=None, username=None, password=None,
                 auth=None, ssl_verify_cert=True):
        """
        Sets up a HTTPConnection object towards the server in the url.
        Parameters:
         * url: A fully qualified url: `scheme://user:pass@hostname:port`
         * proxy: A string defining a proxy server: `hostname:port`
         * username and password should be passed as arguments or in the URL
         * auth and ssl_verify_cert is passed to requests.request.
         ** ssl_verify_cert can be the path of a CA-bundle or False.
        """

        log.debug("url: " + str(url))
        self.url = URL.objectify(url)

        # Prepare proxy info
        if proxy is not None:
            self.proxy = proxy
            # requests library expects the proxy url to have a scheme
            if re.match('^.*://', proxy) is None:
                self.proxy = self.url.scheme + '://' + proxy

            # add a port is one is not specified
            # TODO: this will break if using basic auth and embedding
            # username:password in the proxy URL
            p = self.proxy.split(":")
            if len(p) == 2:
                self.proxy += ':8080'
            log.debug("init - proxy: %s" % (self.proxy))

        # Build global headers
        self.headers = {"User-Agent": "Mozilla/5.0",
                        "Content-Type": "text/xml",
                        "Accept": "text/xml"}
        if self.url.username is not None:
            username = unquote(self.url.username)
            password = unquote(self.url.password)

        self.username = username
        self.password = password
        self.auth = auth
        # TODO: it's possible to force through a specific auth method here,
        # but no test code for this.
        self.ssl_verify_cert = ssl_verify_cert
        self.url = self.url.unauth()
        log.debug("self.url: " + str(url))

    def principal(self):
        """
        Convenience method, it gives a bit more object-oriented feel to
        write client.principal() than Principal(client).

        This method returns a :class:`caldav.Principal` object, with
        higher-level methods for dealing with the principals
        calendars.
        """
        return Principal(self)

    def calendar(self, url):
        """
        Instantiante a calendar by knowing its url.

        This method returns a :class:`caldav.Calendar` object, with
        higher-level methods for dealing with the calendar
        events.
        """
        return Calendar(self, url)

    def propfind(self, url=None, props="", depth=0):
        """
        Send a propfind request.

        Parameters:
         * url: url for the root of the propfind.
         * props = (xml request), properties we want
         * depth: maximum recursion depth

        Returns
         * DAVResponse
        """
        return self.request(url or self.url, "PROPFIND", props,
                            {'Depth': str(depth)})

    def proppatch(self, url, body, dummy=None):
        """
        Send a proppatch request.

        Parameters:
         * url: url for the root of the propfind.
         * body: XML propertyupdate request
         * dummy: compatibility parameter

        Returns
         * DAVResponse
        """
        return self.request(url, "PROPPATCH", body)

    def report(self, url, query="", depth=0):
        """
        Send a report request.

        Parameters:
         * url: url for the root of the propfind.
         * query: XML request
         * depth: maximum recursion depth

        Returns
         * DAVResponse
        """
        return self.request(url, "REPORT", query,
                            {'Depth': str(depth), "Content-Type":
                             "application/xml; charset=\"utf-8\""})

    def mkcol(self, url, body, dummy=None):
        """
        Send a mkcol request.

        Parameters:
         * url: url for the root of the mkcol
         * body: XML request
         * dummy: compatibility parameter

        Returns
         * DAVResponse
        """
        return self.request(url, "MKCOL", body)

    def mkcalendar(self, url, body="", dummy=None):
        """
        Send a mkcalendar request.

        Parameters:
         * url: url for the root of the mkcalendar
         * body: XML request
         * dummy: compatibility parameter

        Returns
         * DAVResponse
        """
        return self.request(url, "MKCALENDAR", body)

    def put(self, url, body, headers={}):
        """
        Send a put request.
        """
        return self.request(url, "PUT", body, headers)

    def delete(self, url):
        """
        Send a delete request.
        """
        return self.request(url, "DELETE")

    def request(self, url, method="GET", body="", headers={}):
        """
        Actually sends the request
        """

        # objectify the url
        url = URL.objectify(url)

        proxies = None
        if self.proxy is not None:
            proxies = {url.scheme: self.proxy}
            log.debug("using proxy - %s" % (proxies))

        # ensure that url is a unicode string
        url = str(url)

        combined_headers = dict(self.headers)
        combined_headers.update(headers)
        if body is None or body == "" and "Content-Type" in combined_headers:
            del combined_headers["Content-Type"]

        log.debug(
            "sending request - method={0}, url={1}, headers={2}\nbody:\n{3}"
            .format(method, url, combined_headers, body))
        auth = None
        if self.auth is None and self.username is not None:
            auth = requests.auth.HTTPDigestAuth(self.username, self.password)
        else:
            auth = self.auth

        r = requests.request(method, url, data=to_wire(body),
                             headers=combined_headers, proxies=proxies,
                             auth=auth, verify=self.ssl_verify_cert)
        response = DAVResponse(r)

        # If server supports BasicAuth and not DigestAuth, let's try again:
        if response.status == 401 and self.auth is None and auth is not None:
            auth = requests.auth.HTTPBasicAuth(self.username, self.password)
            r = requests.request(method, url, data=to_wire(body),
                                 headers=combined_headers, proxies=proxies,
                                 auth=auth, verify=self.ssl_verify_cert)
            response = DAVResponse(r)

        # this is an error condition the application wants to know
        if response.status == requests.codes.forbidden or \
                response.status == requests.codes.unauthorized:
            ex = error.AuthorizationError()
            ex.url = url
            ex.reason = response.reason
            raise ex

        # let's save the auth object and remove the user/pass information
        if not self.auth and auth:
            self.auth = auth
            del self.username
            del self.password

        return response

Anon7 - 2022
AnonSec Team