Server IP : 85.214.239.14 / Your IP : 3.21.105.21 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/root/proc/2/task/2/root/proc/2/task/2/root/lib/python3/dist-packages/httpx/ |
Upload File : |
import os
import ssl
import sys
import typing
from pathlib import Path

import certifi

from ._compat import set_minimum_tls_version_1_2
from ._models import Headers
from ._types import CertTypes, HeaderTypes, TimeoutTypes, URLTypes, VerifyTypes
from ._urls import URL
from ._utils import get_ca_bundle_from_env, get_logger

# Colon-separated cipher string handed to `SSLContext.set_ciphers()`.
# Entries prefixed with "!" are exclusions.
DEFAULT_CIPHERS = ":".join(
    [
        "ECDHE+AESGCM",
        "ECDHE+CHACHA20",
        "DHE+AESGCM",
        "DHE+CHACHA20",
        "ECDH+AESGCM",
        "DH+AESGCM",
        "ECDH+AES",
        "DH+AES",
        "RSA+AESGCM",
        "RSA+AES",
        "!aNULL",
        "!eNULL",
        "!MD5",
        "!DSS",
    ]
)


logger = get_logger(__name__)


class UnsetType:
    """Type of the `UNSET` sentinel, used where `None` is a meaningful value."""

    pass  # pragma: no cover


UNSET = UnsetType()


def create_ssl_context(
    cert: typing.Optional[CertTypes] = None,
    verify: VerifyTypes = True,
    trust_env: bool = True,
    http2: bool = False,
) -> ssl.SSLContext:
    """
    Build an `ssl.SSLContext` from the given settings.

    Thin convenience wrapper: constructs an `SSLConfig` with the same
    arguments and returns its `ssl_context` attribute.
    """
    return SSLConfig(
        cert=cert, verify=verify, trust_env=trust_env, http2=http2
    ).ssl_context


class SSLConfig:
    """
    SSL Configuration.

    Stores the `cert`, `verify`, `trust_env` and `http2` settings and eagerly
    builds the matching `ssl.SSLContext`, exposed as `self.ssl_context`.
    """

    DEFAULT_CA_BUNDLE_PATH = Path(certifi.where())

    def __init__(
        self,
        *,
        cert: typing.Optional[CertTypes] = None,
        verify: VerifyTypes = True,
        trust_env: bool = True,
        http2: bool = False,
    ):
        self.cert = cert
        self.verify = verify
        self.trust_env = trust_env
        self.http2 = http2
        # The context is created once, at construction time.
        self.ssl_context = self.load_ssl_context()

    def load_ssl_context(self) -> ssl.SSLContext:
        """
        Return a verified or unverified SSL context, depending on whether
        `self.verify` is truthy.
        """
        logger.trace(
            f"load_ssl_context "
            f"verify={self.verify!r} "
            f"cert={self.cert!r} "
            f"trust_env={self.trust_env!r} "
            f"http2={self.http2!r}"
        )

        if self.verify:
            return self.load_ssl_context_verify()
        return self.load_ssl_context_no_verify()

    def load_ssl_context_no_verify(self) -> ssl.SSLContext:
        """
        Return an SSL context for unverified connections.
        """
        context = self._create_default_ssl_context()
        # `check_hostname` has to be switched off before `verify_mode` may be
        # set to CERT_NONE; the ssl module rejects the opposite order.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        self._load_client_certs(context)
        return context

    def load_ssl_context_verify(self) -> ssl.SSLContext:
        """
        Return an SSL context for verified connections.

        `self.verify` may be `True` (use the default CA bundle, or one named
        by the environment when `trust_env` is set), a pre-configured
        `ssl.SSLContext`, or a path to a CA file or directory.

        Raises `IOError` when `self.verify` is a path that does not exist.
        """
        if self.trust_env and self.verify is True:
            # NOTE(review): presumably reads the CA bundle location from
            # environment variables; confirm against `_utils`.
            ca_bundle = get_ca_bundle_from_env()
            if ca_bundle is not None:
                self.verify = ca_bundle

        if isinstance(self.verify, ssl.SSLContext):
            # Allow passing in our own SSLContext object that's pre-configured.
            context = self.verify
            self._load_client_certs(context)
            return context
        elif isinstance(self.verify, bool):
            ca_bundle_path = self.DEFAULT_CA_BUNDLE_PATH
        elif Path(self.verify).exists():
            ca_bundle_path = Path(self.verify)
        else:
            raise IOError(
                "Could not find a suitable TLS CA certificate bundle, "
                f"invalid path: {self.verify}"
            )

        context = self._create_default_ssl_context()
        context.verify_mode = ssl.CERT_REQUIRED
        context.check_hostname = True

        # Signal to server support for PHA in TLS 1.3. Raises an
        # AttributeError if only read-only access is implemented.
        if sys.version_info >= (3, 8):  # pragma: no cover
            try:
                context.post_handshake_auth = True
            except AttributeError:  # pragma: no cover
                pass

        # Disable using 'commonName' for SSLContext.check_hostname
        # when the 'subjectAltName' extension isn't available.
        try:
            context.hostname_checks_common_name = False
        except AttributeError:  # pragma: no cover
            pass

        # A bundle may be a single file or a directory of certificates.
        # If the path is neither, no verify locations are loaded.
        if ca_bundle_path.is_file():
            logger.trace(f"load_verify_locations cafile={ca_bundle_path!s}")
            context.load_verify_locations(cafile=str(ca_bundle_path))
        elif ca_bundle_path.is_dir():
            logger.trace(f"load_verify_locations capath={ca_bundle_path!s}")
            context.load_verify_locations(capath=str(ca_bundle_path))

        self._load_client_certs(context)

        return context

    def _create_default_ssl_context(self) -> ssl.SSLContext:
        """
        Creates the default SSLContext object that's used for both verified
        and unverified connections.
        """
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        set_minimum_tls_version_1_2(context)
        context.options |= ssl.OP_NO_COMPRESSION
        context.set_ciphers(DEFAULT_CIPHERS)

        if ssl.HAS_ALPN:
            # "h2" is only advertised when HTTP/2 support was requested.
            alpn_idents = ["http/1.1", "h2"] if self.http2 else ["http/1.1"]
            context.set_alpn_protocols(alpn_idents)

        if sys.version_info >= (3, 8):  # pragma: no cover
            # Key logging is honoured only when the environment is trusted.
            keylogfile = os.environ.get("SSLKEYLOGFILE")
            if keylogfile and self.trust_env:
                context.keylog_filename = keylogfile

        return context

    def _load_client_certs(self, ssl_context: ssl.SSLContext) -> None:
        """
        Loads client certificates into our SSLContext object

        `self.cert` may be a certificate file path, a `(certfile, keyfile)`
        pair, or a `(certfile, keyfile, password)` triple. Any other value
        is ignored.
        """
        if self.cert is not None:
            if isinstance(self.cert, str):
                ssl_context.load_cert_chain(certfile=self.cert)
            elif isinstance(self.cert, tuple) and len(self.cert) == 2:
                ssl_context.load_cert_chain(certfile=self.cert[0], keyfile=self.cert[1])
            elif isinstance(self.cert, tuple) and len(self.cert) == 3:
                ssl_context.load_cert_chain(
                    certfile=self.cert[0],
                    keyfile=self.cert[1],
                    password=self.cert[2],  # type: ignore
                )


class Timeout:
    """
    Timeout configuration.

    **Usage**:

    Timeout(None)               # No timeouts.
    Timeout(5.0)                # 5s timeout on all operations.
    Timeout(None, connect=5.0)  # 5s timeout on connect, no other timeouts.
    Timeout(5.0, connect=10.0)  # 10s timeout on connect. 5s timeout elsewhere.
    Timeout(5.0, pool=None)     # No timeout on acquiring connection from pool.
                                # 5s timeout elsewhere.
    """

    def __init__(
        self,
        timeout: typing.Union[TimeoutTypes, UnsetType] = UNSET,
        *,
        connect: typing.Union[None, float, UnsetType] = UNSET,
        read: typing.Union[None, float, UnsetType] = UNSET,
        write: typing.Union[None, float, UnsetType] = UNSET,
        pool: typing.Union[None, float, UnsetType] = UNSET,
    ):
        if isinstance(timeout, Timeout):
            # Passed as a single explicit Timeout.
            # Per-operation overrides are not accepted in this form.
            assert connect is UNSET
            assert read is UNSET
            assert write is UNSET
            assert pool is UNSET
            self.connect = timeout.connect  # type: typing.Optional[float]
            self.read = timeout.read  # type: typing.Optional[float]
            self.write = timeout.write  # type: typing.Optional[float]
            self.pool = timeout.pool  # type: typing.Optional[float]
        elif isinstance(timeout, tuple):
            # Passed as a tuple: (connect, read[, write[, pool]]).
            # Missing trailing items mean "no timeout".
            self.connect = timeout[0]
            self.read = timeout[1]
            self.write = None if len(timeout) < 3 else timeout[2]
            self.pool = None if len(timeout) < 4 else timeout[3]
        elif not (
            isinstance(connect, UnsetType)
            or isinstance(read, UnsetType)
            or isinstance(write, UnsetType)
            or isinstance(pool, UnsetType)
        ):
            # All four values were given explicitly; `timeout` is not used.
            self.connect = connect
            self.read = read
            self.write = write
            self.pool = pool
        else:
            if isinstance(timeout, UnsetType):
                raise ValueError(
                    "httpx.Timeout must either include a default, or set all "
                    "four parameters explicitly."
                )
            # `timeout` is the default for whichever values were left unset.
            self.connect = timeout if isinstance(connect, UnsetType) else connect
            self.read = timeout if isinstance(read, UnsetType) else read
            self.write = timeout if isinstance(write, UnsetType) else write
            self.pool = timeout if isinstance(pool, UnsetType) else pool

    def as_dict(self) -> typing.Dict[str, typing.Optional[float]]:
        """Return the four timeout values keyed by operation name."""
        return {
            "connect": self.connect,
            "read": self.read,
            "write": self.write,
            "pool": self.pool,
        }

    def __eq__(self, other: typing.Any) -> bool:
        return (
            isinstance(other, self.__class__)
            and self.connect == other.connect
            and self.read == other.read
            and self.write == other.write
            and self.pool == other.pool
        )

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        # Use the short form when all four values are the same.
        if len({self.connect, self.read, self.write, self.pool}) == 1:
            return f"{class_name}(timeout={self.connect})"
        return (
            f"{class_name}(connect={self.connect}, "
            f"read={self.read}, write={self.write}, pool={self.pool})"
        )


class Limits:
    """
    Configuration for limits to various client behaviors.

    **Parameters:**

    * **max_connections** - The maximum number of concurrent connections that may be
            established.
    * **max_keepalive_connections** - Allow the connection pool to maintain
            keep-alive connections below this point. Should be less than or equal
            to `max_connections`.
    * **keepalive_expiry** - Time limit on idle keep-alive connections in seconds.
    """

    def __init__(
        self,
        *,
        max_connections: typing.Optional[int] = None,
        max_keepalive_connections: typing.Optional[int] = None,
        keepalive_expiry: typing.Optional[float] = 5.0,
    ):
        self.max_connections = max_connections
        self.max_keepalive_connections = max_keepalive_connections
        self.keepalive_expiry = keepalive_expiry

    def __eq__(self, other: typing.Any) -> bool:
        return (
            isinstance(other, self.__class__)
            and self.max_connections == other.max_connections
            and self.max_keepalive_connections == other.max_keepalive_connections
            and self.keepalive_expiry == other.keepalive_expiry
        )

    def __repr__(self) -> str:
        class_name = self.__class__.__name__
        return (
            f"{class_name}(max_connections={self.max_connections}, "
            f"max_keepalive_connections={self.max_keepalive_connections}, "
            f"keepalive_expiry={self.keepalive_expiry})"
        )


class Proxy:
    """
    Proxy configuration: a proxy URL with optional credentials and headers.

    Only the "http", "https" and "socks5" URL schemes are accepted; any
    other scheme raises `ValueError`. Credentials embedded in the URL are
    removed from it and take precedence over the `auth` argument.
    """

    def __init__(
        self,
        url: URLTypes,
        *,
        auth: typing.Optional[typing.Tuple[str, str]] = None,
        headers: typing.Optional[HeaderTypes] = None,
    ):
        url = URL(url)
        headers = Headers(headers)

        if url.scheme not in ("http", "https", "socks5"):
            raise ValueError(f"Unknown scheme for proxy URL {url!r}")

        if url.username or url.password:
            # Remove any auth credentials from the URL.
            auth = (url.username, url.password)
            url = url.copy_with(username=None, password=None)

        self.url = url
        self.auth = auth
        self.headers = headers

    @property
    def raw_auth(self) -> typing.Optional[typing.Tuple[bytes, bytes]]:
        # The proxy authentication as raw bytes.
        return (
            None
            if self.auth is None
            else (self.auth[0].encode("utf-8"), self.auth[1].encode("utf-8"))
        )

    def __repr__(self) -> str:
        # The authentication is represented with the password component masked.
        auth = (self.auth[0], "********") if self.auth else None

        # Build a nice concise representation.
        url_str = f"{str(self.url)!r}"
        auth_str = f", auth={auth!r}" if auth else ""
        headers_str = f", headers={dict(self.headers)!r}" if self.headers else ""
        return f"Proxy({url_str}{auth_str}{headers_str})"


# Module-level defaults.
DEFAULT_TIMEOUT_CONFIG = Timeout(timeout=5.0)
DEFAULT_LIMITS = Limits(max_connections=100, max_keepalive_connections=20)
DEFAULT_MAX_REDIRECTS = 20