Server IP : 85.214.239.14 / Your IP : 18.117.76.255 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /srv/modoboa/env/lib64/python3.5/site-packages/modoboa_contacts/ |
Upload File : |
"""Async tasks.""" from django.utils import timezone from modoboa.lib import cryptutils from .lib import carddav from . import models def get_cdav_client(request, addressbook, write_support=False): """Instantiate a new CardDAV client.""" return carddav.PyCardDAV( addressbook.url, user=request.user.username, passwd=cryptutils.decrypt(request.session["password"]), write_support=write_support ) def create_cdav_addressbook(addressbook, password): """Create CardDAV address book.""" clt = carddav.PyCardDAV( addressbook.url, user=addressbook.user.username, passwd=password, write_support=True ) clt.create_abook() def push_addressbook_to_carddav(request, addressbook): """Push every addressbook item to carddav collection. Use only once. """ clt = get_cdav_client(request, addressbook, True) for contact in addressbook.contact_set.all(): href, etag = clt.upload_new_card(contact.uid, contact.to_vcard()) contact.etag = etag contact.save(update_fields=["etag"]) addressbook.last_sync = timezone.now() addressbook.sync_token = clt.get_sync_token() addressbook.save(update_fields=["last_sync", "sync_token"]) def sync_addressbook_from_cdav(request, addressbook): """Fetch changes from CardDAV server.""" clt = get_cdav_client(request, addressbook) changes = clt.sync_vcards(addressbook.sync_token) if not len(changes["cards"]): return for card in changes["cards"]: # UID sometimes embded .vcf extension, sometimes not... long_uid = card["href"].split("/")[-1] short_uid = long_uid.split(".")[0] if "200" in card["status"]: content = clt.get_vcard(card["href"]).decode() contact = models.Contact.objects.filter( uid__in=[long_uid, short_uid]).first() if not contact: contact = models.Contact(addressbook=addressbook) if contact.etag != card["etag"]: contact.etag = card["etag"] contact.update_from_vcard(content) elif "404" in card["status"]: models.Contact.objects.filter( uid__in=[long_uid, short_uid]).delete() addressbook.last_sync = timezone.now() addressbook.sync_token = changes["token"] addressbook.save(update_fields=["last_sync", "sync_token"]) def push_contact_to_cdav(request, contact): """Upload new contact to cdav collection.""" clt = get_cdav_client(request, contact.addressbook, True) path, etag = clt.upload_new_card(contact.uid, contact.to_vcard()) contact.etag = etag contact.save(update_fields=["etag"]) def update_contact_cdav(request, contact): """Update existing contact.""" clt = get_cdav_client(request, contact.addressbook, True) uid = contact.uid if not uid.endswith(".vcf"): uid += ".vcf" result = clt.update_vcard(contact.to_vcard(), uid, contact.etag) contact.etag = result["cards"][0]["etag"] contact.save(update_fields=["etag"]) def delete_contact_cdav(request, contact): """Delete a contact.""" clt = get_cdav_client(request, contact.addressbook, True) uid = contact.uid if not uid.endswith(".vcf"): uid += ".vcf" clt.delete_vcard(uid, contact.etag)