| Server IP : 85.214.239.14 / Your IP : 216.73.216.210 Web Server : Apache/2.4.65 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 8.2.29 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/cwd/srv/modoboa/env/lib/python3.5/site-packages/modoboa_contacts/ |
Upload File : |
"""Async tasks."""
from django.utils import timezone
from modoboa.lib import cryptutils
from .lib import carddav
from . import models
def get_cdav_client(request, addressbook, write_support=False):
"""Instantiate a new CardDAV client."""
return carddav.PyCardDAV(
addressbook.url, user=request.user.username,
passwd=cryptutils.decrypt(request.session["password"]),
write_support=write_support
)
def create_cdav_addressbook(addressbook, password):
"""Create CardDAV address book."""
clt = carddav.PyCardDAV(
addressbook.url, user=addressbook.user.username,
passwd=password,
write_support=True
)
clt.create_abook()
def push_addressbook_to_carddav(request, addressbook):
"""Push every addressbook item to carddav collection.
Use only once.
"""
clt = get_cdav_client(request, addressbook, True)
for contact in addressbook.contact_set.all():
href, etag = clt.upload_new_card(contact.uid, contact.to_vcard())
contact.etag = etag
contact.save(update_fields=["etag"])
addressbook.last_sync = timezone.now()
addressbook.sync_token = clt.get_sync_token()
addressbook.save(update_fields=["last_sync", "sync_token"])
def sync_addressbook_from_cdav(request, addressbook):
"""Fetch changes from CardDAV server."""
clt = get_cdav_client(request, addressbook)
changes = clt.sync_vcards(addressbook.sync_token)
if not len(changes["cards"]):
return
for card in changes["cards"]:
# UID sometimes embded .vcf extension, sometimes not...
long_uid = card["href"].split("/")[-1]
short_uid = long_uid.split(".")[0]
if "200" in card["status"]:
content = clt.get_vcard(card["href"]).decode()
contact = models.Contact.objects.filter(
uid__in=[long_uid, short_uid]).first()
if not contact:
contact = models.Contact(addressbook=addressbook)
if contact.etag != card["etag"]:
contact.etag = card["etag"]
contact.update_from_vcard(content)
elif "404" in card["status"]:
models.Contact.objects.filter(
uid__in=[long_uid, short_uid]).delete()
addressbook.last_sync = timezone.now()
addressbook.sync_token = changes["token"]
addressbook.save(update_fields=["last_sync", "sync_token"])
def push_contact_to_cdav(request, contact):
"""Upload new contact to cdav collection."""
clt = get_cdav_client(request, contact.addressbook, True)
path, etag = clt.upload_new_card(contact.uid, contact.to_vcard())
contact.etag = etag
contact.save(update_fields=["etag"])
def update_contact_cdav(request, contact):
"""Update existing contact."""
clt = get_cdav_client(request, contact.addressbook, True)
uid = contact.uid
if not uid.endswith(".vcf"):
uid += ".vcf"
result = clt.update_vcard(contact.to_vcard(), uid, contact.etag)
contact.etag = result["cards"][0]["etag"]
contact.save(update_fields=["etag"])
def delete_contact_cdav(request, contact):
"""Delete a contact."""
clt = get_cdav_client(request, contact.addressbook, True)
uid = contact.uid
if not uid.endswith(".vcf"):
uid += ".vcf"
clt.delete_vcard(uid, contact.etag)