Server IP : 85.214.239.14 / Your IP : 18.223.210.83 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/2/root/proc/2/cwd/proc/2/root/proc/3/cwd/lib/python3/dist-packages/httpx/_transports/ |
Upload File : |
import typing
from types import TracebackType

from .._models import Request, Response

T = typing.TypeVar("T", bound="BaseTransport")
A = typing.TypeVar("A", bound="AsyncBaseTransport")


class BaseTransport:
    """
    Base class for synchronous transports.

    Instances work as context managers: leaving the `with` block calls
    `close()`. The default `handle_request` raises `NotImplementedError`,
    so subclasses must provide their own implementation.
    """

    def __enter__(self: T) -> T:
        """Enter the context manager, returning the transport itself."""
        return self

    def __exit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]] = None,
        exc_value: typing.Optional[BaseException] = None,
        traceback: typing.Optional[TracebackType] = None,
    ) -> None:
        """Leave the context manager by calling `close()`."""
        # The exception details are not inspected; the method returns None,
        # so an exception raised inside the `with` block still propagates.
        self.close()

    def handle_request(self, request: Request) -> Response:
        """
        Send a single HTTP request and return a response.

        Developers shouldn't typically ever need to call into this API
        directly, since the Client class provides all the higher level
        user-facing API niceties.

        In order to properly release any network resources, the response
        stream should *either* be consumed immediately, with a call to
        `response.stream.read()`, or else the `handle_request` call should
        be followed with a try/finally block to ensure the stream is
        always closed.

        Example usage:

            with httpx.HTTPTransport() as transport:
                req = httpx.Request(
                    method=b"GET",
                    url=(b"https", b"www.example.com", 443, b"/"),
                    headers=[(b"Host", b"www.example.com")],
                )
                resp = transport.handle_request(req)
                body = resp.stream.read()
                print(resp.status_code, resp.headers, body)

        Takes a `Request` instance as the only argument.

        Returns a `Response` instance.

        Raises `NotImplementedError` in this base class.
        """
        message = "The 'handle_request' method must be implemented."
        raise NotImplementedError(message)  # pragma: no cover

    def close(self) -> None:
        """Release resources held by the transport; a no-op by default."""


class AsyncBaseTransport:
    """
    Base class for asynchronous transports.

    Instances work as async context managers: leaving the `async with`
    block awaits `aclose()`. The default `handle_async_request` raises
    `NotImplementedError`, so subclasses must provide their own
    implementation.
    """

    async def __aenter__(self: A) -> A:
        """Enter the async context manager, returning the transport itself."""
        return self

    async def __aexit__(
        self,
        exc_type: typing.Optional[typing.Type[BaseException]] = None,
        exc_value: typing.Optional[BaseException] = None,
        traceback: typing.Optional[TracebackType] = None,
    ) -> None:
        """Leave the async context manager by awaiting `aclose()`."""
        # The exception details are not inspected; the method returns None,
        # so an exception raised inside the block still propagates.
        await self.aclose()

    async def handle_async_request(
        self,
        request: Request,
    ) -> Response:
        """
        Asynchronous counterpart of `BaseTransport.handle_request`.

        Takes a `Request` instance as the only argument and is annotated
        as returning a `Response` instance.

        Raises `NotImplementedError` in this base class.
        """
        message = "The 'handle_async_request' method must be implemented."
        raise NotImplementedError(message)  # pragma: no cover

    async def aclose(self) -> None:
        """Release resources held by the transport; a no-op by default."""