Server IP : 85.214.239.14 / Your IP : 18.225.156.91 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /srv/modoboa/env/lib/python3.5/site-packages/oath/ |
Upload File : |
import hmac import hashlib import re import random import string from . import _hotp as hotp, _utils ''' Implementation of OCRA See also http://tools.ietf.org/html/draft-mraihi-mutual-oath-hotp-variants-14 ''' __all__ = ('str2ocrasuite', 'StateException', 'OCRAChallengeResponseServer', 'OCRAChallengeResponseClient', 'OCRAMutualChallengeResponseServer', 'OCRAMutualChallengeResponseClient') def is_int(v): try: int(v) return True except ValueError: return False # Constants PERIODS = { 'H': 3600, 'M': 60, 'S': 1 } HOTP = 'HOTP' OCRA_1 = 'OCRA-1' class CryptoFunction(object): '''Represents an OCRA CryptoFunction specification. :attribute hash_algo: an object implementing the digest interface as given by PEP 247 and the hashlib package :attribute truncation_length: the length to truncate the decimal representation, can be None, in this case no truncation is done. ''' def __init__(self, hash_algo, truncation_length): assert hash_algo assert is_int(truncation_length) or truncation_length is None self.hash_algo = hash_algo self.truncation_length = truncation_length def __call__(self, key, data_input): '''Compute an HOTP digest using the given key and data input and following the current crypto function description. :param key: a byte string containing the HMAC key :param data_input: the data input assembled as a byte-string as described by the OCRA specification :returns: the computed digest :rtype: str ''' h = hmac.new(key, data_input, self.hash_algo).digest() if self.truncation_length: return hotp.dec(h, self.truncation_length) else: return str(hotp.truncated_value(h)) def __str__(self): '''Return the standard representation for the given crypto function. ''' return 'HOTP-%s-%s' % (self.hash_algo.__name__, self.truncation_length) def str2hashalgo(description): '''Convert the name of a hash algorithm as described in the OATH specifications, to a python object handling the digest algorithm interface, PEP-xxx. :param description the name of the hash algorithm, example :rtype: a hash algorithm class constructor ''' algo = getattr(hashlib, description.lower(), None) if not callable(algo): raise ValueError('Unknown hash algorithm %s' % description) return algo def str2cryptofunction(crypto_function_description): ''' Convert an OCRA crypto function description into a CryptoFunction instance :param crypto_function_description: :returns: the CryptoFunction object :rtype: CryptoFunction ''' s = crypto_function_description.split('-') if len(s) != 3: raise ValueError('CryptoFunction description must be triplet separated by -') if s[0] != HOTP: raise ValueError('Unknown CryptoFunction kind %s' % s[0]) algo = str2hashalgo(s[1]) try: truncation_length = int(s[2]) if truncation_length < 0 or truncation_length > 10: raise ValueError() except ValueError: raise ValueError('Invalid truncation length %s' % s[2]) return CryptoFunction(algo, truncation_length) class DataInput(object): ''' OCRA data input description By calling this instance of this class and giving the needed parameter corrresponding to the data input description, it compute a binary string to give to the HMAC algorithme implemented by a CryptoFunction object ''' __slots__ = [ 'C', 'Q', 'P', 'S', 'T' ] def __init__(self, C=None, Q=None, P=None, S=None, T=None): self.C = C self.Q = Q self.P = P self.S = S self.T = T def __call__(self, C=None, Q=None, P=None, P_digest=None, S=None, T=None, T_precomputed=None, Qsc=None): datainput = b'' if self.C: try: C = int(C) if C < 0 or C > 2**64: raise Exception() except: raise ValueError('Invalid counter value %s' % C) datainput += hotp.int2beint64(int(C)) if self.Q: max_length = self.Q[1] if Qsc is not None: # Mutual Challenge-Response Q = Qsc max_length *= 2 if Q is None or not isinstance(Q, str) or len(Q) > max_length: raise ValueError('challenge') if self.Q[0] == 'N' and not Q.isdigit(): raise ValueError('challenge') if self.Q[0] == 'A' and not Q.isalnum(): raise ValueError('challenge') if self.Q[0] == 'H': try: int(Q, 16) except ValueError: raise ValueError('challenge') if self.Q[0] == 'N': Q = '%x' % int(Q) Q += '0' * (len(Q) % 2) Q = _utils.fromhex(Q) if self.Q[0] == 'A': pass if self.Q[0] == 'H': Q = _utils.fromhex(Q) datainput += _utils.tobytes(Q) datainput += _utils.tobytes('\0' * (128-len(Q))) if self.P: if P_digest: if len(P_digest) == self.P.digest_size: datainput += _utils.tobytes(P_digest) elif len(P_digest) == 2*self.P.digest_size: datainput += _utils.fromhex(_utils.tobytes(P_digest)) else: raise ValueError('Pin/Password digest invalid %r' % P_digest) elif P is None: raise ValueError('Pin/Password missing') else: datainput += self.P(_utils.tobytes(P)).digest() if self.S: if S is None or len(S) != self.S: raise ValueError('session') datainput += _utils.tobytes(S) if self.T: if is_int(T_precomputed): datainput += hotp.int2beint64(int(T_precomputed)) elif is_int(T): datainput += hotp.int2beint64(int(T / self.T)) else: raise ValueError('timestamp') return datainput def __str__(self): values = [] for slot in DataInput.__slots__: value = getattr(self, slot, None) if value is not None: values.append('{0}={1}'.format(slot, value)) return '<{0} {1}>'.format(DataInput.__class__.__name__, ', '.join(values)) def str2datainput(datainput_description): elements = datainput_description.split('-') datainputs = {} for element in elements: letter = element[0] if letter in datainputs: raise ValueError('DataInput already present %s %s' % (element, datainput_description)) if letter == 'C': datainputs[letter] = 1 elif letter == 'Q': if len(element) == 1: datainputs[letter] = ('N',8) else: second_letter = element[1] try: if second_letter not in 'ANH': raise ValueError() length = int(element[2:]) if length < 4 or length > 64: raise ValueError() except ValueError: raise ValueError('Invalid challenge descriptor %s' % element) datainputs[letter] = (second_letter, length) elif letter == 'P': algo = str2hashalgo(element[1:] or 'SHA1') datainputs[letter] = algo elif letter == 'S': length = 64 if element[1:]: try: length = int(element[1:]) except ValueError: raise ValueError('Invalid session data descriptor %s' % element) datainputs[letter] = length elif letter == 'T': complement = element[1:] or '1M' try: length = 0 if not re.match(r'^(\d+[HMS])+$', complement): raise ValueError() parts = re.findall(r'\d+[HMS]', complement) for part in parts: period = part[-1] quantity = int(part[:-1]) length += quantity * PERIODS[period] datainputs[letter] = length except ValueError: raise ValueError('Invalid timestamp descriptor %s' % element) else: raise ValueError('Invalid datainput descriptor %s' % element) return DataInput(**datainputs) class OcraSuite(object): def __init__(self, ocrasuite_description, crypto_function, data_input): self.ocrasuite_description = ocrasuite_description self.crypto_function = crypto_function self.data_input = data_input def __call__(self, key, **kwargs): data_input = self.ocrasuite_description.encode('ascii') + b'\0' \ + self.data_input(**kwargs) return self.crypto_function(key, data_input) def accept(self, response, key, **kwargs): return _utils.compare_digest(str(response), self(key, **kwargs)) def __str__(self): return '<OcraSuite crypto_function:%s data_input:%s>' % (self.crypto_function, self.data_input) def str2ocrasuite(ocrasuite_description): elements = ocrasuite_description.split(':') if len(elements) != 3: raise ValueError('Bad OcraSuite description %s' % ocrasuite_description) if elements[0] != OCRA_1: raise ValueError('Unsupported OCRA identifier %s' % elements[0]) crypto_function = str2cryptofunction(elements[1]) data_input = str2datainput(elements[2]) return OcraSuite(ocrasuite_description, crypto_function, data_input) class StateException(Exception): pass DEFAULT_LENGTH = 20 class OCRAChallengeResponse(object): state = 1 def __init__(self, key, ocrasuite_description, remote_ocrasuite_description=None): self.key = key self.ocrasuite = str2ocrasuite(ocrasuite_description) self.remote_ocrasuite = remote_ocrasuite_description is not None \ and str2ocrasuite(remote_ocrasuite_description) if not self.ocrasuite.data_input.Q: raise ValueError('Ocrasuite must have a Q descriptor') def compute_challenge(Q): kind, length = Q try: r = xrange(0, length) except NameError: r = range(0, length) if kind == 'N': c = ''.join([random.choice(string.digits) for i in r]) elif kind == 'A': alphabet = string.digits + string.ascii_letters c = ''.join([random.choice(alphabet) for i in r]) elif kind == 'H': c = ''.join([random.choice(string.hexdigits) for i in r]) else: raise ValueError('Q kind is unknown: %s' % kind) return c class OCRAChallengeResponseServer(OCRAChallengeResponse): SERVER_STATE_COMPUTE_CHALLENGE = 1 SERVER_STATE_VERIFY_RESPONSE = 2 SERVER_STATE_FINISHED = 3 def compute_challenge(self): if self.state != self.SERVER_STATE_COMPUTE_CHALLENGE: raise StateException() ocrasuite = self.remote_ocrasuite or self.ocrasuite self.challenge = compute_challenge(ocrasuite.data_input.Q) self.state = self.SERVER_STATE_VERIFY_RESPONSE return self.challenge def verify_response(self, response, **kwargs): if self.state != self.SERVER_STATE_VERIFY_RESPONSE: return StateException() ocrasuite = self.remote_ocrasuite or self.ocrasuite c = _utils.compare_digest(ocrasuite(self.key, Q=self.challenge, **kwargs), response) if c: self.state = self.SERVER_STATE_FINISHED return c class OCRAChallengeResponseClient(OCRAChallengeResponse): def compute_response(self, challenge, **kwargs): return self.ocrasuite(self.key, Q=challenge, **kwargs) class OCRAMutualChallengeResponseClient(OCRAChallengeResponse): CLIENT_STATE_COMPUTE_CLIENT_CHALLENGE = 1 CLIENT_STATE_VERIFY_SERVER_RESPONSE = 2 CLIENT_STATE_COMPUTE_CLIENT_RESPONSE = 3 CLIENT_STATE_FINISHED = 4 def compute_client_challenge(self, Qc=None): if self.state != self.CLIENT_STATE_COMPUTE_CLIENT_CHALLENGE: raise StateException() ocrasuite = self.remote_ocrasuite or self.ocrasuite self.client_challenge = Qc or compute_challenge(ocrasuite.data_input.Q) self.state = self.CLIENT_STATE_VERIFY_SERVER_RESPONSE return self.client_challenge def verify_server_response(self, response, challenge, **kwargs): if self.state != self.CLIENT_STATE_VERIFY_SERVER_RESPONSE: return StateException() self.server_challenge = challenge q = self.client_challenge+self.server_challenge ocrasuite = self.remote_ocrasuite or self.ocrasuite c = _utils.compare_digest(ocrasuite(self.key, Qsc=q, **kwargs), response) if c: self.state = self.CLIENT_STATE_COMPUTE_CLIENT_RESPONSE return c def compute_client_response(self, **kwargs): if self.state != self.CLIENT_STATE_COMPUTE_CLIENT_RESPONSE: return StateException() q = self.server_challenge+self.client_challenge rc = self.ocrasuite(self.key, Qsc=q, **kwargs) self.state = self.CLIENT_STATE_FINISHED return rc class OCRAMutualChallengeResponseServer(OCRAChallengeResponse): SERVER_STATE_COMPUTE_SERVER_RESPONSE = 1 SERVER_STATE_VERIFY_CLIENT_RESPONSE = 2 SERVER_STATE_FINISHED = 3 def compute_server_response(self, challenge, Qs=None, **kwargs): if self.state != self.SERVER_STATE_COMPUTE_SERVER_RESPONSE: raise StateException() self.client_challenge = challenge self.server_challenge = Qs or compute_challenge(self.ocrasuite.data_input.Q) q = self.client_challenge+self.server_challenge # no need for pin with server mode kwargs.pop('P', None) kwargs.pop('P_digest', None) rs = self.ocrasuite(self.key, Qsc=q, **kwargs) self.state = self.SERVER_STATE_VERIFY_CLIENT_RESPONSE return rs, self.server_challenge def verify_client_response(self, response, **kwargs): if self.state != self.SERVER_STATE_VERIFY_CLIENT_RESPONSE: raise StateException() q = self.server_challenge+self.client_challenge ocrasuite = self.remote_ocrasuite or self.ocrasuite c = _utils.compare_digest(ocrasuite(self.key, Qsc=q, **kwargs), response) if c: self.state = self.SERVER_STATE_FINISHED return c