| Server IP : 85.214.239.14  /  Your IP : 216.73.216.181 Web Server : Apache/2.4.65 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Mon Sep 30 15:36:27 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 8.2.29 Disable Function : NONE MySQL : OFF | cURL : ON | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/cwd/proc/3/root/srv/modoboa/env/lib64/python3.5/site-packages/modoboa_stats/ | 
| Upload File : | 
"""Classes to define graphics."""
import inspect
from itertools import chain
import os
from lxml import etree
import six
from django.conf import settings
from django.utils.encoding import smart_bytes, smart_text
from django.utils.translation import ugettext as _, ugettext_lazy
from modoboa.admin import models as admin_models
from modoboa.lib import exceptions
from modoboa.lib.sysutils import exec_cmd
from modoboa.parameters import tools as param_tools
class Curve:
    """Graphic curve.
    Simple way to represent a graphic curve.
    """
    def __init__(self, dsname, color, legend, cfunc="AVERAGE"):
        """Constructor.
        """
        self.dsname = dsname
        self.color = color
        self.legend = legend
        self.cfunc = cfunc
    def to_rrd_command_args(self, rrdfile):
        """Convert this curve to the approriate RRDtool command.
        :param str rrdfile: RRD file name
        :return: a list
        """
        rrdfile = os.path.join(
            param_tools.get_global_parameter("rrd_rootdir"), "%s.rrd" % rrdfile
        )
        return [
            'DEF:%s=%s:%s:%s' %
            (self.dsname, rrdfile, self.dsname, self.cfunc),
            'CDEF:%(ds)spm=%(ds)s,UN,0,%(ds)s,IF,60,*' % {"ds": self.dsname},
            'XPORT:%spm:"%s"' % (self.dsname, self.legend)
        ]
class Graphic:
    """Graphic."""
    def __init__(self):
        """Constructor."""
        self._curves = []
        try:
            order = getattr(self, "order")
        except AttributeError:
            for member in inspect.getmembers(self):
                if isinstance(member[1], Curve):
                    self._curves += [member[1]]
        else:
            for name in order:
                try:
                    curve = getattr(self, name)
                except AttributeError:
                    continue
                if not isinstance(curve, Curve):
                    continue
                self._curves += [curve]
    @property
    def display_name(self):
        return self.__class__.__name__.lower()
    @property
    def rrdtool_binary(self):
        """Return path to rrdtool binary."""
        dpath = None
        code, output = exec_cmd("which rrdtool")
        if not code:
            dpath = output.strip()
        else:
            known_paths = getattr(
                settings, "RRDTOOL_LOOKUP_PATH",
                ("/usr/bin/rrdtool", "/usr/local/bin/rrdtool")
            )
            for fpath in known_paths:
                if os.path.isfile(fpath) and os.access(fpath, os.X_OK):
                    dpath = fpath
        if dpath is None:
            raise exceptions.InternalError(
                _("Failed to locate rrdtool binary."))
        return smart_text(dpath)
    def export(self, rrdfile, start, end):
        """Export data to XML using rrdtool and convert it to JSON."""
        result = []
        cmdargs = []
        for curve in self._curves:
            result += [{
                "name": str(curve.legend),
                "color": curve.color, "data": []
            }]
            cmdargs += curve.to_rrd_command_args(rrdfile)
        code = 0
        cmd = "{} xport --start {} --end {} ".format(
            self.rrdtool_binary, str(start), str(end))
        cmd += " ".join(cmdargs)
        code, output = exec_cmd(smart_bytes(cmd))
        if code:
            return []
        tree = etree.fromstring(output)
        timestamp = int(tree.xpath('/xport/meta/start')[0].text)
        step = int(tree.xpath('/xport/meta/step')[0].text)
        for row in tree.xpath('/xport/data/row'):
            for vindex, value in enumerate(row.findall('v')):
                if value.text == 'NaN':
                    result[vindex]['data'].append({'x': timestamp, 'y': 0})
                else:
                    result[vindex]['data'].append(
                        {'x': timestamp, 'y': float(value.text)}
                    )
            timestamp += step
        return result
class GraphicSet(object):
    """A set of graphics."""
    domain_selector = False
    title = None
    _graphics = []
    def __init__(self, instances=None):
        if instances is None:
            instances = []
        self.__ginstances = instances
    @property
    def html_id(self):
        return self.__class__.__name__.lower()
    @property
    def graphics(self):
        if not self.__ginstances:
            self.__ginstances = [graphic() for graphic in self._graphics]
        return self.__ginstances
    def get_graphic_names(self):
        return [graphic.display_name for graphic in self._graphics]
    def get_file_name(self, request):
        """Return database file name."""
        return self.file_name
    def export(self, rrdfile, start, end):
        result = {}
        for graph in self.graphics:
            result[graph.display_name] = {
                "title": six.text_type(graph.title),
                "curves": graph.export(rrdfile, start, end)
            }
        return result
class AverageTraffic(Graphic):
    """Average traffic."""
    title = ugettext_lazy('Average traffic (msgs/min)')
    # Curve definitions
    sent = Curve("sent", "lawngreen", ugettext_lazy("sent messages"))
    recv = Curve("recv", "steelblue", ugettext_lazy("received messages"))
    bounced = Curve("bounced", "yellow", ugettext_lazy("bounced messages"))
    reject = Curve("reject", "tomato", ugettext_lazy("rejected messages"))
    virus = Curve("virus", "orange", ugettext_lazy("virus messages"))
    spam = Curve("spam", "silver", ugettext_lazy("spam messages"))
    order = ["reject", "bounced", "recv", "sent", "virus", "spam"]
    def __init__(self, greylist=False):
        if greylist:
            self.greylist = Curve(
                "greylist", "dimgrey", ugettext_lazy("greylisted messages"))
            self.order = [
                "reject", "greylist", "bounced", "recv", "sent", "virus",
                "spam"
            ]
        super().__init__()
class AverageTrafficSize(Graphic):
    """Average traffic size."""
    title = ugettext_lazy('Average normal traffic size (bytes/min)')
    # Curve definitions
    size_recv = Curve("size_recv", "orange", ugettext_lazy("received size"))
    size_sent = Curve(
        "size_sent", "mediumturquoise", ugettext_lazy("sent size")
    )
class MailTraffic(GraphicSet):
    """Mail traffic graphic set."""
    domain_selector = True
    title = ugettext_lazy("Mail traffic")
    _graphics = [AverageTraffic, AverageTrafficSize]
    def __init__(self, greylist=False):
        instances = [AverageTraffic(greylist), AverageTrafficSize()]
        super().__init__(instances)
    def _check_domain_access(self, user, pattern):
        """Check if an administrator can access a domain.
        If a non super administrator asks for the global view, we give him
        a view on the first domain he manage instead.
        :return: a domain name (str) or None.
        """
        if pattern in [None, "global"]:
            if not user.is_superuser:
                domains = admin_models.Domain.objects.get_for_admin(user)
                if not domains.exists():
                    return None
                return domains.first().name
            return "global"
        results = list(
            chain(
                admin_models.Domain.objects.filter(name__startswith=pattern),
                admin_models.DomainAlias.objects.filter(
                    name__startswith=pattern)
            )
        )
        if len(results) != 1:
            return None
        if not user.can_access(results[0]):
            raise exceptions.PermDeniedException
        return results[0].name
    def get_file_name(self, request):
        """Retrieve file name according to user and args."""
        searchq = request.GET.get("searchquery", None)
        return self._check_domain_access(request.user, searchq)
class AccountCreationGraphic(Graphic):
    """Account creation over time."""
    title = ugettext_lazy("Average account creation (account/hour)")
    accounts = Curve(
        "new_accounts", "steelblue", ugettext_lazy("New accounts"))
class AccountGraphicSet(GraphicSet):
    """A graphic set for accounts."""
    file_name = "new_accounts"
    title = ugettext_lazy("Accounts")
    _graphics = [AccountCreationGraphic]