Server IP : 85.214.239.14 / Your IP : 3.145.81.47 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/cwd/proc/3/cwd/usr/lib/python3/dist-packages/h11/tests/ |
Upload File : |
import re from typing import Tuple import pytest from .._receivebuffer import ReceiveBuffer def test_receivebuffer() -> None: b = ReceiveBuffer() assert not b assert len(b) == 0 assert bytes(b) == b"" b += b"123" assert b assert len(b) == 3 assert bytes(b) == b"123" assert bytes(b) == b"123" assert b.maybe_extract_at_most(2) == b"12" assert b assert len(b) == 1 assert bytes(b) == b"3" assert bytes(b) == b"3" assert b.maybe_extract_at_most(10) == b"3" assert bytes(b) == b"" assert b.maybe_extract_at_most(10) is None assert not b ################################################################ # maybe_extract_until_next ################################################################ b += b"123\n456\r\n789\r\n" assert b.maybe_extract_next_line() == b"123\n456\r\n" assert bytes(b) == b"789\r\n" assert b.maybe_extract_next_line() == b"789\r\n" assert bytes(b) == b"" b += b"12\r" assert b.maybe_extract_next_line() is None assert bytes(b) == b"12\r" b += b"345\n\r" assert b.maybe_extract_next_line() is None assert bytes(b) == b"12\r345\n\r" # here we stopped at the middle of b"\r\n" delimiter b += b"\n6789aaa123\r\n" assert b.maybe_extract_next_line() == b"12\r345\n\r\n" assert b.maybe_extract_next_line() == b"6789aaa123\r\n" assert b.maybe_extract_next_line() is None assert bytes(b) == b"" ################################################################ # maybe_extract_lines ################################################################ b += b"123\r\na: b\r\nfoo:bar\r\n\r\ntrailing" lines = b.maybe_extract_lines() assert lines == [b"123", b"a: b", b"foo:bar"] assert bytes(b) == b"trailing" assert b.maybe_extract_lines() is None b += b"\r\n\r" assert b.maybe_extract_lines() is None assert b.maybe_extract_at_most(100) == b"trailing\r\n\r" assert not b # Empty body case (as happens at the end of chunked encoding if there are # no trailing headers, e.g.) b += b"\r\ntrailing" assert b.maybe_extract_lines() == [] assert bytes(b) == b"trailing" @pytest.mark.parametrize( "data", [ pytest.param( ( b"HTTP/1.1 200 OK\r\n", b"Content-type: text/plain\r\n", b"Connection: close\r\n", b"\r\n", b"Some body", ), id="with_crlf_delimiter", ), pytest.param( ( b"HTTP/1.1 200 OK\n", b"Content-type: text/plain\n", b"Connection: close\n", b"\n", b"Some body", ), id="with_lf_only_delimiter", ), pytest.param( ( b"HTTP/1.1 200 OK\n", b"Content-type: text/plain\r\n", b"Connection: close\n", b"\n", b"Some body", ), id="with_mixed_crlf_and_lf", ), ], ) def test_receivebuffer_for_invalid_delimiter(data: Tuple[bytes]) -> None: b = ReceiveBuffer() for line in data: b += line lines = b.maybe_extract_lines() assert lines == [ b"HTTP/1.1 200 OK", b"Content-type: text/plain", b"Connection: close", ] assert bytes(b) == b"Some body"