Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 3.145.83.96
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /srv/modoboa/env/lib64/python3.5/site-packages/modoboa/admin/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /srv/modoboa/env/lib64/python3.5/site-packages/modoboa/admin/tests/test_import_.py
import os
from unittest import mock

import dns.resolver

from django.core.files.base import ContentFile
from django.core.management import call_command
from django.core.management.base import CommandError
from django.urls import reverse

from modoboa.core import factories as core_factories
from modoboa.core.models import User
from modoboa.lib.tests import ModoTestCase
from . import utils
from .. import factories
from ..models import Alias, Domain, DomainAlias


class ImportTestCase(ModoTestCase):

    @classmethod
    def setUpTestData(cls):  # NOQA:N802
        """Prepare the fixtures shared by every test of this class.

        Runs the parent set-up, stores ``False`` for the
        ``enable_admin_limits`` parameter of the ``limits`` app in the
        local configuration, saves it, then populates the database
        through the admin factories.
        """
        super().setUpTestData()
        cls.localconfig.parameters.set_value(
            "enable_admin_limits",
            False,
            app="limits",
        )
        cls.localconfig.save()
        factories.populate_database()

    def test_domains_import(self):
        response = self.client.get(reverse("admin:domain_import"))
        self.assertEqual(response.status_code, 200)
        self.assertIn("Provide a CSV", response.content.decode())

        f = ContentFile(b"""domain; domain1.com; 1000; 100; True
domain; domain2.com; 1000; 200; False
domainalias; domalias1.com; domain1.com; True
""", name="domains.csv")
        self.client.post(
            reverse("admin:domain_import"), {
                "sourcefile": f
            }
        )
        admin = User.objects.get(username="admin")
        dom = Domain.objects.get(name="domain1.com")
        self.assertEqual(dom.quota, 1000)
        self.assertEqual(dom.default_mailbox_quota, 100)
        self.assertTrue(dom.enabled)
        self.assertTrue(admin.is_owner(dom))
        domalias = DomainAlias.objects.get(name="domalias1.com")
        self.assertEqual(domalias.target, dom)
        self.assertTrue(dom.enabled)
        self.assertTrue(admin.is_owner(domalias))
        dom = Domain.objects.get(name="domain2.com")
        self.assertEqual(dom.default_mailbox_quota, 200)
        self.assertFalse(dom.enabled)
        self.assertTrue(admin.is_owner(dom))

    def test_domain_import_bad_syntax(self):
        """Post malformed domain lines and check the reported errors."""
        import_url = reverse("admin:domain_import")
        # (CSV line, text expected in the response), posted in this order.
        cases = (
            ("domain; domain1.com; 100; True",
             "Invalid line"),
            ("domain; domain1.com; XX; 100; True",
             "Invalid quota value"),
            ("domain; domain1.com; 100; XX; True",
             "Invalid default mailbox quota"),
            ("domain; domain1.com; 10; 100; True",
             "Default mailbox quota cannot be greater than domain quota"),
        )
        for csv_line, expected_error in cases:
            upload = ContentFile(csv_line, name="domains.csv")
            reply = self.client.post(import_url, {"sourcefile": upload})
            self.assertContains(reply, expected_error)

    @mock.patch.object(dns.resolver.Resolver, "query")
    @mock.patch("socket.getaddrinfo")
    def test_domain_import_with_mx_check(self, mock_getaddrinfo, mock_query):
        """Import domains as a reseller while the MX check is enabled.

        DNS and address lookups are replaced by the helpers from
        ``utils``. The import of ``test3.com`` must report a missing
        authorized MX record; ``domain1.com`` must exist after the
        second import.
        """
        self.client.force_login(
            core_factories.UserFactory(
                username="reseller", groups=("Resellers", ))
        )
        self.set_global_parameter("valid_mxs", "192.0.2.1 2001:db8::1")
        self.set_global_parameter("domains_must_have_authorized_mx", True)

        mock_query.side_effect = utils.mock_dns_query_result
        mock_getaddrinfo.side_effect = utils.mock_ip_query_result
        rejected_upload = ContentFile(
            b"domain; test3.com; 100; 1; True", name="domains.csv")
        reply = self.client.post(
            reverse("admin:domain_import"), {"sourcefile": rejected_upload})
        self.assertContains(reply, "No authorized MX record found for domain")

        mock_getaddrinfo.side_effect = utils.mock_ip_query_result
        accepted_upload = ContentFile(
            b"domain; domain1.com; 100; 1; True", name="domains.csv")
        self.client.post(
            reverse("admin:domain_import"), {"sourcefile": accepted_upload})
        imported = Domain.objects.filter(name="domain1.com")
        self.assertTrue(imported.exists())

    def test_import_domains_with_conflict(self):
        f = ContentFile(b"""domain;test.alias;100;10;True
domainalias;test.alias;test.com;True
""", name="domains.csv")
        resp = self.client.post(
            reverse("admin:domain_import"), {
                "sourcefile": f
            }
        )
        self.assertIn(
            "Object already exists: domainalias", resp.content.decode())

    def test_identities_import(self):
        response = self.client.get(reverse("admin:identity_import"))
        self.assertEqual(response.status_code, 200)
        self.assertIn("Provide a CSV", response.content.decode())

        f = ContentFile("""
account; user1@test.com; toto; User; One; True; SimpleUsers; user1@test.com; 0
account; Truc@test.com; toto; René; Truc; True; DomainAdmins; truc@test.com; 5; test.com
alias; alias1@test.com; True; user1@test.com
forward; alias2@test.com; True; user1+ext@test.com
forward; fwd1@test.com; True; user@extdomain.com
dlist; dlist@test.com; True; user1@test.com; user@extdomain.com
""", name="identities.csv")  # NOQA:E501
        self.client.post(
            reverse("admin:identity_import"),
            {"sourcefile": f, "crypt_password": True}
        )
        admin = User.objects.get(username="admin")
        u1 = User.objects.get(username="user1@test.com")
        mb1 = u1.mailbox
        self.assertTrue(admin.is_owner(u1))
        self.assertEqual(u1.email, "user1@test.com")
        self.assertEqual(u1.first_name, "User")
        self.assertEqual(u1.last_name, "One")
        self.assertTrue(u1.is_active)
        self.assertEqual(u1.role, "SimpleUsers")
        self.assertTrue(mb1.use_domain_quota)
        self.assertEqual(mb1.quota, 0)
        self.assertTrue(admin.is_owner(mb1))
        self.assertEqual(mb1.full_address, "user1@test.com")
        self.assertTrue(
            self.client.login(username="user1@test.com", password="toto")
        )

        da = User.objects.get(username="truc@test.com")
        damb = da.mailbox
        self.assertEqual(da.first_name, u"René")
        self.assertEqual(da.role, "DomainAdmins")
        self.assertEqual(damb.quota, 5)
        self.assertFalse(damb.use_domain_quota)
        self.assertEqual(damb.full_address, "truc@test.com")
        dom = Domain.objects.get(name="test.com")
        self.assertIn(da, dom.admins)
        u = User.objects.get(username="user@test.com")
        self.assertTrue(da.can_access(u))

        al = Alias.objects.get(address="alias1@test.com")
        self.assertTrue(
            al.aliasrecipient_set
            .filter(r_mailbox=u1.mailbox).exists()
        )
        self.assertTrue(admin.is_owner(al))

        fwd = Alias.objects.get(address="fwd1@test.com")
        self.assertTrue(
            fwd.aliasrecipient_set
            .filter(
                address="user@extdomain.com", r_mailbox__isnull=True,
                r_alias__isnull=True)
            .exists()
        )
        self.assertTrue(admin.is_owner(fwd))

        dlist = Alias.objects.get(address="dlist@test.com")
        self.assertTrue(
            dlist.aliasrecipient_set
            .filter(r_mailbox=u1.mailbox).exists()
        )
        self.assertTrue(
            dlist.aliasrecipient_set.filter(address="user@extdomain.com")
            .exists()
        )
        self.assertTrue(admin.is_owner(dlist))

    def test_import_for_nonlocal_domain(self):
        """Try to import an account for nonlocal domain."""
        f = ContentFile(b"""
account; user1@nonlocal.com; toto; User; One; True; SimpleUsers; user1@nonlocal.com; 0
""", name="identities.csv")  # NOQA:E501
        self.client.post(
            reverse("admin:identity_import"),
            {"sourcefile": f, "crypt_password": True}
        )
        self.assertFalse(
            User.objects.filter(username="user1@nonlocal.com").exists())

    def test_import_invalid_quota(self):
        f = ContentFile(b"""
account; user1@test.com; toto; User; One; True; SimpleUsers; user1@test.com; ; test.com
""", name="identities.csv")  # NOQA:E501
        resp = self.client.post(
            reverse("admin:identity_import"),
            {"sourcefile": f, "crypt_password": True}
        )
        self.assertIn("wrong quota value", resp.content.decode())

    def test_import_domain_by_domainadmin(self):
        """Check if a domain admin is not allowed to import a domain."""
        self.client.logout()
        self.client.login(username="admin@test.com", password="toto")
        f = ContentFile(b"""
domain; domain2.com; 1000; 200; False
""", name="identities.csv")
        resp = self.client.post(
            reverse("admin:identity_import"),
            {"sourcefile": f, "crypt_password": True}
        )
        self.assertIn(
            "You are not allowed to import domains",
            resp.content.decode())
        f = ContentFile(b"""
domainalias; domalias1.com; test.com; True
""", name="identities.csv")
        resp = self.client.post(
            reverse("admin:identity_import"),
            {"sourcefile": f, "crypt_password": True}
        )
        self.assertIn(
            "You are not allowed to import domain aliases",
            resp.content.decode())

    def test_import_quota_too_big(self):
        self.client.logout()
        self.client.login(username="admin@test.com", password="toto")
        f = ContentFile(b"""
account; user1@test.com; toto; User; One; True; SimpleUsers; user1@test.com; 40
""", name="identities.csv")
        resp = self.client.post(
            reverse("admin:identity_import"),
            {"sourcefile": f, "crypt_password": True}
        )
        self.assertIn("Domain quota exceeded", resp.content.decode())

    def test_import_missing_quota(self):
        f = ContentFile(b"""
account; user1@test.com; toto; User; One; True; SimpleUsers; user1@test.com
""", name="identities.csv")
        self.client.post(
            reverse("admin:identity_import"),
            {"sourcefile": f, "crypt_password": True}
        )
        account = User.objects.get(username="user1@test.com")
        self.assertEqual(
            account.mailbox.quota,
            account.mailbox.domain.default_mailbox_quota
        )

    def test_import_duplicate(self):
        f = ContentFile("""
account; admin@test.com; toto; Admin; ; True; DomainAdmins; admin@test.com; 0; test.com
account; truc@test.com; toto; René; Truc; True; DomainAdmins; truc@test.com; 0; test.com
""", name="identities.csv")  # NOQA:E501
        self.client.post(
            reverse("admin:identity_import"),
            {"sourcefile": f, "crypt_password": True,
             "continue_if_exists": True}
        )
        admin = User.objects.get(username="admin")
        u1 = User.objects.get(username="truc@test.com")
        self.assertTrue(admin.is_owner(u1))

    def test_import_superadmin(self):
        """Check if a domain admin can import a superadmin

        Expected result: no
        """
        self.client.logout()
        self.assertTrue(
            self.client.login(username="admin@test.com", password="toto")
        )
        f = ContentFile(b"""
account; sa@test.com; toto; Super; Admin; True; SuperAdmins; superadmin@test.com; 50
""", name="identities.csv")  # NOQA:E501
        self.client.post(
            reverse("admin:identity_import"),
            {"sourcefile": f, "crypt_password": True,
             "continue_if_exists": True}
        )
        with self.assertRaises(User.DoesNotExist):
            User.objects.get(username="sa@test.com")

    def test_import_alias_with_empty_values(self):
        f = ContentFile(b"""
alias;user.alias@test.com;True;user@test.com;;;;;;;;;;;;;;;;
""", name="identities.csv")
        self.client.post(
            reverse("admin:identity_import"),
            {"sourcefile": f, "crypt_password": True,
             "continue_if_exists": True}
        )
        alias = Alias.objects.get(address="user.alias@test.com")
        self.assertEqual(alias.type, "alias")

    def test_import_account_alias_conflict(self):
        """Specific test for #1144."""
        f = ContentFile(b"""
alias;user@test.com;True;admin@test.com
""", name="identities.csv")
        self.client.post(
            reverse("admin:identity_import"),
            {"sourcefile": f, "crypt_password": True}
        )
        self.assertTrue(
            Alias.objects.filter(
                address="user@test.com", internal=False).exists())

    def test_domains_import_utf8(self):
        response = self.client.get(reverse("admin:domain_import"))
        self.assertEqual(response.status_code, 200)
        self.assertIn("Provide a CSV", response.content.decode())

        f = ContentFile("""domain; dómªin1.com; 1000; 100; True
dómain; dómªin2.com; 1000; 100; True
""".encode("utf8"), name="dómains.csv")
        self.client.post(
            reverse("admin:domain_import"), {
                "sourcefile": f
            }
        )
        admin = User.objects.get(username="admin")
        dom = Domain.objects.get(name="dómªin1.com")
        self.assertEqual(dom.quota, 1000)
        self.assertEqual(dom.default_mailbox_quota, 100)
        self.assertTrue(dom.enabled)
        self.assertTrue(admin.is_owner(dom))

    def test_import_command_missing_file(self):
        """Check that the import command fails when the file is missing.

        ``os.path.isfile`` is patched to return ``False`` for every path
        while the command runs, so ``test.csv`` is seen as missing.
        """
        with mock.patch("os.path.isfile") as mock_isfile:
            mock_isfile.return_value = False
            with self.assertRaises(CommandError) as cm:
                call_command("modo", "import", "test.csv")
        # Inspect the exception once the ``assertRaises`` block is over:
        # statements placed after the raising call inside it never run.
        self.assertIn("File not found", str(cm.exception))

    def test_import_command(self):
        """Import domains through the ``modo import`` management command.

        Both domains are expected to be enabled, owned by the ``admin``
        user and to carry the quotas listed below.
        """
        csv_path = os.path.join(
            os.path.dirname(__file__), "test_data/import_domains.csv")
        call_command("modo", "import", csv_path)

        superadmin = User.objects.get(username="admin")

        # (domain name, quota, default mailbox quota)
        expected_domains = (
            ("domain1.com", 1000, 100),
            ("dómain2.com", 2000, 200),
        )
        for name, quota, default_mailbox_quota in expected_domains:
            domain = Domain.objects.get(name=name)
            self.assertEqual(domain.quota, quota)
            self.assertEqual(
                domain.default_mailbox_quota, default_mailbox_quota)
            self.assertTrue(domain.enabled)
            self.assertTrue(superadmin.is_owner(domain))

    def test_import_command_non_utf8(self):
        """Import the ``import_domains_iso8859.csv`` data file.

        The non-ASCII domain ``dómain2.com`` must be created with a
        quota of 2000.
        """
        data_dir = os.path.dirname(__file__)
        csv_path = os.path.join(
            data_dir, "test_data/import_domains_iso8859.csv")
        call_command("modo", "import", csv_path)
        imported = Domain.objects.get(name="dómain2.com")
        self.assertEqual(imported.quota, 2000)

    def test_import_command_duplicates(self):
        """Check that importing the duplicates data file raises an error.

        Without ``--continue-if-exists`` the command must raise a
        ``CommandError`` whose message starts with
        ``"Object already exists: "``.
        """
        test_file = os.path.join(
            os.path.dirname(__file__),
            "test_data/import_domains_duplicates.csv"
        )
        with self.assertRaises(CommandError) as cm:
            call_command("modo", "import", test_file)
        # Inspect the exception once the ``assertRaises`` block is over:
        # statements placed after the raising call inside it never run.
        ex_message = str(cm.exception)
        self.assertTrue(ex_message.startswith("Object already exists: "))

    def test_import_command_duplicates_continue(self):
        """Import the duplicates data file with ``--continue-if-exists``.

        The command must complete, and both domains are expected to be
        enabled, owned by the ``admin`` user and to carry the quotas
        listed below.
        """
        csv_path = os.path.join(
            os.path.dirname(__file__),
            "test_data/import_domains_duplicates.csv")
        call_command("modo", "import", "--continue-if-exists", csv_path)

        superadmin = User.objects.get(username="admin")

        # (domain name, quota, default mailbox quota)
        expected_domains = (
            ("domain1.com", 1000, 100),
            ("dómain2.com", 2000, 200),
        )
        for name, quota, default_mailbox_quota in expected_domains:
            domain = Domain.objects.get(name=name)
            self.assertEqual(domain.quota, quota)
            self.assertEqual(
                domain.default_mailbox_quota, default_mailbox_quota)
            self.assertTrue(domain.enabled)
            self.assertTrue(superadmin.is_owner(domain))

Anon7 - 2022
AnonSec Team