Server IP : 85.214.239.14 / Your IP : 3.145.161.199 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/2/cwd/proc/2/cwd/srv/modoboa/env/lib64/python3.5/site-packages/modoboa/admin/ |
Upload File : |
"""Admin API.""" from django import http from django.contrib.contenttypes.models import ContentType from django.utils.translation import ugettext as _ from rest_framework import filters, status, viewsets from rest_framework.decorators import action from rest_framework.exceptions import ParseError from rest_framework.permissions import DjangoModelPermissions, IsAuthenticated from rest_framework.response import Response from modoboa.core import models as core_models from modoboa.core import sms_backends from . import lib, models, serializers class DomainViewSet(viewsets.ModelViewSet): """ retrieve: Return the given domain. list: Return a list of all existing domains. create: Create a new domain instance. """ permission_classes = [IsAuthenticated, DjangoModelPermissions, ] serializer_class = serializers.DomainSerializer def get_queryset(self): """Filter queryset based on current user.""" return models.Domain.objects.get_for_admin(self.request.user) def perform_destroy(self, instance): """Add custom args to delete call.""" instance.delete(self.request.user) class DomainAliasViewSet(viewsets.ModelViewSet): """ViewSet for DomainAlias.""" permission_classes = [IsAuthenticated, DjangoModelPermissions, ] serializer_class = serializers.DomainAliasSerializer http_method_names = ["get", "post", "put", "delete"] def get_queryset(self): """Filter queryset based on current user.""" queryset = models.DomainAlias.objects.get_for_admin(self.request.user) domain = self.request.query_params.get("domain") if domain: queryset = queryset.filter(target__name=domain) return queryset class AccountViewSet(viewsets.ModelViewSet): """ViewSet for User/Mailbox.""" filter_backends = [filters.SearchFilter] permission_classes = [IsAuthenticated, DjangoModelPermissions, ] search_fields = ("^first_name", "^last_name", "^email") def get_serializer_class(self): """Return a serializer.""" action_dict = { "list": serializers.AccountSerializer, "retrieve": serializers.AccountSerializer, "password": serializers.AccountPasswordSerializer, "reset_password": serializers.ResetPasswordSerializer, } return action_dict.get( self.action, serializers.WritableAccountSerializer) def get_queryset(self): """Filter queryset based on current user.""" user = self.request.user ids = user.objectaccess_set \ .filter(content_type=ContentType.objects.get_for_model(user)) \ .values_list("object_id", flat=True) queryset = core_models.User.objects.filter(pk__in=ids) domain = self.request.query_params.get("domain") if domain: queryset = queryset.filter(mailbox__domain__name=domain) return queryset @action(methods=["put"], detail=True) def password(self, request, pk=None): """Change account password.""" try: user = core_models.User.objects.get(pk=pk) except core_models.User.DoesNotExist: raise http.Http404 serializer = self.get_serializer(user, data=request.data) if serializer.is_valid(): serializer.save() return Response() return Response( serializer.errors, status=status.HTTP_400_BAD_REQUEST) @action(detail=False) def exists(self, request): """Check if account exists. Requires a valid email address as argument. Example: GET /exists/?email=user@test.com """ email = request.GET.get("email") if not email: raise ParseError("email not provided") if not core_models.User.objects.filter(email=email).exists(): data = {"exists": False} else: data = {"exists": True} serializer = serializers.AccountExistsSerializer(data) return Response(serializer.data) @action(methods=["post"], detail=False) def reset_password(self, request): """Reset account password and send a new one by SMS.""" sms_password_recovery = ( request.localconfig.parameters .get_value("sms_password_recovery", app="core") ) if not sms_password_recovery: return Response(status=404) serializer = self.get_serializer(data=request.data) serializer.is_valid(raise_exception=True) user = core_models.User.objects.filter( email=serializer.validated_data["email"]).first() if not user or not user.phone_number: return Response(status=404) backend = sms_backends.get_active_backend( request.localconfig.parameters) if not backend: return Response(status=404) password = lib.make_password() content = _("Here is your new Modoboa password: {}").format( password) if not backend.send(content, [str(user.phone_number)]): body = {"status": "ko"} else: # SMS was sent, now we can set the new password. body = {"status": "ok"} user.set_password(password) user.save(update_fields=["password"]) return Response(body) class AliasViewSet(viewsets.ModelViewSet): """ create: Create a new alias instance. """ permission_classes = [IsAuthenticated, DjangoModelPermissions, ] serializer_class = serializers.AliasSerializer http_method_names = ["get", "post", "put", "delete"] def get_queryset(self): """Filter queryset based on current user.""" user = self.request.user ids = ( user.objectaccess_set.filter( content_type=ContentType.objects.get_for_model(models.Alias)) .values_list("object_id", flat=True) ) queryset = models.Alias.objects.filter(pk__in=ids) domain = self.request.query_params.get("domain") if domain: queryset = queryset.filter(domain__name=domain) return queryset class SenderAddressViewSet(viewsets.ModelViewSet): """View set for SenderAddress model.""" permission_classes = [IsAuthenticated, DjangoModelPermissions, ] serializer_class = serializers.SenderAddressSerializer def get_queryset(self): """Filter queryset based on current user.""" user = self.request.user mb_ids = ( user.objectaccess_set.filter( content_type=ContentType.objects.get_for_model(models.Mailbox)) .values_list("object_id", flat=True) ) return models.SenderAddress.objects.filter(mailbox__pk__in=mb_ids)