Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.118.7.18
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /srv/modoboa/env/lib64/python3.5/site-packages/dns/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /srv/modoboa/env/lib64/python3.5/site-packages/dns/dnssec.py
# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license

# Copyright (C) 2003-2017 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""Common DNSSEC-related functions and constants."""

from io import BytesIO
import struct
import time

import dns.exception
import dns.name
import dns.node
import dns.rdataset
import dns.rdata
import dns.rdatatype
import dns.rdataclass
from ._compat import string_types


class UnsupportedAlgorithm(dns.exception.DNSException):
    """The DNSSEC algorithm is not supported."""


class ValidationFailure(dns.exception.DNSException):
    """The DNSSEC signature is invalid."""


#: RSAMD5
RSAMD5 = 1
#: DH
DH = 2
#: DSA
DSA = 3
#: ECC
ECC = 4
#: RSASHA1
RSASHA1 = 5
#: DSANSEC3SHA1
DSANSEC3SHA1 = 6
#: RSASHA1NSEC3SHA1
RSASHA1NSEC3SHA1 = 7
#: RSASHA256
RSASHA256 = 8
#: RSASHA512
RSASHA512 = 10
#: ECDSAP256SHA256
ECDSAP256SHA256 = 13
#: ECDSAP384SHA384
ECDSAP384SHA384 = 14
#: INDIRECT
INDIRECT = 252
#: PRIVATEDNS
PRIVATEDNS = 253
#: PRIVATEOID
PRIVATEOID = 254

_algorithm_by_text = {
    'RSAMD5': RSAMD5,
    'DH': DH,
    'DSA': DSA,
    'ECC': ECC,
    'RSASHA1': RSASHA1,
    'DSANSEC3SHA1': DSANSEC3SHA1,
    'RSASHA1NSEC3SHA1': RSASHA1NSEC3SHA1,
    'RSASHA256': RSASHA256,
    'RSASHA512': RSASHA512,
    'INDIRECT': INDIRECT,
    'ECDSAP256SHA256': ECDSAP256SHA256,
    'ECDSAP384SHA384': ECDSAP384SHA384,
    'PRIVATEDNS': PRIVATEDNS,
    'PRIVATEOID': PRIVATEOID,
}

# We construct the inverse mapping programmatically to ensure that we
# cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
# would cause the mapping not to be true inverse.

_algorithm_by_value = {y: x for x, y in _algorithm_by_text.items()}


def algorithm_from_text(text):
    """Convert text into a DNSSEC algorithm value.

    Returns an ``int``.
    """

    value = _algorithm_by_text.get(text.upper())
    if value is None:
        value = int(text)
    return value


def algorithm_to_text(value):
    """Convert a DNSSEC algorithm value to text

    Returns a ``str``.
    """

    text = _algorithm_by_value.get(value)
    if text is None:
        text = str(value)
    return text


def _to_rdata(record, origin):
    s = BytesIO()
    record.to_wire(s, origin=origin)
    return s.getvalue()


def key_id(key, origin=None):
    """Return the key id (a 16-bit number) for the specified key.

    Note the *origin* parameter of this function is historical and
    is not needed.

    Returns an ``int`` between 0 and 65535.
    """

    rdata = _to_rdata(key, origin)
    rdata = bytearray(rdata)
    if key.algorithm == RSAMD5:
        return (rdata[-3] << 8) + rdata[-2]
    else:
        total = 0
        for i in range(len(rdata) // 2):
            total += (rdata[2 * i] << 8) + \
                rdata[2 * i + 1]
        if len(rdata) % 2 != 0:
            total += rdata[len(rdata) - 1] << 8
        total += ((total >> 16) & 0xffff)
        return total & 0xffff


def make_ds(name, key, algorithm, origin=None):
    """Create a DS record for a DNSSEC key.

    *name* is the owner name of the DS record.

    *key* is a ``dns.rdtypes.ANY.DNSKEY``.

    *algorithm* is a string describing which hash algorithm to use.  The
    currently supported hashes are "SHA1" and "SHA256".  Case does not
    matter for these strings.

    *origin* is a ``dns.name.Name`` and will be used as the origin
    if *key* is a relative name.

    Returns a ``dns.rdtypes.ANY.DS``.
    """

    if algorithm.upper() == 'SHA1':
        dsalg = 1
        hash = SHA1.new()
    elif algorithm.upper() == 'SHA256':
        dsalg = 2
        hash = SHA256.new()
    else:
        raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)

    if isinstance(name, string_types):
        name = dns.name.from_text(name, origin)
    hash.update(name.canonicalize().to_wire())
    hash.update(_to_rdata(key, origin))
    digest = hash.digest()

    dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest
    return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0,
                               len(dsrdata))


def _find_candidate_keys(keys, rrsig):
    candidate_keys = []
    value = keys.get(rrsig.signer)
    if value is None:
        return None
    if isinstance(value, dns.node.Node):
        try:
            rdataset = value.find_rdataset(dns.rdataclass.IN,
                                           dns.rdatatype.DNSKEY)
        except KeyError:
            return None
    else:
        rdataset = value
    for rdata in rdataset:
        if rdata.algorithm == rrsig.algorithm and \
                key_id(rdata) == rrsig.key_tag:
            candidate_keys.append(rdata)
    return candidate_keys


def _is_rsa(algorithm):
    return algorithm in (RSAMD5, RSASHA1,
                         RSASHA1NSEC3SHA1, RSASHA256,
                         RSASHA512)


def _is_dsa(algorithm):
    return algorithm in (DSA, DSANSEC3SHA1)


def _is_ecdsa(algorithm):
    return _have_ecdsa and (algorithm in (ECDSAP256SHA256, ECDSAP384SHA384))


def _is_md5(algorithm):
    return algorithm == RSAMD5


def _is_sha1(algorithm):
    return algorithm in (DSA, RSASHA1,
                         DSANSEC3SHA1, RSASHA1NSEC3SHA1)


def _is_sha256(algorithm):
    return algorithm in (RSASHA256, ECDSAP256SHA256)


def _is_sha384(algorithm):
    return algorithm == ECDSAP384SHA384


def _is_sha512(algorithm):
    return algorithm == RSASHA512


def _make_hash(algorithm):
    if _is_md5(algorithm):
        return MD5.new()
    if _is_sha1(algorithm):
        return SHA1.new()
    if _is_sha256(algorithm):
        return SHA256.new()
    if _is_sha384(algorithm):
        return SHA384.new()
    if _is_sha512(algorithm):
        return SHA512.new()
    raise ValidationFailure('unknown hash for algorithm %u' % algorithm)


def _make_algorithm_id(algorithm):
    if _is_md5(algorithm):
        oid = [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05]
    elif _is_sha1(algorithm):
        oid = [0x2b, 0x0e, 0x03, 0x02, 0x1a]
    elif _is_sha256(algorithm):
        oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01]
    elif _is_sha512(algorithm):
        oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03]
    else:
        raise ValidationFailure('unknown algorithm %u' % algorithm)
    olen = len(oid)
    dlen = _make_hash(algorithm).digest_size
    idbytes = [0x30] + [8 + olen + dlen] + \
              [0x30, olen + 4] + [0x06, olen] + oid + \
              [0x05, 0x00] + [0x04, dlen]
    return struct.pack('!%dB' % len(idbytes), *idbytes)


def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
    """Validate an RRset against a single signature rdata

    The owner name of *rrsig* is assumed to be the same as the owner name
    of *rrset*.

    *rrset* is the RRset to validate.  It can be a ``dns.rrset.RRset`` or
    a ``(dns.name.Name, dns.rdataset.Rdataset)`` tuple.

    *rrsig* is a ``dns.rdata.Rdata``, the signature to validate.

    *keys* is the key dictionary, used to find the DNSKEY associated with
    a given name.  The dictionary is keyed by a ``dns.name.Name``, and has
    ``dns.node.Node`` or ``dns.rdataset.Rdataset`` values.

    *origin* is a ``dns.name.Name``, the origin to use for relative names.

    *now* is an ``int``, the time to use when validating the signatures,
    in seconds since the UNIX epoch.  The default is the current time.
    """

    if isinstance(origin, string_types):
        origin = dns.name.from_text(origin, dns.name.root)

    candidate_keys = _find_candidate_keys(keys, rrsig)
    if candidate_keys is None:
        raise ValidationFailure('unknown key')

    for candidate_key in candidate_keys:
        # For convenience, allow the rrset to be specified as a (name,
        # rdataset) tuple as well as a proper rrset
        if isinstance(rrset, tuple):
            rrname = rrset[0]
            rdataset = rrset[1]
        else:
            rrname = rrset.name
            rdataset = rrset

        if now is None:
            now = time.time()
        if rrsig.expiration < now:
            raise ValidationFailure('expired')
        if rrsig.inception > now:
            raise ValidationFailure('not yet valid')

        hash = _make_hash(rrsig.algorithm)

        if _is_rsa(rrsig.algorithm):
            keyptr = candidate_key.key
            (bytes_,) = struct.unpack('!B', keyptr[0:1])
            keyptr = keyptr[1:]
            if bytes_ == 0:
                (bytes_,) = struct.unpack('!H', keyptr[0:2])
                keyptr = keyptr[2:]
            rsa_e = keyptr[0:bytes_]
            rsa_n = keyptr[bytes_:]
            try:
                pubkey = CryptoRSA.construct(
                    (number.bytes_to_long(rsa_n),
                     number.bytes_to_long(rsa_e)))
            except ValueError:
                raise ValidationFailure('invalid public key')
            sig = rrsig.signature
        elif _is_dsa(rrsig.algorithm):
            keyptr = candidate_key.key
            (t,) = struct.unpack('!B', keyptr[0:1])
            keyptr = keyptr[1:]
            octets = 64 + t * 8
            dsa_q = keyptr[0:20]
            keyptr = keyptr[20:]
            dsa_p = keyptr[0:octets]
            keyptr = keyptr[octets:]
            dsa_g = keyptr[0:octets]
            keyptr = keyptr[octets:]
            dsa_y = keyptr[0:octets]
            pubkey = CryptoDSA.construct(
                (number.bytes_to_long(dsa_y),
                 number.bytes_to_long(dsa_g),
                 number.bytes_to_long(dsa_p),
                 number.bytes_to_long(dsa_q)))
            sig = rrsig.signature[1:]
        elif _is_ecdsa(rrsig.algorithm):
            # use ecdsa for NIST-384p -- not currently supported by pycryptodome

            keyptr = candidate_key.key

            if rrsig.algorithm == ECDSAP256SHA256:
                curve = ecdsa.curves.NIST256p
                key_len = 32
            elif rrsig.algorithm == ECDSAP384SHA384:
                curve = ecdsa.curves.NIST384p
                key_len = 48

            x = number.bytes_to_long(keyptr[0:key_len])
            y = number.bytes_to_long(keyptr[key_len:key_len * 2])
            if not ecdsa.ecdsa.point_is_valid(curve.generator, x, y):
                raise ValidationFailure('invalid ECDSA key')
            point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order)
            verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point,
                                                                      curve)
            pubkey = ECKeyWrapper(verifying_key, key_len)
            r = rrsig.signature[:key_len]
            s = rrsig.signature[key_len:]
            sig = ecdsa.ecdsa.Signature(number.bytes_to_long(r),
                                        number.bytes_to_long(s))

        else:
            raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)

        hash.update(_to_rdata(rrsig, origin)[:18])
        hash.update(rrsig.signer.to_digestable(origin))

        if rrsig.labels < len(rrname) - 1:
            suffix = rrname.split(rrsig.labels + 1)[1]
            rrname = dns.name.from_text('*', suffix)
        rrnamebuf = rrname.to_digestable(origin)
        rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
                              rrsig.original_ttl)
        rrlist = sorted(rdataset)
        for rr in rrlist:
            hash.update(rrnamebuf)
            hash.update(rrfixed)
            rrdata = rr.to_digestable(origin)
            rrlen = struct.pack('!H', len(rrdata))
            hash.update(rrlen)
            hash.update(rrdata)

        try:
            if _is_rsa(rrsig.algorithm):
                verifier = pkcs1_15.new(pubkey)
                # will raise ValueError if verify fails:
                verifier.verify(hash, sig)
            elif _is_dsa(rrsig.algorithm):
                verifier = DSS.new(pubkey, 'fips-186-3')
                verifier.verify(hash, sig)
            elif _is_ecdsa(rrsig.algorithm):
                digest = hash.digest()
                if not pubkey.verify(digest, sig):
                    raise ValueError
            else:
                # Raise here for code clarity; this won't actually ever happen
                # since if the algorithm is really unknown we'd already have
                # raised an exception above
                raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)
            # If we got here, we successfully verified so we can return without error
            return
        except ValueError:
            # this happens on an individual validation failure
            continue
    # nothing verified -- raise failure:
    raise ValidationFailure('verify failure')


def _validate(rrset, rrsigset, keys, origin=None, now=None):
    """Validate an RRset.

    *rrset* is the RRset to validate.  It can be a ``dns.rrset.RRset`` or
    a ``(dns.name.Name, dns.rdataset.Rdataset)`` tuple.

    *rrsigset* is the signature RRset to be validated.  It can be a
    ``dns.rrset.RRset`` or a ``(dns.name.Name, dns.rdataset.Rdataset)`` tuple.

    *keys* is the key dictionary, used to find the DNSKEY associated with
    a given name.  The dictionary is keyed by a ``dns.name.Name``, and has
    ``dns.node.Node`` or ``dns.rdataset.Rdataset`` values.

    *origin* is a ``dns.name.Name``, the origin to use for relative names.

    *now* is an ``int``, the time to use when validating the signatures,
    in seconds since the UNIX epoch.  The default is the current time.
    """

    if isinstance(origin, string_types):
        origin = dns.name.from_text(origin, dns.name.root)

    if isinstance(rrset, tuple):
        rrname = rrset[0]
    else:
        rrname = rrset.name

    if isinstance(rrsigset, tuple):
        rrsigname = rrsigset[0]
        rrsigrdataset = rrsigset[1]
    else:
        rrsigname = rrsigset.name
        rrsigrdataset = rrsigset

    rrname = rrname.choose_relativity(origin)
    rrsigname = rrsigname.choose_relativity(origin)
    if rrname != rrsigname:
        raise ValidationFailure("owner names do not match")

    for rrsig in rrsigrdataset:
        try:
            _validate_rrsig(rrset, rrsig, keys, origin, now)
            return
        except ValidationFailure:
            pass
    raise ValidationFailure("no RRSIGs validated")


def _need_pycrypto(*args, **kwargs):
    raise NotImplementedError("DNSSEC validation requires pycryptodome/pycryptodomex")


try:
    try:
        # test we're using pycryptodome, not pycrypto (which misses SHA1 for example)
        from Crypto.Hash import MD5, SHA1, SHA256, SHA384, SHA512
        from Crypto.PublicKey import RSA as CryptoRSA, DSA as CryptoDSA
        from Crypto.Signature import pkcs1_15, DSS
        from Crypto.Util import number
    except ImportError:
        from Cryptodome.Hash import MD5, SHA1, SHA256, SHA384, SHA512
        from Cryptodome.PublicKey import RSA as CryptoRSA, DSA as CryptoDSA
        from Cryptodome.Signature import pkcs1_15, DSS
        from Cryptodome.Util import number
except ImportError:
    validate = _need_pycrypto
    validate_rrsig = _need_pycrypto
    _have_pycrypto = False
    _have_ecdsa = False
else:
    validate = _validate
    validate_rrsig = _validate_rrsig
    _have_pycrypto = True

    try:
        import ecdsa
        import ecdsa.ecdsa
        import ecdsa.ellipticcurve
        import ecdsa.keys
    except ImportError:
        _have_ecdsa = False
    else:
        _have_ecdsa = True

        class ECKeyWrapper(object):

            def __init__(self, key, key_len):
                self.key = key
                self.key_len = key_len

            def verify(self, digest, sig):
                diglong = number.bytes_to_long(digest)
                return self.key.pubkey.verifies(diglong, sig)

Anon7 - 2022
AnonSec Team