Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 52.14.201.216
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /lib/python3/dist-packages/passlib/handlers/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /lib/python3/dist-packages/passlib/handlers//argon2.py
"""passlib.handlers.argon2 -- argon2 password hash wrapper

References
==========
* argon2
    - home: https://github.com/P-H-C/phc-winner-argon2
    - whitepaper: https://github.com/P-H-C/phc-winner-argon2/blob/master/argon2-specs.pdf
* argon2 cffi wrapper
    - pypi: https://pypi.python.org/pypi/argon2_cffi
    - home: https://github.com/hynek/argon2_cffi
* argon2 pure python
    - pypi: https://pypi.python.org/pypi/argon2pure
    - home: https://github.com/bwesterb/argon2pure
"""
#=============================================================================
# imports
#=============================================================================
from __future__ import with_statement, absolute_import
# core
import logging
log = logging.getLogger(__name__)
import re
import types
from warnings import warn
# site
_argon2_cffi = None  # loaded below
_argon2pure = None  # dynamically imported by _load_backend_argon2pure()
# pkg
from passlib import exc
from passlib.crypto.digest import MAX_UINT32
from passlib.utils import classproperty, to_bytes, render_bytes
from passlib.utils.binary import b64s_encode, b64s_decode
from passlib.utils.compat import u, unicode, bascii_to_str, uascii_to_str, PY2
import passlib.utils.handlers as uh
# local
__all__ = [
    "argon2",
]

#=============================================================================
# helpers
#=============================================================================

# NOTE: when adding a new argon2 hash type, need to do the following:
# * add TYPE_XXX constant, and add to ALL_TYPES
# * make sure "_backend_type_map" constructors handle it correctly for all backends
# * make sure _hash_regex & _ident_regex (below) support type string.
# * add reference vectors for testing.

#: argon2 type constants -- subclasses handle mapping these to backend-specific type constants.
#: (should be lowercase, to match representation in hash string)
TYPE_I = u("i")
TYPE_D = u("d")
TYPE_ID = u("id")  # new 2016-10-29; passlib 1.7.2 requires backends new enough for support

#: list of all known types; first (supported) type will be used as default.
ALL_TYPES = (TYPE_ID, TYPE_I, TYPE_D)
ALL_TYPES_SET = set(ALL_TYPES)

#=============================================================================
# import argon2 package (https://pypi.python.org/pypi/argon2_cffi)
#=============================================================================

# import cffi package
# NOTE: we try to do this even if caller is going to use argon2pure,
#       so that we can always use the libargon2 default settings when possible.
_argon2_cffi_error = None
try:
    import argon2 as _argon2_cffi
except ImportError:
    _argon2_cffi = None
else:
    if not hasattr(_argon2_cffi, "Type"):
        # they have incompatible "argon2" package installed, instead of "argon2_cffi" package.
        _argon2_cffi_error = (
            "'argon2' module points to unsupported 'argon2' pypi package; "
            "please install 'argon2-cffi' instead."
        )
        _argon2_cffi = None
    elif not hasattr(_argon2_cffi, "low_level"):
        # they have pre-v16 argon2_cffi package
        _argon2_cffi_error = "'argon2-cffi' is too old, please update to argon2_cffi >= 18.2.0"
        _argon2_cffi = None

# init default settings for our hasher class --
# if we have argon2_cffi >= 16.0, use their default hasher settings, otherwise use static default
if hasattr(_argon2_cffi, "PasswordHasher"):
    # use cffi's default settings
    _default_settings = _argon2_cffi.PasswordHasher()
    _default_version = _argon2_cffi.low_level.ARGON2_VERSION
else:
    # use fallback settings (for no backend, or argon2pure)
    class _DummyCffiHasher:
        """
        dummy object to use as source of defaults when argon2_cffi isn't present.
        this tries to mimic the attributes of ``argon2.PasswordHasher()`` which the rest of
        this module reads.

        .. note:: values last synced w/ argon2 19.2 as of 2019-11-09
        """
        time_cost = 2
        memory_cost = 512
        parallelism = 2
        salt_len = 16
        hash_len = 16
        # NOTE: "type" attribute added in argon2_cffi v18.2; but currently not reading it
        # type = _argon2_cffi.Type.ID

    _default_settings = _DummyCffiHasher()
    _default_version = 0x13  # v1.9

#=============================================================================
# handler
#=============================================================================
class _Argon2Common(uh.SubclassBackendMixin, uh.ParallelismMixin,
                    uh.HasRounds, uh.HasRawSalt, uh.HasRawChecksum,
                    uh.GenericHandler):
    """
    Base class which implements brunt of Argon2 code.
    This is then subclassed by the various backends,
    to override w/ backend-specific methods.

    When a backend is loaded, the bases of the 'argon2' class proper
    are modified to prepend the correct backend-specific subclass.
    """
    #===================================================================
    # class attrs
    #===================================================================

    #------------------------
    # PasswordHash
    #------------------------

    name = "argon2"
    setting_kwds = ("salt",
                    "salt_size",
                    "salt_len",  # 'salt_size' alias for compat w/ argon2 package
                    "rounds",
                    "time_cost",  # 'rounds' alias for compat w/ argon2 package
                    "memory_cost",
                    "parallelism",
                    "digest_size",
                    "hash_len",  # 'digest_size' alias for compat w/ argon2 package
                    "type",  # the type of argon2 hash used
                    )

    # TODO: could support the optional 'data' parameter,
    #       but need to research the uses, what a more descriptive name would be,
    #       and deal w/ fact that argon2_cffi 16.1 doesn't currently support it.
    #       (argon2_pure does though)

    #------------------------
    # GenericHandler
    #------------------------

    # NOTE: ident -- all argon2 hashes start with "$argon2<type>$"
    # XXX: could programmaticaly generate "ident_values" string from ALL_TYPES above

    checksum_size = _default_settings.hash_len

    #: force parsing these kwds
    _always_parse_settings = uh.GenericHandler._always_parse_settings + \
                             ("type",)

    #: exclude these kwds from parsehash() result (most are aliases for other keys)
    _unparsed_settings = uh.GenericHandler._unparsed_settings + \
                         ("salt_len", "time_cost", "hash_len", "digest_size")

    #------------------------
    # HasSalt
    #------------------------
    default_salt_size = _default_settings.salt_len
    min_salt_size = 8
    max_salt_size = MAX_UINT32

    #------------------------
    # HasRounds
    # TODO: once rounds limit logic is factored out,
    #       make 'rounds' and 'cost' an alias for 'time_cost'
    #------------------------
    default_rounds = _default_settings.time_cost
    min_rounds = 1
    max_rounds = MAX_UINT32
    rounds_cost = "linear"

    #------------------------
    # ParalleismMixin
    #------------------------
    max_parallelism = (1 << 24) - 1  # from argon2.h / ARGON2_MAX_LANES

    #------------------------
    # custom
    #------------------------

    #: max version support
    #: NOTE: this is dependant on the backend, and initialized/modified by set_backend()
    max_version = _default_version

    #: minimum version before needs_update() marks the hash; if None, defaults to max_version
    min_desired_version = None

    #: minimum valid memory_cost
    min_memory_cost = 8  # from argon2.h / ARGON2_MIN_MEMORY

    #: maximum number of threads (-1=unlimited);
    #: number of threads used by .hash() will be min(parallelism, max_threads)
    max_threads = -1

    #: global flag signalling argon2pure backend to use threads
    #: rather than subprocesses.
    pure_use_threads = False

    #: internal helper used to store mapping of TYPE_XXX constants -> backend-specific type constants;
    #: this is populated by _load_backend_mixin(); and used to detect which types are supported.
    #: XXX: could expose keys as class-level .supported_types property?
    _backend_type_map = {}

    @classproperty
    def type_values(cls):
        """
        return tuple of types supported by this backend
        
        .. versionadded:: 1.7.2
        """
        cls.get_backend()  # make sure backend is loaded
        return tuple(cls._backend_type_map)

    #===================================================================
    # instance attrs
    #===================================================================

    #: argon2 hash type, one of ALL_TYPES -- class value controls the default
    #: .. versionadded:: 1.7.2
    type = TYPE_ID

    #: parallelism setting -- class value controls the default
    parallelism = _default_settings.parallelism

    #: hash version (int)
    #: NOTE: this is modified by set_backend()
    version = _default_version

    #: memory cost -- class value controls the default
    memory_cost = _default_settings.memory_cost

    @property
    def type_d(self):
        """
        flag indicating a Type D hash

        .. deprecated:: 1.7.2; will be removed in passlib 2.0
        """
        return self.type == TYPE_D

    #: optional secret data
    data = None

    #===================================================================
    # variant constructor
    #===================================================================

    @classmethod
    def using(cls, type=None, memory_cost=None, salt_len=None, time_cost=None, digest_size=None,
              checksum_size=None, hash_len=None, max_threads=None, **kwds):
        # support aliases which match argon2 naming convention
        if time_cost is not None:
            if "rounds" in kwds:
                raise TypeError("'time_cost' and 'rounds' are mutually exclusive")
            kwds['rounds'] = time_cost

        if salt_len is not None:
            if "salt_size" in kwds:
                raise TypeError("'salt_len' and 'salt_size' are mutually exclusive")
            kwds['salt_size'] = salt_len

        if hash_len is not None:
            if digest_size is not None:
                raise TypeError("'hash_len' and 'digest_size' are mutually exclusive")
            digest_size = hash_len

        if checksum_size is not None:
            if digest_size is not None:
                raise TypeError("'checksum_size' and 'digest_size' are mutually exclusive")
            digest_size = checksum_size

        # create variant
        subcls = super(_Argon2Common, cls).using(**kwds)

        # set type
        if type is not None:
            subcls.type = subcls._norm_type(type)

        # set checksum size
        relaxed = kwds.get("relaxed")
        if digest_size is not None:
            if isinstance(digest_size, uh.native_string_types):
                digest_size = int(digest_size)
            # NOTE: this isn't *really* digest size minimum, but want to enforce secure minimum.
            subcls.checksum_size = uh.norm_integer(subcls, digest_size, min=16, max=MAX_UINT32,
                                                   param="digest_size", relaxed=relaxed)

        # set memory cost
        if memory_cost is not None:
            if isinstance(memory_cost, uh.native_string_types):
                memory_cost = int(memory_cost)
            subcls.memory_cost = subcls._norm_memory_cost(memory_cost, relaxed=relaxed)

        # validate constraints
        subcls._validate_constraints(subcls.memory_cost, subcls.parallelism)

        # set max threads
        if max_threads is not None:
            if isinstance(max_threads, uh.native_string_types):
                max_threads = int(max_threads)
            if max_threads < 1 and max_threads != -1:
                raise ValueError("max_threads (%d) must be -1 (unlimited), or at least 1." %
                                 (max_threads,))
            subcls.max_threads = max_threads

        return subcls

    @classmethod
    def _validate_constraints(cls, memory_cost, parallelism):
        # NOTE: this is used by class & instance, hence passing in via arguments.
        #       could switch and make this a hybrid method.
        min_memory_cost = 8 * parallelism
        if memory_cost < min_memory_cost:
            raise ValueError("%s: memory_cost (%d) is too low, must be at least "
                             "8 * parallelism (8 * %d = %d)" %
                             (cls.name, memory_cost,
                              parallelism, min_memory_cost))

    #===================================================================
    # public api
    #===================================================================

    #: shorter version of _hash_regex, used to quickly identify hashes
    _ident_regex = re.compile(r"^\$argon2[a-z]+\$")

    @classmethod
    def identify(cls, hash):
        hash = uh.to_unicode_for_identify(hash)
        return cls._ident_regex.match(hash) is not None

    # hash(), verify(), genhash() -- implemented by backend subclass

    #===================================================================
    # hash parsing / rendering
    #===================================================================

    # info taken from source of decode_string() function in
    # <https://github.com/P-H-C/phc-winner-argon2/blob/master/src/encoding.c>
    #
    # hash format:
    #   $argon2<T>[$v=<num>]$m=<num>,t=<num>,p=<num>[,keyid=<bin>][,data=<bin>][$<bin>[$<bin>]]
    #
    # NOTE: as of 2016-6-17, the official source (above) lists the "keyid" param in the comments,
    #       but the actual source of decode_string & encode_string don't mention it at all.
    #       we're supporting parsing it, but throw NotImplementedError if encountered.
    #
    # sample hashes:
    #    v1.0: '$argon2i$m=512,t=2,p=2$5VtWOO3cGWYQHEMaYGbsfQ$AcmqasQgW/wI6wAHAMk4aQ'
    #    v1.3: '$argon2i$v=19$m=512,t=2,p=2$5VtWOO3cGWYQHEMaYGbsfQ$AcmqasQgW/wI6wAHAMk4aQ'

    #: regex to parse argon hash
    _hash_regex = re.compile(br"""
        ^
        \$argon2(?P<type>[a-z]+)\$
        (?:
            v=(?P<version>\d+)
            \$
        )?
        m=(?P<memory_cost>\d+)
        ,
        t=(?P<time_cost>\d+)
        ,
        p=(?P<parallelism>\d+)
        (?:
            ,keyid=(?P<keyid>[^,$]+)
        )?
        (?:
            ,data=(?P<data>[^,$]+)
        )?
        (?:
            \$
            (?P<salt>[^$]+)
            (?:
                \$
                (?P<digest>.+)
            )?
        )?
        $
    """, re.X)

    @classmethod
    def from_string(cls, hash):
        # NOTE: assuming hash will be unicode, or use ascii-compatible encoding.
        # TODO: switch to working w/ str or unicode
        if isinstance(hash, unicode):
            hash = hash.encode("utf-8")
        if not isinstance(hash, bytes):
            raise exc.ExpectedStringError(hash, "hash")
        m = cls._hash_regex.match(hash)
        if not m:
            raise exc.MalformedHashError(cls)
        type, version, memory_cost, time_cost, parallelism, keyid, data, salt, digest = \
            m.group("type", "version", "memory_cost", "time_cost", "parallelism",
                    "keyid", "data", "salt", "digest")
        if keyid:
            raise NotImplementedError("argon2 'keyid' parameter not supported")
        return cls(
            type=type.decode("ascii"),
            version=int(version) if version else 0x10,
            memory_cost=int(memory_cost),
            rounds=int(time_cost),
            parallelism=int(parallelism),
            salt=b64s_decode(salt) if salt else None,
            data=b64s_decode(data) if data else None,
            checksum=b64s_decode(digest) if digest else None,
        )

    def to_string(self):
        version = self.version
        if version == 0x10:
            vstr = ""
        else:
            vstr = "v=%d$" % version

        data = self.data
        if data:
            kdstr = ",data=" + bascii_to_str(b64s_encode(self.data))
        else:
            kdstr = ""

        # NOTE: 'keyid' param currently not supported
        return "$argon2%s$%sm=%d,t=%d,p=%d%s$%s$%s" % (
            uascii_to_str(self.type),
            vstr, 
            self.memory_cost,
            self.rounds, 
            self.parallelism,
            kdstr,
            bascii_to_str(b64s_encode(self.salt)),
            bascii_to_str(b64s_encode(self.checksum)),
        )

    #===================================================================
    # init
    #===================================================================
    def __init__(self, type=None, type_d=False, version=None, memory_cost=None, data=None, **kwds):

        # handle deprecated kwds
        if type_d:
            warn('argon2 `type_d=True` keyword is deprecated, and will be removed in passlib 2.0; '
                 'please use ``type="d"`` instead')
            assert type is None
            type = TYPE_D

        # TODO: factor out variable checksum size support into a mixin.
        # set checksum size to specific value before _norm_checksum() is called
        checksum = kwds.get("checksum")
        if checksum is not None:
            self.checksum_size = len(checksum)

        # call parent
        super(_Argon2Common, self).__init__(**kwds)

        # init type
        if type is None:
            assert uh.validate_default_value(self, self.type, self._norm_type, param="type")
        else:
            self.type = self._norm_type(type)

        # init version
        if version is None:
            assert uh.validate_default_value(self, self.version, self._norm_version,
                                             param="version")
        else:
            self.version = self._norm_version(version)

        # init memory cost
        if memory_cost is None:
            assert uh.validate_default_value(self, self.memory_cost, self._norm_memory_cost,
                                             param="memory_cost")
        else:
            self.memory_cost = self._norm_memory_cost(memory_cost)

        # init data
        if data is None:
            assert self.data is None
        else:
            if not isinstance(data, bytes):
                raise uh.exc.ExpectedTypeError(data, "bytes", "data")
            self.data = data

    #-------------------------------------------------------------------
    # parameter guards
    #-------------------------------------------------------------------

    @classmethod
    def _norm_type(cls, value):
        # type check
        if not isinstance(value, unicode):
            if PY2 and isinstance(value, bytes):
                value = value.decode('ascii')
            else:
                raise uh.exc.ExpectedTypeError(value, "str", "type")

        # check if type is valid
        if value in ALL_TYPES_SET:
            return value

        # translate from uppercase
        temp = value.lower()
        if temp in ALL_TYPES_SET:
            return temp

        # failure!
        raise ValueError("unknown argon2 hash type: %r" % (value,))

    @classmethod
    def _norm_version(cls, version):
        if not isinstance(version, uh.int_types):
            raise uh.exc.ExpectedTypeError(version, "integer", "version")

        # minimum valid version
        if version < 0x13 and version != 0x10:
            raise ValueError("invalid argon2 hash version: %d" % (version,))

        # check this isn't past backend's max version
        backend = cls.get_backend()
        if version > cls.max_version:
            raise ValueError("%s: hash version 0x%X not supported by %r backend "
                             "(max version is 0x%X); try updating or switching backends" %
                             (cls.name, version, backend, cls.max_version))
        return version

    @classmethod
    def _norm_memory_cost(cls, memory_cost, relaxed=False):
        return uh.norm_integer(cls, memory_cost, min=cls.min_memory_cost,
                               param="memory_cost", relaxed=relaxed)

    #===================================================================
    # digest calculation
    #===================================================================

    # NOTE: _calc_checksum implemented by backend subclass

    @classmethod
    def _get_backend_type(cls, value):
        """
        helper to resolve backend constant from type
        """
        try:
            return cls._backend_type_map[value]
        except KeyError:
            pass
        # XXX: pick better error class?
        msg = "unsupported argon2 hash (type %r not supported by %s backend)" % \
              (value, cls.get_backend())
        raise ValueError(msg)

    #===================================================================
    # hash migration
    #===================================================================

    def _calc_needs_update(self, **kwds):
        cls = type(self)
        if self.type != cls.type:
            return True
        minver = cls.min_desired_version
        if minver is None or minver > cls.max_version:
            minver = cls.max_version
        if self.version < minver:
            # version is too old.
            return True
        if self.memory_cost != cls.memory_cost:
            return True
        if self.checksum_size != cls.checksum_size:
            return True
        return super(_Argon2Common, self)._calc_needs_update(**kwds)
    
    #===================================================================
    # backend loading
    #===================================================================

    _no_backend_suggestion = " -- recommend you install one (e.g. 'pip install argon2_cffi')"

    @classmethod
    def _finalize_backend_mixin(mixin_cls, name, dryrun):
        """
        helper called by from backend mixin classes' _load_backend_mixin() --
        invoked after backend imports have been loaded, and performs
        feature detection & testing common to all backends.
        """
        # check argon2 version
        max_version = mixin_cls.max_version
        assert isinstance(max_version, int) and max_version >= 0x10
        if max_version < 0x13:
            warn("%r doesn't support argon2 v1.3, and should be upgraded" % name,
                 uh.exc.PasslibSecurityWarning)

        # prefer best available type
        for type in ALL_TYPES:
            if type in mixin_cls._backend_type_map:
                mixin_cls.type = type
                break
        else:
            warn("%r lacks support for all known hash types" % name, uh.exc.PasslibRuntimeWarning)
            # NOTE: class will just throw "unsupported argon2 hash" error if they try to use it...
            mixin_cls.type = TYPE_ID

        return True

    @classmethod
    def _adapt_backend_error(cls, err, hash=None, self=None):
        """
        internal helper invoked when backend has hash/verification error;
        used to adapt to passlib message.
        """
        backend = cls.get_backend()

        # parse hash to throw error if format was invalid, parameter out of range, etc.
        if self is None and hash is not None:
            self = cls.from_string(hash)

        # check constraints on parsed object
        # XXX: could move this to __init__, but not needed by needs_update calls
        if self is not None:
            self._validate_constraints(self.memory_cost, self.parallelism)

            # as of cffi 16.1, lacks support in hash_secret(), so genhash() will get here.
            # as of cffi 16.2, support removed from verify_secret() as well.
            if backend == "argon2_cffi" and self.data is not None:
                raise NotImplementedError("argon2_cffi backend doesn't support the 'data' parameter")

        # fallback to reporting a malformed hash
        text = str(err)
        if text not in [
            "Decoding failed"  # argon2_cffi's default message
            ]:
            reason = "%s reported: %s: hash=%r" % (backend, text, hash)
        else:
            reason = repr(hash)
        raise exc.MalformedHashError(cls, reason=reason)

    #===================================================================
    # eoc
    #===================================================================

#-----------------------------------------------------------------------
# stub backend
#-----------------------------------------------------------------------
class _NoBackend(_Argon2Common):
    """
    mixin used before any backend has been loaded.
    contains stubs that force loading of one of the available backends.
    """
    #===================================================================
    # primary methods
    #===================================================================
    @classmethod
    def hash(cls, secret):
        cls._stub_requires_backend()
        return cls.hash(secret)

    @classmethod
    def verify(cls, secret, hash):
        cls._stub_requires_backend()
        return cls.verify(secret, hash)

    @uh.deprecated_method(deprecated="1.7", removed="2.0")
    @classmethod
    def genhash(cls, secret, config):
        cls._stub_requires_backend()
        return cls.genhash(secret, config)

    #===================================================================
    # digest calculation
    #===================================================================
    def _calc_checksum(self, secret):
        # NOTE: since argon2_cffi takes care of rendering hash,
        #       _calc_checksum() is only used by the argon2pure backend.
        self._stub_requires_backend()
        # NOTE: have to use super() here so that we don't recursively
        #       call subclass's wrapped _calc_checksum
        return super(argon2, self)._calc_checksum(secret)

    #===================================================================
    # eoc
    #===================================================================

#-----------------------------------------------------------------------
# argon2_cffi backend
#-----------------------------------------------------------------------
class _CffiBackend(_Argon2Common):
    """
    argon2_cffi backend
    """
    #===================================================================
    # backend loading
    #===================================================================

    @classmethod
    def _load_backend_mixin(mixin_cls, name, dryrun):
        # make sure we write info to base class's __dict__, not that of a subclass
        assert mixin_cls is _CffiBackend

        # we automatically import this at top, so just grab info
        if _argon2_cffi is None:
            if _argon2_cffi_error:
                raise exc.PasslibSecurityError(_argon2_cffi_error)
            return False
        max_version = _argon2_cffi.low_level.ARGON2_VERSION
        log.debug("detected 'argon2_cffi' backend, version %r, with support for 0x%x argon2 hashes",
                  _argon2_cffi.__version__, max_version)

        # build type map
        TypeEnum = _argon2_cffi.Type
        type_map = {}
        for type in ALL_TYPES:
            try:
                type_map[type] = getattr(TypeEnum, type.upper())
            except AttributeError:
                # TYPE_ID support not added until v18.2
                assert type not in (TYPE_I, TYPE_D), "unexpected missing type: %r" % type
        mixin_cls._backend_type_map = type_map

        # set version info, and run common setup
        mixin_cls.version = mixin_cls.max_version = max_version
        return mixin_cls._finalize_backend_mixin(name, dryrun)

    #===================================================================
    # primary methods
    #===================================================================
    @classmethod
    def hash(cls, secret):
        # TODO: add in 'encoding' support once that's finalized in 1.8 / 1.9.
        uh.validate_secret(secret)
        secret = to_bytes(secret, "utf-8")
        # XXX: doesn't seem to be a way to make this honor max_threads
        try:
            return bascii_to_str(_argon2_cffi.low_level.hash_secret(
                type=cls._get_backend_type(cls.type),
                memory_cost=cls.memory_cost,
                time_cost=cls.default_rounds,
                parallelism=cls.parallelism,
                salt=to_bytes(cls._generate_salt()),
                hash_len=cls.checksum_size,
                secret=secret,
            ))
        except _argon2_cffi.exceptions.HashingError as err:
            raise cls._adapt_backend_error(err)

    #: helper for verify() method below -- maps prefixes to type constants
    _byte_ident_map = dict((render_bytes(b"$argon2%s$", type.encode("ascii")), type)
                           for type in ALL_TYPES)

    @classmethod
    def verify(cls, secret, hash):
        # TODO: add in 'encoding' support once that's finalized in 1.8 / 1.9.
        uh.validate_secret(secret)
        secret = to_bytes(secret, "utf-8")
        hash = to_bytes(hash, "ascii")

        # read type from start of hash
        # NOTE: don't care about malformed strings, lowlevel will throw error for us
        type = cls._byte_ident_map.get(hash[:1+hash.find(b"$", 1)], TYPE_I)
        type_code = cls._get_backend_type(type)

        # XXX: doesn't seem to be a way to make this honor max_threads
        try:
            result = _argon2_cffi.low_level.verify_secret(hash, secret, type_code)
            assert result is True
            return True
        except _argon2_cffi.exceptions.VerifyMismatchError:
            return False
        except _argon2_cffi.exceptions.VerificationError as err:
            raise cls._adapt_backend_error(err, hash=hash)

    # NOTE: deprecated, will be removed in 2.0
    @classmethod
    def genhash(cls, secret, config):
        # TODO: add in 'encoding' support once that's finalized in 1.8 / 1.9.
        uh.validate_secret(secret)
        secret = to_bytes(secret, "utf-8")
        self = cls.from_string(config)
        # XXX: doesn't seem to be a way to make this honor max_threads
        try:
            result = bascii_to_str(_argon2_cffi.low_level.hash_secret(
                type=cls._get_backend_type(self.type),
                memory_cost=self.memory_cost,
                time_cost=self.rounds,
                parallelism=self.parallelism,
                salt=to_bytes(self.salt),
                hash_len=self.checksum_size,
                secret=secret,
                version=self.version,
            ))
        except _argon2_cffi.exceptions.HashingError as err:
            raise cls._adapt_backend_error(err, hash=config)
        if self.version == 0x10:
            # workaround: argon2 0x13 always returns "v=" segment, even for 0x10 hashes
            result = result.replace("$v=16$", "$")
        return result

    #===================================================================
    # digest calculation
    #===================================================================
    def _calc_checksum(self, secret):
        raise AssertionError("shouldn't be called under argon2_cffi backend")

    #===================================================================
    # eoc
    #===================================================================

#-----------------------------------------------------------------------
# argon2pure backend
#-----------------------------------------------------------------------
class _PureBackend(_Argon2Common):
    """
    argon2pure backend
    """
    #===================================================================
    # backend loading
    #===================================================================

    @classmethod
    def _load_backend_mixin(mixin_cls, name, dryrun):
        # make sure we write info to base class's __dict__, not that of a subclass
        assert mixin_cls is _PureBackend

        # import argon2pure
        global _argon2pure
        try:
            import argon2pure as _argon2pure
        except ImportError:
            return False

        # get default / max supported version -- added in v1.2.2
        try:
            from argon2pure import ARGON2_DEFAULT_VERSION as max_version
        except ImportError:
            log.warning("detected 'argon2pure' backend, but package is too old "
                        "(passlib requires argon2pure >= 1.2.3)")
            return False

        log.debug("detected 'argon2pure' backend, with support for 0x%x argon2 hashes",
                  max_version)

        if not dryrun:
            warn("Using argon2pure backend, which is 100x+ slower than is required "
                 "for adequate security. Installing argon2_cffi (via 'pip install argon2_cffi') "
                 "is strongly recommended", exc.PasslibSecurityWarning)

        # build type map
        type_map = {}
        for type in ALL_TYPES:
            try:
                type_map[type] = getattr(_argon2pure, "ARGON2" + type.upper())
            except AttributeError:
                # TYPE_ID support not added until v1.3
                assert type not in (TYPE_I, TYPE_D), "unexpected missing type: %r" % type
        mixin_cls._backend_type_map = type_map

        mixin_cls.version = mixin_cls.max_version = max_version
        return mixin_cls._finalize_backend_mixin(name, dryrun)

    #===================================================================
    # primary methods
    #===================================================================

    # NOTE: this backend uses default .hash() & .verify() implementations.

    #===================================================================
    # digest calculation
    #===================================================================
    def _calc_checksum(self, secret):
        # TODO: add in 'encoding' support once that's finalized in 1.8 / 1.9.
        uh.validate_secret(secret)
        secret = to_bytes(secret, "utf-8")
        kwds = dict(
            password=secret,
            salt=self.salt,
            time_cost=self.rounds,
            memory_cost=self.memory_cost,
            parallelism=self.parallelism,
            tag_length=self.checksum_size,
            type_code=self._get_backend_type(self.type),
            version=self.version,
        )
        if self.max_threads > 0:
            kwds['threads'] = self.max_threads
        if self.pure_use_threads:
            kwds['use_threads'] = True
        if self.data:
            kwds['associated_data'] = self.data
        # NOTE: should return raw bytes
        # NOTE: this may raise _argon2pure.Argon2ParameterError,
        #       but it if does that, there's a bug in our own parameter checking code.
        try:
            return _argon2pure.argon2(**kwds)
        except _argon2pure.Argon2Error as err:
            raise self._adapt_backend_error(err, self=self)

    #===================================================================
    # eoc
    #===================================================================

class argon2(_NoBackend, _Argon2Common):
    """
    This class implements the Argon2 password hash [#argon2-home]_, and follows the :ref:`password-hash-api`.

    Argon2 supports a variable-length salt, and variable time & memory cost,
    and a number of other configurable parameters.

    The :meth:`~passlib.ifc.PasswordHash.replace` method accepts the following optional keywords:

    :type type: str
    :param type:
        Specify the type of argon2 hash to generate.
        Can be one of "ID", "I", "D".

        This defaults to "ID" if supported by the backend, otherwise "I".

    :type salt: str
    :param salt:
        Optional salt string.
        If specified, the length must be between 0-1024 bytes.
        If not specified, one will be auto-generated (this is recommended).

    :type salt_size: int
    :param salt_size:
        Optional number of bytes to use when autogenerating new salts.

    :type rounds: int
    :param rounds:
        Optional number of rounds to use.
        This corresponds linearly to the amount of time hashing will take.

    :type time_cost: int
    :param time_cost:
        An alias for **rounds**, for compatibility with underlying argon2 library.

    :param int memory_cost:
        Defines the memory usage in kibibytes.
        This corresponds linearly to the amount of memory hashing will take.

    :param int parallelism:
        Defines the parallelization factor.
        *NOTE: this will affect the resulting hash value.*

    :param int digest_size:
        Length of the digest in bytes.

    :param int max_threads:
        Maximum number of threads that will be used.
        -1 means unlimited; otherwise hashing will use ``min(parallelism, max_threads)`` threads.

        .. note::

            This option is currently only honored by the argon2pure backend.

    :type relaxed: bool
    :param relaxed:
        By default, providing an invalid value for one of the other
        keywords will result in a :exc:`ValueError`. If ``relaxed=True``,
        and the error can be corrected, a :exc:`~passlib.exc.PasslibHashWarning`
        will be issued instead. Correctable errors include ``rounds``
        that are too small or too large, and ``salt`` strings that are too long.

    .. versionchanged:: 1.7.2

        Added the "type" keyword, and support for type "D" and "ID" hashes.
        (Prior versions could verify type "D" hashes, but not generate them).

    .. todo::

        * Support configurable threading limits.
    """
    #=============================================================================
    # backend
    #=============================================================================

    # NOTE: the brunt of the argon2 class is implemented in _Argon2Common.
    #       there are then subclass for each backend (e.g. _PureBackend),
    #       these are dynamically prepended to this class's bases
    #       in order to load the appropriate backend.

    #: list of potential backends
    backends = ("argon2_cffi", "argon2pure")

    #: flag that this class's bases should be modified by SubclassBackendMixin
    _backend_mixin_target = True

    #: map of backend -> mixin class, used by _get_backend_loader()
    _backend_mixin_map = {
        None: _NoBackend,
        "argon2_cffi": _CffiBackend,
        "argon2pure": _PureBackend,
    }

    #=============================================================================
    #
    #=============================================================================

#=============================================================================
# eof
#=============================================================================

Anon7 - 2022
AnonSec Team