Server IP : 85.214.239.14 / Your IP : 3.15.138.214 Web Server : Apache/2.4.62 (Debian) System : Linux h2886529.stratoserver.net 4.9.0 #1 SMP Tue Jan 9 19:45:01 MSK 2024 x86_64 User : www-data ( 33) PHP Version : 7.4.18 Disable Function : pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare, MySQL : OFF | cURL : OFF | WGET : ON | Perl : ON | Python : ON | Sudo : ON | Pkexec : OFF Directory : /lib/python3/dist-packages/supervisor/medusa/ |
Upload File : |
# -*- Mode: Python; tab-width: 4 -*- # Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp # Author: Sam Rushing <rushing@nightmare.com> # ====================================================================== # Copyright 1996 by Sam Rushing # # All Rights Reserved # # Permission to use, copy, modify, and distribute this software and # its documentation for any purpose and without fee is hereby # granted, provided that the above copyright notice appear in all # copies and that both that copyright notice and this permission # notice appear in supporting documentation, and that the name of Sam # Rushing not be used in advertising or publicity pertaining to # distribution of the software without specific, written prior # permission. # # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. # ====================================================================== r"""A class supporting chat-style (command/response) protocols. This class adds support for 'chat' style protocols - where one side sends a 'command', and the other sends a response (examples would be the common internet protocols - smtp, nntp, ftp, etc..). The handle_read() method looks at the input stream for the current 'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n' for multi-line output), calling self.found_terminator() on its receipt. for example: Say you build an async nntp client using this class. At the start of the connection, you'll have self.terminator set to '\r\n', in order to process the single-line greeting. Just before issuing a 'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST command will be accumulated (using your own 'collect_incoming_data' method) up to the terminator, and then control will be returned to you - by calling your self.found_terminator() method. """ import socket from supervisor.medusa import asyncore_25 as asyncore from supervisor.compat import long from supervisor.compat import as_bytes class async_chat (asyncore.dispatcher): """This is an abstract class. You must derive from this class, and add the two methods collect_incoming_data() and found_terminator()""" # these are overridable defaults ac_in_buffer_size = 4096 ac_out_buffer_size = 4096 def __init__ (self, conn=None, map=None): self.ac_in_buffer = b'' self.ac_out_buffer = b'' self.producer_fifo = fifo() asyncore.dispatcher.__init__ (self, conn, map) def collect_incoming_data(self, data): raise NotImplementedError("must be implemented in subclass") def found_terminator(self): raise NotImplementedError("must be implemented in subclass") def set_terminator (self, term): """Set the input delimiter. Can be a fixed string of any length, an integer, or None""" self.terminator = term def get_terminator (self): return self.terminator # grab some more data from the socket, # throw it to the collector method, # check for the terminator, # if found, transition to the next state. def handle_read (self): try: data = self.recv (self.ac_in_buffer_size) except socket.error: self.handle_error() return self.ac_in_buffer += data # Continue to search for self.terminator in self.ac_in_buffer, # while calling self.collect_incoming_data. The while loop # is necessary because we might read several data+terminator # combos with a single recv(1024). while self.ac_in_buffer: lb = len(self.ac_in_buffer) terminator = self.get_terminator() if not terminator: # no terminator, collect it all self.collect_incoming_data (self.ac_in_buffer) self.ac_in_buffer = b'' elif isinstance(terminator, int) or isinstance(terminator, long): # numeric terminator n = terminator if lb < n: self.collect_incoming_data (self.ac_in_buffer) self.ac_in_buffer = b'' self.terminator -= lb else: self.collect_incoming_data (self.ac_in_buffer[:n]) self.ac_in_buffer = self.ac_in_buffer[n:] self.terminator = 0 self.found_terminator() else: # 3 cases: # 1) end of buffer matches terminator exactly: # collect data, transition # 2) end of buffer matches some prefix: # collect data to the prefix # 3) end of buffer does not match any prefix: # collect data terminator_len = len(terminator) index = self.ac_in_buffer.find(terminator) if index != -1: # we found the terminator if index > 0: # don't bother reporting the empty string (source of subtle bugs) self.collect_incoming_data (self.ac_in_buffer[:index]) self.ac_in_buffer = self.ac_in_buffer[index+terminator_len:] # This does the Right Thing if the terminator is changed here. self.found_terminator() else: # check for a prefix of the terminator index = find_prefix_at_end (self.ac_in_buffer, terminator) if index: if index != lb: # we found a prefix, collect up to the prefix self.collect_incoming_data (self.ac_in_buffer[:-index]) self.ac_in_buffer = self.ac_in_buffer[-index:] break else: # no prefix, collect it all self.collect_incoming_data (self.ac_in_buffer) self.ac_in_buffer = b'' def handle_write (self): self.initiate_send () def handle_close (self): self.close() def push (self, data): data = as_bytes(data) self.producer_fifo.push(simple_producer(data)) self.initiate_send() def push_with_producer (self, producer): self.producer_fifo.push (producer) self.initiate_send() def readable (self): """predicate for inclusion in the readable for select()""" return len(self.ac_in_buffer) <= self.ac_in_buffer_size def writable (self): """predicate for inclusion in the writable for select()""" # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected) # this is about twice as fast, though not as clear. return not ( (self.ac_out_buffer == b'') and self.producer_fifo.is_empty() and self.connected ) def close_when_done (self): """automatically close this channel once the outgoing queue is empty""" self.producer_fifo.push (None) # refill the outgoing buffer by calling the more() method # of the first producer in the queue def refill_buffer (self): while 1: if len(self.producer_fifo): p = self.producer_fifo.first() # a 'None' in the producer fifo is a sentinel, # telling us to close the channel. if p is None: if not self.ac_out_buffer: self.producer_fifo.pop() self.close() return elif isinstance(p, bytes): self.producer_fifo.pop() self.ac_out_buffer += p return data = p.more() if data: self.ac_out_buffer = self.ac_out_buffer + data return else: self.producer_fifo.pop() else: return def initiate_send (self): obs = self.ac_out_buffer_size # try to refill the buffer if len (self.ac_out_buffer) < obs: self.refill_buffer() if self.ac_out_buffer and self.connected: # try to send the buffer try: num_sent = self.send (self.ac_out_buffer[:obs]) if num_sent: self.ac_out_buffer = self.ac_out_buffer[num_sent:] except socket.error: self.handle_error() return def discard_buffers (self): # Emergencies only! self.ac_in_buffer = b'' self.ac_out_buffer = b'' while self.producer_fifo: self.producer_fifo.pop() class simple_producer: def __init__ (self, data, buffer_size=512): self.data = data self.buffer_size = buffer_size def more (self): if len (self.data) > self.buffer_size: result = self.data[:self.buffer_size] self.data = self.data[self.buffer_size:] return result else: result = self.data self.data = b'' return result class fifo: def __init__ (self, list=None): if not list: self.list = [] else: self.list = list def __len__ (self): return len(self.list) def is_empty (self): return self.list == [] def first (self): return self.list[0] def push (self, data): self.list.append(data) def pop (self): if self.list: return 1, self.list.pop(0) else: return 0, None # Given 'haystack', see if any prefix of 'needle' is at its end. This # assumes an exact match has already been checked. Return the number of # characters matched. # for example: # f_p_a_e ("qwerty\r", "\r\n") => 1 # f_p_a_e ("qwertydkjf", "\r\n") => 0 # f_p_a_e ("qwerty\r\n", "\r\n") => <undefined> # this could maybe be made faster with a computed regex? # [answer: no; circa Python-2.0, Jan 2001] # new python: 28961/s # old python: 18307/s # re: 12820/s # regex: 14035/s def find_prefix_at_end (haystack, needle): l = len(needle) - 1 while l and not haystack.endswith(needle[:l]): l -= 1 return l