Server IP : 85.214.239.14 / Your IP : 3.15.5.211 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /srv/radicale/env/lib64/python3.5/site-packages/radicale/app/ |
Upload File : |
# This file is part of Radicale Server - Calendar Server # Copyright © 2008 Nicolas Kandel # Copyright © 2008 Pascal Halter # Copyright © 2008-2017 Guillaume Ayoub # Copyright © 2017-2018 Unrud <unrud@outlook.com> # # This library is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # This library is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Radicale. If not, see <http://www.gnu.org/licenses/>. import itertools import posixpath import socket import sys from http import client import vobject from radicale import app, httputils from radicale import item as radicale_item from radicale import pathutils, rights, storage, xmlutils from radicale.log import logger MIMETYPE_TAGS = {value: key for key, value in xmlutils.MIMETYPES.items()} def prepare(vobject_items, path, content_type, permissions, parent_permissions, tag=None, write_whole_collection=None): if (write_whole_collection or permissions and not parent_permissions): write_whole_collection = True tag = radicale_item.predict_tag_of_whole_collection( vobject_items, MIMETYPE_TAGS.get(content_type)) if not tag: raise ValueError("Can't determine collection tag") collection_path = pathutils.strip_path(path) elif (write_whole_collection is not None and not write_whole_collection or not permissions and parent_permissions): write_whole_collection = False if tag is None: tag = radicale_item.predict_tag_of_parent_collection(vobject_items) collection_path = posixpath.dirname(pathutils.strip_path(path)) props = None stored_exc_info = None items = [] try: if tag: radicale_item.check_and_sanitize_items( vobject_items, is_collection=write_whole_collection, tag=tag) if write_whole_collection and tag == "VCALENDAR": vobject_components = [] vobject_item, = vobject_items for content in ("vevent", "vtodo", "vjournal"): vobject_components.extend( getattr(vobject_item, "%s_list" % content, [])) vobject_components_by_uid = itertools.groupby( sorted(vobject_components, key=radicale_item.get_uid), radicale_item.get_uid) for _, components in vobject_components_by_uid: vobject_collection = vobject.iCalendar() for component in components: vobject_collection.add(component) item = radicale_item.Item(collection_path=collection_path, vobject_item=vobject_collection) item.prepare() items.append(item) elif write_whole_collection and tag == "VADDRESSBOOK": for vobject_item in vobject_items: item = radicale_item.Item(collection_path=collection_path, vobject_item=vobject_item) item.prepare() items.append(item) elif not write_whole_collection: vobject_item, = vobject_items item = radicale_item.Item(collection_path=collection_path, vobject_item=vobject_item) item.prepare() items.append(item) if write_whole_collection: props = {} if tag: props["tag"] = tag if tag == "VCALENDAR" and vobject_items: if hasattr(vobject_items[0], "x_wr_calname"): calname = vobject_items[0].x_wr_calname.value if calname: props["D:displayname"] = calname if hasattr(vobject_items[0], "x_wr_caldesc"): caldesc = vobject_items[0].x_wr_caldesc.value if caldesc: props["C:calendar-description"] = caldesc radicale_item.check_and_sanitize_props(props) except Exception: stored_exc_info = sys.exc_info() # Use generator for items and delete references to free memory # early def items_generator(): while items: yield items.pop(0) return (items_generator(), tag, write_whole_collection, props, stored_exc_info) class ApplicationPutMixin: def do_PUT(self, environ, base_prefix, path, user): """Manage PUT request.""" access = app.Access(self._rights, user, path) if not access.check("w"): return httputils.NOT_ALLOWED try: content = self._read_content(environ) except RuntimeError as e: logger.warning("Bad PUT request on %r: %s", path, e, exc_info=True) return httputils.BAD_REQUEST except socket.timeout: logger.debug("client timed out", exc_info=True) return httputils.REQUEST_TIMEOUT # Prepare before locking content_type = environ.get("CONTENT_TYPE", "").split(";")[0] try: vobject_items = tuple(vobject.readComponents(content or "")) except Exception as e: logger.warning( "Bad PUT request on %r: %s", path, e, exc_info=True) return httputils.BAD_REQUEST (prepared_items, prepared_tag, prepared_write_whole_collection, prepared_props, prepared_exc_info) = prepare( vobject_items, path, content_type, bool(rights.intersect(access.permissions, "Ww")), bool(rights.intersect(access.parent_permissions, "w"))) with self._storage.acquire_lock("w", user): item = next(self._storage.discover(path), None) parent_item = next( self._storage.discover(access.parent_path), None) if not parent_item: return httputils.CONFLICT write_whole_collection = ( isinstance(item, storage.BaseCollection) or not parent_item.get_meta("tag")) if write_whole_collection: tag = prepared_tag else: tag = parent_item.get_meta("tag") if write_whole_collection: if ("w" if tag else "W") not in access.permissions: return httputils.NOT_ALLOWED elif "w" not in access.parent_permissions: return httputils.NOT_ALLOWED etag = environ.get("HTTP_IF_MATCH", "") if not item and etag: # Etag asked but no item found: item has been removed return httputils.PRECONDITION_FAILED if item and etag and item.etag != etag: # Etag asked but item not matching: item has changed return httputils.PRECONDITION_FAILED match = environ.get("HTTP_IF_NONE_MATCH", "") == "*" if item and match: # Creation asked but item found: item can't be replaced return httputils.PRECONDITION_FAILED if (tag != prepared_tag or prepared_write_whole_collection != write_whole_collection): (prepared_items, prepared_tag, prepared_write_whole_collection, prepared_props, prepared_exc_info) = prepare( vobject_items, path, content_type, bool(rights.intersect(access.permissions, "Ww")), bool(rights.intersect(access.parent_permissions, "w")), tag, write_whole_collection) props = prepared_props if prepared_exc_info: logger.warning( "Bad PUT request on %r: %s", path, prepared_exc_info[1], exc_info=prepared_exc_info) return httputils.BAD_REQUEST if write_whole_collection: try: etag = self._storage.create_collection( path, prepared_items, props).etag except ValueError as e: logger.warning( "Bad PUT request on %r: %s", path, e, exc_info=True) return httputils.BAD_REQUEST else: prepared_item, = prepared_items if (item and item.uid != prepared_item.uid or not item and parent_item.has_uid(prepared_item.uid)): return self._webdav_error_response("%s:no-uid-conflict" % ( "C" if tag == "VCALENDAR" else "CR")) href = posixpath.basename(pathutils.strip_path(path)) try: etag = parent_item.upload(href, prepared_item).etag except ValueError as e: logger.warning( "Bad PUT request on %r: %s", path, e, exc_info=True) return httputils.BAD_REQUEST headers = {"ETag": etag} return client.CREATED, headers, None