Server IP : 85.214.239.14 / Your IP : 3.137.170.38 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /proc/3/task/3/cwd/proc/self/root/proc/3/task/3/root/lib/python3/dist-packages/rich/ |
Upload File : |
import io
from typing import IO, TYPE_CHECKING, Any, List

from .ansi import AnsiDecoder
from .text import Text

if TYPE_CHECKING:
    from .console import Console


class FileProxy(io.TextIOBase):
    """Wraps a file (e.g. sys.stdout) and redirects writes to a console.

    Text passed to ``write`` is split on newlines. Complete lines are printed
    through the console; a trailing partial line is held in an internal buffer
    until a later ``write`` completes it or ``flush`` is called.
    """

    def __init__(self, console: "Console", file: IO[str]) -> None:
        """Create a proxy for ``file`` whose writes are printed via ``console``.

        Args:
            console: Console that receives the redirected output.
            file: The file object being wrapped; attribute lookups that this
                class does not define are forwarded to it.
        """
        self.__console = console
        self.__file = file
        # Fragments of the current, not yet newline-terminated line.
        self.__buffer: List[str] = []
        self.__ansi_decoder = AnsiDecoder()

    @property
    def rich_proxied_file(self) -> IO[str]:
        """Get proxied file."""
        return self.__file

    def __getattr__(self, name: str) -> Any:
        """Forward attributes not found on the proxy to the wrapped file."""
        return getattr(self.__file, name)

    def write(self, text: str) -> int:
        """Buffer ``text`` and print every completed line to the console.

        Args:
            text: The string to write.

        Returns:
            The number of characters accepted, i.e. ``len(text)``.

        Raises:
            TypeError: If ``text`` is not a ``str``.
        """
        if not isinstance(text, str):
            raise TypeError(f"write() argument must be str, not {type(text).__name__}")
        # Captured up front: the loop below rebinds ``text`` to the unconsumed
        # remainder, which is always empty once the loop exits, so measuring it
        # afterwards would always report 0 characters written.
        written = len(text)
        buffer = self.__buffer
        lines: List[str] = []
        while text:
            line, new_line, text = text.partition("\n")
            if new_line:
                # A newline completes the pending line: prepend whatever
                # earlier writes left in the buffer.
                lines.append("".join(buffer) + line)
                buffer.clear()
            else:
                # No newline left; keep the partial line for a later write
                # or flush.
                buffer.append(line)
                break
        if lines:
            console = self.__console
            # NOTE(review): presumably entering the console groups the output
            # into a single update -- confirm against Console.__enter__.
            with console:
                output = Text("\n").join(
                    self.__ansi_decoder.decode_line(line) for line in lines
                )
                console.print(output)
        return written

    def flush(self) -> None:
        """Print any buffered partial line to the console and empty the buffer."""
        output = "".join(self.__buffer)
        if output:
            self.__console.print(output)
        del self.__buffer[:]

    def fileno(self) -> int:
        """Return the file descriptor reported by the wrapped file."""
        return self.__file.fileno()