Server IP : 85.214.239.14 / Your IP : 3.138.137.114 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/cryptography/hazmat/primitives/ciphers/ |
Upload File : |
# This file is dual licensed under the terms of the Apache License, Version
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
# for complete details.

import abc
import typing

from cryptography.exceptions import (
    AlreadyFinalized,
    AlreadyUpdated,
    NotYetFinalized,
)
from cryptography.hazmat.primitives._cipheralgorithm import CipherAlgorithm
from cryptography.hazmat.primitives.ciphers import modes

if typing.TYPE_CHECKING:
    # Only needed for annotations; guarded so it is not imported at runtime.
    from cryptography.hazmat.backends.openssl.ciphers import (
        _CipherContext as _BackendCipherContext,
    )


class CipherContext(metaclass=abc.ABCMeta):
    """
    Abstract interface for an in-progress symmetric cipher operation.
    """

    @abc.abstractmethod
    def update(self, data: bytes) -> bytes:
        """
        Processes the provided bytes through the cipher and returns the results
        as bytes.
        """

    @abc.abstractmethod
    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Processes the provided bytes and writes the resulting data into the
        provided buffer. Returns the number of bytes written.
        """

    @abc.abstractmethod
    def finalize(self) -> bytes:
        """
        Returns the results of processing the final block as bytes.
        """


class AEADCipherContext(CipherContext, metaclass=abc.ABCMeta):
    """
    Cipher context interface that additionally accepts associated data to be
    authenticated.
    """

    @abc.abstractmethod
    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Authenticates the provided bytes.
        """


class AEADDecryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    AEAD context interface for decryption, where the authentication tag may be
    supplied at finalization time.
    """

    @abc.abstractmethod
    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Returns the results of processing the final block as bytes and allows
        delayed passing of the authentication tag.
        """


class AEADEncryptionContext(AEADCipherContext, metaclass=abc.ABCMeta):
    """
    AEAD context interface for encryption, exposing the authentication tag.
    """

    # ``abc.abstractproperty`` is deprecated since Python 3.3; stacking
    # ``property`` on top of ``abc.abstractmethod`` is the supported
    # equivalent and keeps ``tag`` an abstract read-only property.
    @property
    @abc.abstractmethod
    def tag(self) -> bytes:
        """
        Returns tag bytes. This is only available after encryption is
        finalized.
        """


Mode = typing.TypeVar(
    "Mode", bound=typing.Optional[modes.Mode], covariant=True
)


class Cipher(typing.Generic[Mode]):
    """
    Pairs a cipher algorithm with an (optional) mode and creates encryption
    and decryption contexts for that combination.
    """

    def __init__(
        self,
        algorithm: CipherAlgorithm,
        mode: Mode,
        backend: typing.Any = None,
    ):
        """
        Validates and stores ``algorithm`` and ``mode``.

        ``backend`` is accepted but not referenced by this constructor.

        Raises ``TypeError`` when ``algorithm`` is not a ``CipherAlgorithm``
        instance. When ``mode`` is not ``None`` it is asked to validate itself
        against ``algorithm`` via ``validate_for_algorithm``.
        """
        if not isinstance(algorithm, CipherAlgorithm):
            raise TypeError("Expected interface of CipherAlgorithm.")

        if mode is not None:
            # mypy needs this assert to narrow the type from our generic
            # type. Maybe it won't some time in the future.
            assert isinstance(mode, modes.Mode)
            mode.validate_for_algorithm(algorithm)

        self.algorithm = algorithm
        self.mode = mode

    @typing.overload
    def encryptor(
        self: "Cipher[modes.ModeWithAuthenticationTag]",
    ) -> AEADEncryptionContext:
        ...

    @typing.overload
    def encryptor(
        self: "_CIPHER_TYPE",
    ) -> CipherContext:
        ...

    def encryptor(self):
        """
        Creates an encryption context for this algorithm/mode pair.

        Raises ``ValueError`` when the mode is a
        ``ModeWithAuthenticationTag`` whose ``tag`` is already set.
        """
        if isinstance(self.mode, modes.ModeWithAuthenticationTag):
            if self.mode.tag is not None:
                raise ValueError(
                    "Authentication tag must be None when encrypting."
                )
        # Imported at call time rather than at module import time.
        from cryptography.hazmat.backends.openssl.backend import backend

        ctx = backend.create_symmetric_encryption_ctx(
            self.algorithm, self.mode
        )
        return self._wrap_ctx(ctx, encrypt=True)

    @typing.overload
    def decryptor(
        self: "Cipher[modes.ModeWithAuthenticationTag]",
    ) -> AEADDecryptionContext:
        ...

    @typing.overload
    def decryptor(
        self: "_CIPHER_TYPE",
    ) -> CipherContext:
        ...

    def decryptor(self):
        """
        Creates a decryption context for this algorithm/mode pair.
        """
        # Imported at call time rather than at module import time.
        from cryptography.hazmat.backends.openssl.backend import backend

        ctx = backend.create_symmetric_decryption_ctx(
            self.algorithm, self.mode
        )
        return self._wrap_ctx(ctx, encrypt=False)

    def _wrap_ctx(
        self, ctx: "_BackendCipherContext", encrypt: bool
    ) -> typing.Union[
        AEADEncryptionContext, AEADDecryptionContext, CipherContext
    ]:
        """
        Wraps a backend context in the wrapper class matching this cipher's
        mode: an AEAD encryption/decryption wrapper for modes with an
        authentication tag (chosen by ``encrypt``), a plain wrapper otherwise.
        """
        if isinstance(self.mode, modes.ModeWithAuthenticationTag):
            if encrypt:
                return _AEADEncryptionContext(ctx)
            else:
                return _AEADDecryptionContext(ctx)
        else:
            return _CipherContext(ctx)


# Cipher parameterizations used by the non-AEAD ``encryptor``/``decryptor``
# overloads above.
_CIPHER_TYPE = Cipher[
    typing.Union[
        modes.ModeWithNonce,
        modes.ModeWithTweak,
        None,
        modes.ECB,
        modes.ModeWithInitializationVector,
    ]
]


class _CipherContext(CipherContext):
    """
    ``CipherContext`` that delegates to a backend context and refuses further
    use once finalized.
    """

    # Set to None by finalize(); None marks the context as finalized.
    _ctx: typing.Optional["_BackendCipherContext"]

    def __init__(self, ctx: "_BackendCipherContext") -> None:
        self._ctx = ctx

    def update(self, data: bytes) -> bytes:
        """
        Passes ``data`` to the backend context and returns its result.

        Raises ``AlreadyFinalized`` if the context was finalized.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        return self._ctx.update(data)

    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Passes ``data`` and ``buf`` to the backend context's ``update_into``
        and returns its result.

        Raises ``AlreadyFinalized`` if the context was finalized.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        return self._ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalizes the backend context, drops the reference to it and returns
        the final data.

        Raises ``AlreadyFinalized`` if the context was finalized.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        data = self._ctx.finalize()
        self._ctx = None
        return data


class _AEADCipherContext(AEADCipherContext):
    """
    ``AEADCipherContext`` that delegates to a backend context while enforcing
    the mode's byte limits and the AAD-before-data ordering.
    """

    # Set to None by finalization; None marks the context as finalized.
    _ctx: typing.Optional["_BackendCipherContext"]
    # Copied from the backend context when the context is finalized.
    _tag: typing.Optional[bytes]

    def __init__(self, ctx: "_BackendCipherContext") -> None:
        self._ctx = ctx
        self._bytes_processed = 0
        self._aad_bytes_processed = 0
        self._tag = None
        self._updated = False

    def _check_limit(self, data_size: int) -> None:
        """
        Records ``data_size`` more processed bytes and marks the context as
        updated.

        Raises ``AlreadyFinalized`` if the context was finalized, and
        ``ValueError`` if the running total exceeds the mode's
        ``_MAX_ENCRYPTED_BYTES``.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        self._updated = True
        self._bytes_processed += data_size
        if self._bytes_processed > self._ctx._mode._MAX_ENCRYPTED_BYTES:
            raise ValueError(
                "{} has a maximum encrypted byte limit of {}".format(
                    self._ctx._mode.name, self._ctx._mode._MAX_ENCRYPTED_BYTES
                )
            )

    def update(self, data: bytes) -> bytes:
        """
        Checks the byte limit, then passes ``data`` to the backend context and
        returns its result.
        """
        self._check_limit(len(data))
        # mypy needs this assert even though _check_limit already checked
        assert self._ctx is not None
        return self._ctx.update(data)

    def update_into(self, data: bytes, buf: bytes) -> int:
        """
        Checks the byte limit, then passes ``data`` and ``buf`` to the backend
        context's ``update_into`` and returns its result.
        """
        self._check_limit(len(data))
        # mypy needs this assert even though _check_limit already checked
        assert self._ctx is not None
        return self._ctx.update_into(data, buf)

    def finalize(self) -> bytes:
        """
        Finalizes the backend context, stores its tag, drops the reference to
        it and returns the final data.

        Raises ``AlreadyFinalized`` if the context was finalized.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        data = self._ctx.finalize()
        self._tag = self._ctx.tag
        self._ctx = None
        return data

    def authenticate_additional_data(self, data: bytes) -> None:
        """
        Passes ``data`` to the backend context as additional authenticated
        data.

        Raises ``AlreadyFinalized`` if the context was finalized,
        ``AlreadyUpdated`` if ``update``/``update_into`` was already called,
        and ``ValueError`` if the running AAD total exceeds the mode's
        ``_MAX_AAD_BYTES``.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        if self._updated:
            raise AlreadyUpdated("Update has been called on this context.")

        self._aad_bytes_processed += len(data)
        if self._aad_bytes_processed > self._ctx._mode._MAX_AAD_BYTES:
            raise ValueError(
                "{} has a maximum AAD byte limit of {}".format(
                    self._ctx._mode.name, self._ctx._mode._MAX_AAD_BYTES
                )
            )

        self._ctx.authenticate_additional_data(data)


class _AEADDecryptionContext(_AEADCipherContext, AEADDecryptionContext):
    """
    AEAD decryption wrapper that adds finalization with a late-supplied tag.
    """

    def finalize_with_tag(self, tag: bytes) -> bytes:
        """
        Finalizes the backend context with ``tag``, stores the backend's tag,
        drops the reference to the backend context and returns the final data.

        Raises ``AlreadyFinalized`` if the context was finalized.
        """
        if self._ctx is None:
            raise AlreadyFinalized("Context was already finalized.")
        data = self._ctx.finalize_with_tag(tag)
        self._tag = self._ctx.tag
        self._ctx = None
        return data


class _AEADEncryptionContext(_AEADCipherContext, AEADEncryptionContext):
    """
    AEAD encryption wrapper that exposes the tag once finalized.
    """

    @property
    def tag(self) -> bytes:
        """
        Returns the tag stored at finalization.

        Raises ``NotYetFinalized`` while the backend context is still held,
        i.e. before the context has been finalized.
        """
        if self._ctx is not None:
            raise NotYetFinalized(
                "You must finalize encryption before " "getting the tag."
            )
        assert self._tag is not None
        return self._tag