Server IP : 85.214.239.14 / Your IP : 3.148.104.103 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/self/root/proc/2/root/proc/3/cwd/usr/lib/python3/dist-packages/httpx/_transports/ |
Upload File : |
""" Custom transports, with nicely configured defaults. The following additional keyword arguments are currently supported by httpcore... * uds: str * local_address: str * retries: int Example usages... # Disable HTTP/2 on a single specific domain. mounts = { "all://": httpx.HTTPTransport(http2=True), "all://*example.org": httpx.HTTPTransport() } # Using advanced httpcore configuration, with connection retries. transport = httpx.HTTPTransport(retries=1) client = httpx.Client(transport=transport) # Using advanced httpcore configuration, with unix domain sockets. transport = httpx.HTTPTransport(uds="socket.uds") client = httpx.Client(transport=transport) """ import contextlib import typing from types import TracebackType import httpcore from .._config import DEFAULT_LIMITS, Limits, Proxy, create_ssl_context from .._exceptions import ( ConnectError, ConnectTimeout, LocalProtocolError, NetworkError, PoolTimeout, ProtocolError, ProxyError, ReadError, ReadTimeout, RemoteProtocolError, TimeoutException, UnsupportedProtocol, WriteError, WriteTimeout, ) from .._models import Request, Response from .._types import AsyncByteStream, CertTypes, SyncByteStream, VerifyTypes from .base import AsyncBaseTransport, BaseTransport T = typing.TypeVar("T", bound="HTTPTransport") A = typing.TypeVar("A", bound="AsyncHTTPTransport") @contextlib.contextmanager def map_httpcore_exceptions() -> typing.Iterator[None]: try: yield except Exception as exc: # noqa: PIE-786 mapped_exc = None for from_exc, to_exc in HTTPCORE_EXC_MAP.items(): if not isinstance(exc, from_exc): continue # We want to map to the most specific exception we can find. # Eg if `exc` is an `httpcore.ReadTimeout`, we want to map to # `httpx.ReadTimeout`, not just `httpx.TimeoutException`. if mapped_exc is None or issubclass(to_exc, mapped_exc): mapped_exc = to_exc if mapped_exc is None: # pragma: no cover raise message = str(exc) raise mapped_exc(message) from exc HTTPCORE_EXC_MAP = { httpcore.TimeoutException: TimeoutException, httpcore.ConnectTimeout: ConnectTimeout, httpcore.ReadTimeout: ReadTimeout, httpcore.WriteTimeout: WriteTimeout, httpcore.PoolTimeout: PoolTimeout, httpcore.NetworkError: NetworkError, httpcore.ConnectError: ConnectError, httpcore.ReadError: ReadError, httpcore.WriteError: WriteError, httpcore.ProxyError: ProxyError, httpcore.UnsupportedProtocol: UnsupportedProtocol, httpcore.ProtocolError: ProtocolError, httpcore.LocalProtocolError: LocalProtocolError, httpcore.RemoteProtocolError: RemoteProtocolError, } class ResponseStream(SyncByteStream): def __init__(self, httpcore_stream: typing.Iterable[bytes]): self._httpcore_stream = httpcore_stream def __iter__(self) -> typing.Iterator[bytes]: with map_httpcore_exceptions(): for part in self._httpcore_stream: yield part def close(self) -> None: if hasattr(self._httpcore_stream, "close"): self._httpcore_stream.close() # type: ignore class HTTPTransport(BaseTransport): def __init__( self, verify: VerifyTypes = True, cert: typing.Optional[CertTypes] = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, trust_env: bool = True, proxy: typing.Optional[Proxy] = None, uds: typing.Optional[str] = None, local_address: typing.Optional[str] = None, retries: int = 0, ) -> None: ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) if proxy is None: self._pool = httpcore.ConnectionPool( ssl_context=ssl_context, max_connections=limits.max_connections, max_keepalive_connections=limits.max_keepalive_connections, keepalive_expiry=limits.keepalive_expiry, http1=http1, http2=http2, uds=uds, local_address=local_address, retries=retries, ) elif proxy.url.scheme in ("http", "https"): self._pool = httpcore.HTTPProxy( proxy_url=httpcore.URL( scheme=proxy.url.raw_scheme, host=proxy.url.raw_host, port=proxy.url.port, target=proxy.url.raw_path, ), proxy_auth=proxy.raw_auth, proxy_headers=proxy.headers.raw, ssl_context=ssl_context, max_connections=limits.max_connections, max_keepalive_connections=limits.max_keepalive_connections, keepalive_expiry=limits.keepalive_expiry, http1=http1, http2=http2, ) elif proxy.url.scheme == "socks5": try: import socksio # noqa except ImportError: # pragma: no cover raise ImportError( "Using SOCKS proxy, but the 'socksio' package is not installed. " "Make sure to install httpx using `pip install httpx[socks]`." ) from None self._pool = httpcore.SOCKSProxy( proxy_url=httpcore.URL( scheme=proxy.url.raw_scheme, host=proxy.url.raw_host, port=proxy.url.port, target=proxy.url.raw_path, ), proxy_auth=proxy.raw_auth, ssl_context=ssl_context, max_connections=limits.max_connections, max_keepalive_connections=limits.max_keepalive_connections, keepalive_expiry=limits.keepalive_expiry, http1=http1, http2=http2, ) else: # pragma: no cover raise ValueError( f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}." ) def __enter__(self: T) -> T: # Use generics for subclass support. self._pool.__enter__() return self def __exit__( self, exc_type: typing.Optional[typing.Type[BaseException]] = None, exc_value: typing.Optional[BaseException] = None, traceback: typing.Optional[TracebackType] = None, ) -> None: with map_httpcore_exceptions(): self._pool.__exit__(exc_type, exc_value, traceback) def handle_request( self, request: Request, ) -> Response: assert isinstance(request.stream, SyncByteStream) req = httpcore.Request( method=request.method, url=httpcore.URL( scheme=request.url.raw_scheme, host=request.url.raw_host, port=request.url.port, target=request.url.raw_path, ), headers=request.headers.raw, content=request.stream, extensions=request.extensions, ) with map_httpcore_exceptions(): resp = self._pool.handle_request(req) assert isinstance(resp.stream, typing.Iterable) return Response( status_code=resp.status, headers=resp.headers, stream=ResponseStream(resp.stream), extensions=resp.extensions, ) def close(self) -> None: self._pool.close() class AsyncResponseStream(AsyncByteStream): def __init__(self, httpcore_stream: typing.AsyncIterable[bytes]): self._httpcore_stream = httpcore_stream async def __aiter__(self) -> typing.AsyncIterator[bytes]: with map_httpcore_exceptions(): async for part in self._httpcore_stream: yield part async def aclose(self) -> None: if hasattr(self._httpcore_stream, "aclose"): await self._httpcore_stream.aclose() # type: ignore class AsyncHTTPTransport(AsyncBaseTransport): def __init__( self, verify: VerifyTypes = True, cert: typing.Optional[CertTypes] = None, http1: bool = True, http2: bool = False, limits: Limits = DEFAULT_LIMITS, trust_env: bool = True, proxy: typing.Optional[Proxy] = None, uds: typing.Optional[str] = None, local_address: typing.Optional[str] = None, retries: int = 0, ) -> None: ssl_context = create_ssl_context(verify=verify, cert=cert, trust_env=trust_env) if proxy is None: self._pool = httpcore.AsyncConnectionPool( ssl_context=ssl_context, max_connections=limits.max_connections, max_keepalive_connections=limits.max_keepalive_connections, keepalive_expiry=limits.keepalive_expiry, http1=http1, http2=http2, uds=uds, local_address=local_address, retries=retries, ) elif proxy.url.scheme in ("http", "https"): self._pool = httpcore.AsyncHTTPProxy( proxy_url=httpcore.URL( scheme=proxy.url.raw_scheme, host=proxy.url.raw_host, port=proxy.url.port, target=proxy.url.raw_path, ), proxy_auth=proxy.raw_auth, proxy_headers=proxy.headers.raw, ssl_context=ssl_context, max_connections=limits.max_connections, max_keepalive_connections=limits.max_keepalive_connections, keepalive_expiry=limits.keepalive_expiry, http1=http1, http2=http2, ) elif proxy.url.scheme == "socks5": try: import socksio # noqa except ImportError: # pragma: no cover raise ImportError( "Using SOCKS proxy, but the 'socksio' package is not installed. " "Make sure to install httpx using `pip install httpx[socks]`." ) from None self._pool = httpcore.AsyncSOCKSProxy( proxy_url=httpcore.URL( scheme=proxy.url.raw_scheme, host=proxy.url.raw_host, port=proxy.url.port, target=proxy.url.raw_path, ), proxy_auth=proxy.raw_auth, ssl_context=ssl_context, max_connections=limits.max_connections, max_keepalive_connections=limits.max_keepalive_connections, keepalive_expiry=limits.keepalive_expiry, http1=http1, http2=http2, ) else: # pragma: no cover raise ValueError( f"Proxy protocol must be either 'http', 'https', or 'socks5', but got {proxy.url.scheme!r}." ) async def __aenter__(self: A) -> A: # Use generics for subclass support. await self._pool.__aenter__() return self async def __aexit__( self, exc_type: typing.Optional[typing.Type[BaseException]] = None, exc_value: typing.Optional[BaseException] = None, traceback: typing.Optional[TracebackType] = None, ) -> None: with map_httpcore_exceptions(): await self._pool.__aexit__(exc_type, exc_value, traceback) async def handle_async_request( self, request: Request, ) -> Response: assert isinstance(request.stream, AsyncByteStream) req = httpcore.Request( method=request.method, url=httpcore.URL( scheme=request.url.raw_scheme, host=request.url.raw_host, port=request.url.port, target=request.url.raw_path, ), headers=request.headers.raw, content=request.stream, extensions=request.extensions, ) with map_httpcore_exceptions(): resp = await self._pool.handle_async_request(req) assert isinstance(resp.stream, typing.AsyncIterable) return Response( status_code=resp.status, headers=resp.headers, stream=AsyncResponseStream(resp.stream), extensions=resp.extensions, ) async def aclose(self) -> None: await self._pool.aclose()