Server IP : 85.214.239.14 / Your IP : 3.144.83.68 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/root/lib/python3/dist-packages/ansible_test/_internal/ |
Upload File : |
""" Primitive replacement for requests to avoid extra dependency. Avoids use of urllib2 due to lack of SNI support. """ from __future__ import annotations import json import time import typing as t from .util import ( ApplicationError, SubprocessError, display, ) from .util_common import ( CommonConfig, run_command, ) class HttpClient: """Make HTTP requests via curl.""" def __init__(self, args: CommonConfig, always: bool = False, insecure: bool = False, proxy: t.Optional[str] = None) -> None: self.args = args self.always = always self.insecure = insecure self.proxy = proxy self.username = None self.password = None def get(self, url: str) -> HttpResponse: """Perform an HTTP GET and return the response.""" return self.request('GET', url) def delete(self, url: str) -> HttpResponse: """Perform an HTTP DELETE and return the response.""" return self.request('DELETE', url) def put(self, url: str, data: t.Optional[str] = None, headers: t.Optional[dict[str, str]] = None) -> HttpResponse: """Perform an HTTP PUT and return the response.""" return self.request('PUT', url, data, headers) def request(self, method: str, url: str, data: t.Optional[str] = None, headers: t.Optional[dict[str, str]] = None) -> HttpResponse: """Perform an HTTP request and return the response.""" cmd = ['curl', '-s', '-S', '-i', '-X', method] if self.insecure: cmd += ['--insecure'] if headers is None: headers = {} headers['Expect'] = '' # don't send expect continue header if self.username: if self.password: display.sensitive.add(self.password) cmd += ['-u', '%s:%s' % (self.username, self.password)] else: cmd += ['-u', self.username] for header in headers.keys(): cmd += ['-H', '%s: %s' % (header, headers[header])] if data is not None: cmd += ['-d', data] if self.proxy: cmd += ['-x', self.proxy] cmd += [url] attempts = 0 max_attempts = 3 sleep_seconds = 3 # curl error codes which are safe to retry (request never sent to server) retry_on_status = ( 6, # CURLE_COULDNT_RESOLVE_HOST ) stdout = '' while True: attempts += 1 try: stdout = run_command(self.args, cmd, capture=True, always=self.always, cmd_verbosity=2)[0] break except SubprocessError as ex: if ex.status in retry_on_status and attempts < max_attempts: display.warning('%s' % ex) time.sleep(sleep_seconds) continue raise if self.args.explain and not self.always: return HttpResponse(method, url, 200, '') header, body = stdout.split('\r\n\r\n', 1) response_headers = header.split('\r\n') first_line = response_headers[0] http_response = first_line.split(' ') status_code = int(http_response[1]) return HttpResponse(method, url, status_code, body) class HttpResponse: """HTTP response from curl.""" def __init__(self, method: str, url: str, status_code: int, response: str) -> None: self.method = method self.url = url self.status_code = status_code self.response = response def json(self) -> t.Any: """Return the response parsed as JSON, raising an exception if parsing fails.""" try: return json.loads(self.response) except ValueError: raise HttpError(self.status_code, 'Cannot parse response to %s %s as JSON:\n%s' % (self.method, self.url, self.response)) class HttpError(ApplicationError): """HTTP response as an error.""" def __init__(self, status: int, message: str) -> None: super().__init__('%s: %s' % (status, message)) self.status = status