Dre4m Shell
Server IP : 85.214.239.14  /  Your IP : 18.223.159.143
Web Server : Apache/2.4.62 (Debian)
System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64
User : www-data ( 33)
PHP Version : 7.4.18
Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
MySQL : OFF  |  cURL : OFF  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : OFF
Directory :  /proc/3/root/proc/3/task/3/cwd/proc/2/task/2/cwd/usr/lib/python3/dist-packages/h11/tests/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ HOME SHELL ]     

Current File : /proc/3/root/proc/3/task/3/cwd/proc/2/task/2/cwd/usr/lib/python3/dist-packages/h11/tests/helpers.py
from typing import cast, List, Type, Union, ValuesView

from .._connection import Connection, NEED_DATA, PAUSED
from .._events import (
    ConnectionClosed,
    Data,
    EndOfMessage,
    Event,
    InformationalResponse,
    Request,
    Response,
)
from .._state import CLIENT, CLOSED, DONE, MUST_CLOSE, SERVER
from .._util import Sentinel

try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal  # type: ignore


def get_all_events(conn: Connection) -> List[Event]:
    got_events = []
    while True:
        event = conn.next_event()
        if event in (NEED_DATA, PAUSED):
            break
        event = cast(Event, event)
        got_events.append(event)
        if type(event) is ConnectionClosed:
            break
    return got_events


def receive_and_get(conn: Connection, data: bytes) -> List[Event]:
    conn.receive_data(data)
    return get_all_events(conn)


# Merges adjacent Data events, converts payloads to bytestrings, and removes
# chunk boundaries.
def normalize_data_events(in_events: List[Event]) -> List[Event]:
    out_events: List[Event] = []
    for event in in_events:
        if type(event) is Data:
            event = Data(data=bytes(event.data), chunk_start=False, chunk_end=False)
        if out_events and type(out_events[-1]) is type(event) is Data:
            out_events[-1] = Data(
                data=out_events[-1].data + event.data,
                chunk_start=out_events[-1].chunk_start,
                chunk_end=out_events[-1].chunk_end,
            )
        else:
            out_events.append(event)
    return out_events


# Given that we want to write tests that push some events through a Connection
# and check that its state updates appropriately... we might as make a habit
# of pushing them through two Connections with a fake network link in
# between.
class ConnectionPair:
    def __init__(self) -> None:
        self.conn = {CLIENT: Connection(CLIENT), SERVER: Connection(SERVER)}
        self.other = {CLIENT: SERVER, SERVER: CLIENT}

    @property
    def conns(self) -> ValuesView[Connection]:
        return self.conn.values()

    # expect="match" if expect=send_events; expect=[...] to say what expected
    def send(
        self,
        role: Type[Sentinel],
        send_events: Union[List[Event], Event],
        expect: Union[List[Event], Event, Literal["match"]] = "match",
    ) -> bytes:
        if not isinstance(send_events, list):
            send_events = [send_events]
        data = b""
        closed = False
        for send_event in send_events:
            new_data = self.conn[role].send(send_event)
            if new_data is None:
                closed = True
            else:
                data += new_data
        # send uses b"" to mean b"", and None to mean closed
        # receive uses b"" to mean closed, and None to mean "try again"
        # so we have to translate between the two conventions
        if data:
            self.conn[self.other[role]].receive_data(data)
        if closed:
            self.conn[self.other[role]].receive_data(b"")
        got_events = get_all_events(self.conn[self.other[role]])
        if expect == "match":
            expect = send_events
        if not isinstance(expect, list):
            expect = [expect]
        assert got_events == expect
        return data

Anon7 - 2022
AnonSec Team