Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.118.253.124
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /srv/modoboa/env/lib64/python3.5/site-packages/modoboa_stats/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /srv/modoboa/env/lib64/python3.5/site-packages/modoboa_stats//graphics.py
"""Classes to define graphics."""

import inspect
from itertools import chain
import os

from lxml import etree
import six

from django.conf import settings
from django.utils.encoding import smart_bytes, smart_text
from django.utils.translation import ugettext as _, ugettext_lazy

from modoboa.admin import models as admin_models
from modoboa.lib import exceptions
from modoboa.lib.sysutils import exec_cmd
from modoboa.parameters import tools as param_tools


class Curve:

    """Graphic curve.

    Simple way to represent a graphic curve.
    """

    def __init__(self, dsname, color, legend, cfunc="AVERAGE"):
        """Constructor.
        """
        self.dsname = dsname
        self.color = color
        self.legend = legend
        self.cfunc = cfunc

    def to_rrd_command_args(self, rrdfile):
        """Convert this curve to the approriate RRDtool command.

        :param str rrdfile: RRD file name
        :return: a list
        """
        rrdfile = os.path.join(
            param_tools.get_global_parameter("rrd_rootdir"), "%s.rrd" % rrdfile
        )
        return [
            'DEF:%s=%s:%s:%s' %
            (self.dsname, rrdfile, self.dsname, self.cfunc),
            'CDEF:%(ds)spm=%(ds)s,UN,0,%(ds)s,IF,60,*' % {"ds": self.dsname},
            'XPORT:%spm:"%s"' % (self.dsname, self.legend)
        ]


class Graphic:
    """Graphic."""

    def __init__(self):
        """Constructor."""
        self._curves = []
        try:
            order = getattr(self, "order")
        except AttributeError:
            for member in inspect.getmembers(self):
                if isinstance(member[1], Curve):
                    self._curves += [member[1]]
        else:
            for name in order:
                try:
                    curve = getattr(self, name)
                except AttributeError:
                    continue
                if not isinstance(curve, Curve):
                    continue
                self._curves += [curve]

    @property
    def display_name(self):
        return self.__class__.__name__.lower()

    @property
    def rrdtool_binary(self):
        """Return path to rrdtool binary."""
        dpath = None
        code, output = exec_cmd("which rrdtool")
        if not code:
            dpath = output.strip()
        else:
            known_paths = getattr(
                settings, "RRDTOOL_LOOKUP_PATH",
                ("/usr/bin/rrdtool", "/usr/local/bin/rrdtool")
            )
            for fpath in known_paths:
                if os.path.isfile(fpath) and os.access(fpath, os.X_OK):
                    dpath = fpath
        if dpath is None:
            raise exceptions.InternalError(
                _("Failed to locate rrdtool binary."))
        return smart_text(dpath)

    def export(self, rrdfile, start, end):
        """Export data to XML using rrdtool and convert it to JSON."""
        result = []
        cmdargs = []
        for curve in self._curves:
            result += [{
                "name": str(curve.legend),
                "color": curve.color, "data": []
            }]
            cmdargs += curve.to_rrd_command_args(rrdfile)
        code = 0

        cmd = "{} xport --start {} --end {} ".format(
            self.rrdtool_binary, str(start), str(end))
        cmd += " ".join(cmdargs)
        code, output = exec_cmd(smart_bytes(cmd))
        if code:
            return []

        tree = etree.fromstring(output)
        timestamp = int(tree.xpath('/xport/meta/start')[0].text)
        step = int(tree.xpath('/xport/meta/step')[0].text)
        for row in tree.xpath('/xport/data/row'):
            for vindex, value in enumerate(row.findall('v')):
                if value.text == 'NaN':
                    result[vindex]['data'].append({'x': timestamp, 'y': 0})
                else:
                    result[vindex]['data'].append(
                        {'x': timestamp, 'y': float(value.text)}
                    )
            timestamp += step
        return result


class GraphicSet(object):
    """A set of graphics."""

    domain_selector = False
    title = None
    _graphics = []

    def __init__(self, instances=None):
        if instances is None:
            instances = []
        self.__ginstances = instances

    @property
    def html_id(self):
        return self.__class__.__name__.lower()

    @property
    def graphics(self):
        if not self.__ginstances:
            self.__ginstances = [graphic() for graphic in self._graphics]
        return self.__ginstances

    def get_graphic_names(self):
        return [graphic.display_name for graphic in self._graphics]

    def get_file_name(self, request):
        """Return database file name."""
        return self.file_name

    def export(self, rrdfile, start, end):
        result = {}
        for graph in self.graphics:
            result[graph.display_name] = {
                "title": six.text_type(graph.title),
                "curves": graph.export(rrdfile, start, end)
            }
        return result


class AverageTraffic(Graphic):
    """Average traffic."""

    title = ugettext_lazy('Average traffic (msgs/min)')

    # Curve definitions
    sent = Curve("sent", "lawngreen", ugettext_lazy("sent messages"))
    recv = Curve("recv", "steelblue", ugettext_lazy("received messages"))
    bounced = Curve("bounced", "yellow", ugettext_lazy("bounced messages"))
    reject = Curve("reject", "tomato", ugettext_lazy("rejected messages"))
    virus = Curve("virus", "orange", ugettext_lazy("virus messages"))
    spam = Curve("spam", "silver", ugettext_lazy("spam messages"))

    order = ["reject", "bounced", "recv", "sent", "virus", "spam"]

    def __init__(self, greylist=False):
        if greylist:
            self.greylist = Curve(
                "greylist", "dimgrey", ugettext_lazy("greylisted messages"))
            self.order = [
                "reject", "greylist", "bounced", "recv", "sent", "virus",
                "spam"
            ]
        super().__init__()


class AverageTrafficSize(Graphic):
    """Average traffic size."""

    title = ugettext_lazy('Average normal traffic size (bytes/min)')

    # Curve definitions
    size_recv = Curve("size_recv", "orange", ugettext_lazy("received size"))
    size_sent = Curve(
        "size_sent", "mediumturquoise", ugettext_lazy("sent size")
    )


class MailTraffic(GraphicSet):
    """Mail traffic graphic set."""

    domain_selector = True
    title = ugettext_lazy("Mail traffic")
    _graphics = [AverageTraffic, AverageTrafficSize]

    def __init__(self, greylist=False):
        instances = [AverageTraffic(greylist), AverageTrafficSize()]
        super().__init__(instances)

    def _check_domain_access(self, user, pattern):
        """Check if an administrator can access a domain.

        If a non super administrator asks for the global view, we give him
        a view on the first domain he manage instead.

        :return: a domain name (str) or None.
        """
        if pattern in [None, "global"]:
            if not user.is_superuser:
                domains = admin_models.Domain.objects.get_for_admin(user)
                if not domains.exists():
                    return None
                return domains.first().name
            return "global"

        results = list(
            chain(
                admin_models.Domain.objects.filter(name__startswith=pattern),
                admin_models.DomainAlias.objects.filter(
                    name__startswith=pattern)
            )
        )
        if len(results) != 1:
            return None
        if not user.can_access(results[0]):
            raise exceptions.PermDeniedException
        return results[0].name

    def get_file_name(self, request):
        """Retrieve file name according to user and args."""
        searchq = request.GET.get("searchquery", None)
        return self._check_domain_access(request.user, searchq)


class AccountCreationGraphic(Graphic):
    """Account creation over time."""

    title = ugettext_lazy("Average account creation (account/hour)")

    accounts = Curve(
        "new_accounts", "steelblue", ugettext_lazy("New accounts"))


class AccountGraphicSet(GraphicSet):
    """A graphic set for accounts."""

    file_name = "new_accounts"
    title = ugettext_lazy("Accounts")
    _graphics = [AccountCreationGraphic]

Anon7 - 2022
AnonSec Team