# -*- Mode: Python; tab-width: 4 -*-
# Id: asynchat.py,v 2.26 2000/09/07 22:29:26 rushing Exp
# Author: Sam Rushing <rushing@nightmare.com>

# ======================================================================
# Copyright 1996 by Sam Rushing
#
# All Rights Reserved
#
# Permission to use, copy, modify, and distribute this software and
# its documentation for any purpose and without fee is hereby
# granted, provided that the above copyright notice appear in all
# copies and that both that copyright notice and this permission
# notice appear in supporting documentation, and that the name of Sam
# Rushing not be used in advertising or publicity pertaining to
# distribution of the software without specific, written prior
# permission.
#
# SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
# INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
# NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
# CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
# OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
# NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
# ======================================================================

r"""A class supporting chat-style (command/response) protocols.

This class adds support for 'chat' style protocols - where one side
sends a 'command', and the other sends a response (examples would be
the common internet protocols - smtp, nntp, ftp, etc..).

The handle_read() method looks at the input stream for the current
'terminator' (usually '\r\n' for single-line responses, '\r\n.\r\n'
for multi-line output), calling self.found_terminator() on its
receipt.

for example:
Say you build an async nntp client using this class. At the start
of the connection, you'll have self.terminator set to '\r\n', in
order to process the single-line greeting. Just before issuing a
'LIST' command you'll set it to '\r\n.\r\n'. The output of the LIST
command will be accumulated (using your own 'collect_incoming_data'
method) up to the terminator, and then control will be returned to
you - by calling your self.found_terminator() method.
"""

import socket
import asyncore
from collections import deque

# Integer types accepted as a numeric terminator.  The 'long' builtin is
# not defined on every interpreter, so fall back to plain int when the
# name is missing instead of failing inside handle_read().
try:
    _INTEGER_TYPES = (int, long)
except NameError:
    _INTEGER_TYPES = (int,)


class async_chat(asyncore.dispatcher):
    """This is an abstract class. You must derive from this class, and add
    the two methods collect_incoming_data() and found_terminator()"""

    # these are overridable defaults

    ac_in_buffer_size = 4096
    ac_out_buffer_size = 4096

    # Default terminator.  Without it, handle_read() raised AttributeError
    # when data arrived before set_terminator() had been called; with None
    # the "no terminator" branch simply hands all data to the collector.
    terminator = None

    def __init__(self, conn=None):
        """Set up empty in/out buffers and the producer queue, then
        initialise the underlying dispatcher with 'conn'."""
        self.ac_in_buffer = ''
        self.ac_out_buffer = ''
        self.producer_fifo = fifo()
        asyncore.dispatcher.__init__(self, conn)

    def collect_incoming_data(self, data):
        """Receive a chunk of incoming data.  Must be overridden."""
        raise NotImplementedError("must be implemented in subclass")

    def found_terminator(self):
        """Called by handle_read() when the terminator condition is met.
        Must be overridden."""
        raise NotImplementedError("must be implemented in subclass")

    def set_terminator(self, term):
        "Set the input delimiter. Can be a fixed string of any length, an integer, or None"
        self.terminator = term

    def get_terminator(self):
        """Return the current input delimiter."""
        return self.terminator

    # grab some more data from the socket,
    # throw it to the collector method,
    # check for the terminator,
    # if found, transition to the next state.

    def handle_read(self):
        """Read up to ac_in_buffer_size bytes and dispatch them.

        Data is passed to collect_incoming_data(); found_terminator() is
        called each time the current terminator is satisfied.  A
        socket.error from recv() is routed to handle_error().
        """
        try:
            data = self.recv(self.ac_in_buffer_size)
        except socket.error:
            self.handle_error()
            return

        self.ac_in_buffer = self.ac_in_buffer + data

        # Continue to search for self.terminator in self.ac_in_buffer,
        # while calling self.collect_incoming_data. The while loop
        # is necessary because we might read several data+terminator
        # combos with a single recv().

        while self.ac_in_buffer:
            lb = len(self.ac_in_buffer)
            # Re-read every pass: found_terminator() may change it.
            terminator = self.get_terminator()
            if not terminator:
                # no terminator, collect it all
                self.collect_incoming_data(self.ac_in_buffer)
                self.ac_in_buffer = ''
            elif isinstance(terminator, _INTEGER_TYPES):
                # numeric terminator
                n = terminator
                if lb < n:
                    # not enough yet: hand over what we have and
                    # remember how much is still outstanding
                    self.collect_incoming_data(self.ac_in_buffer)
                    self.ac_in_buffer = ''
                    self.terminator = self.terminator - lb
                else:
                    self.collect_incoming_data(self.ac_in_buffer[:n])
                    self.ac_in_buffer = self.ac_in_buffer[n:]
                    self.terminator = 0
                    self.found_terminator()
            else:
                # 3 cases:
                # 1) end of buffer matches terminator exactly:
                #    collect data, transition
                # 2) end of buffer matches some prefix:
                #    collect data to the prefix
                # 3) end of buffer does not match any prefix:
                #    collect data
                terminator_len = len(terminator)
                index = self.ac_in_buffer.find(terminator)
                if index != -1:
                    # we found the terminator
                    if index > 0:
                        # don't bother reporting the empty string (source of subtle bugs)
                        self.collect_incoming_data(self.ac_in_buffer[:index])
                    self.ac_in_buffer = self.ac_in_buffer[index + terminator_len:]
                    # This does the Right Thing if the terminator is changed here.
                    self.found_terminator()
                else:
                    # check for a prefix of the terminator
                    index = find_prefix_at_end(self.ac_in_buffer, terminator)
                    if index:
                        if index != lb:
                            # we found a prefix, collect up to the prefix
                            self.collect_incoming_data(self.ac_in_buffer[:-index])
                            self.ac_in_buffer = self.ac_in_buffer[-index:]
                        # keep the partial terminator buffered until
                        # more data arrives
                        break
                    else:
                        # no prefix, collect it all
                        self.collect_incoming_data(self.ac_in_buffer)
                        self.ac_in_buffer = ''

    def handle_write(self):
        """Write event: try to send pending output."""
        self.initiate_send()

    def handle_close(self):
        """Close event: close the channel."""
        self.close()

    def push(self, data):
        """Queue 'data' (wrapped in a simple_producer) and try to send."""
        self.producer_fifo.push(simple_producer(data))
        self.initiate_send()

    def push_with_producer(self, producer):
        """Queue 'producer' (an object with a more() method) and try to send."""
        self.producer_fifo.push(producer)
        self.initiate_send()

    def readable(self):
        "predicate for inclusion in the readable for select()"
        return (len(self.ac_in_buffer) <= self.ac_in_buffer_size)

    def writable(self):
        "predicate for inclusion in the writable for select()"
        # return len(self.ac_out_buffer) or len(self.producer_fifo) or (not self.connected)
        # this is about twice as fast, though not as clear.
        return not (
            (self.ac_out_buffer == '') and
            self.producer_fifo.is_empty() and
            self.connected
            )

    def close_when_done(self):
        "automatically close this channel once the outgoing queue is empty"
        self.producer_fifo.push(None)

    # refill the outgoing buffer by calling the more() method
    # of the first producer in the queue
    def refill_buffer(self):
        """Append the next available chunk of output to ac_out_buffer.

        Exhausted producers are dropped from the queue.  A None entry is
        the close sentinel: the channel is closed once ac_out_buffer has
        drained.
        """
        while 1:
            if len(self.producer_fifo):
                p = self.producer_fifo.first()
                # a 'None' in the producer fifo is a sentinel,
                # telling us to close the channel.
                if p is None:
                    if not self.ac_out_buffer:
                        self.producer_fifo.pop()
                        self.close()
                    return
                elif isinstance(p, str):
                    # a bare string in the queue is sent as-is
                    self.producer_fifo.pop()
                    self.ac_out_buffer = self.ac_out_buffer + p
                    return
                data = p.more()
                if data:
                    self.ac_out_buffer = self.ac_out_buffer + data
                    return
                else:
                    # producer is exhausted; move on to the next one
                    self.producer_fifo.pop()
            else:
                return

    def initiate_send(self):
        """Top up the output buffer if it is below ac_out_buffer_size and,
        when connected, send at most ac_out_buffer_size bytes of it.

        A socket.error from send() is routed to handle_error().
        """
        obs = self.ac_out_buffer_size
        # try to refill the buffer
        if (len(self.ac_out_buffer) < obs):
            self.refill_buffer()

        if self.ac_out_buffer and self.connected:
            # try to send the buffer
            try:
                num_sent = self.send(self.ac_out_buffer[:obs])
                if num_sent:
                    self.ac_out_buffer = self.ac_out_buffer[num_sent:]

            except socket.error:
                self.handle_error()
                return

    def discard_buffers(self):
        """Drop all buffered input, buffered output and queued producers."""
        # Emergencies only!
        self.ac_in_buffer = ''
        self.ac_out_buffer = ''
        while self.producer_fifo:
            self.producer_fifo.pop()


class simple_producer:
    """Producer that hands out 'data' in pieces of at most 'buffer_size'."""

    def __init__(self, data, buffer_size=512):
        self.data = data
        self.buffer_size = buffer_size

    def more(self):
        """Return the next piece of data; returns '' once exhausted."""
        if len(self.data) > self.buffer_size:
            result = self.data[:self.buffer_size]
            self.data = self.data[self.buffer_size:]
            return result
        else:
            result = self.data
            self.data = ''
            return result


class fifo:
    """First-in first-out queue backed by a collections.deque."""

    def __init__(self, list=None):
        # 'list' shadows the builtin, but it is part of the constructor's
        # keyword interface and therefore keeps its name.
        if not list:
            self.list = deque()
        else:
            self.list = deque(list)

    def __len__(self):
        return len(self.list)

    def is_empty(self):
        """Return True when nothing is queued."""
        return not self.list

    def first(self):
        """Return the oldest entry without removing it."""
        return self.list[0]

    def push(self, data):
        """Append 'data' to the end of the queue."""
        self.list.append(data)

    def pop(self):
        """Remove the oldest entry.

        Returns (1, entry) on success, or (0, None) if the queue is empty.
        """
        if self.list:
            return (1, self.list.popleft())
        else:
            return (0, None)

# Given 'haystack', see if any prefix of 'needle' is at its end. This
# assumes an exact match has already been checked.
# Return the number of characters matched.
# for example:
# f_p_a_e ("qwerty\r", "\r\n") => 1
# f_p_a_e ("qwertydkjf", "\r\n") => 0
# f_p_a_e ("qwerty\r\n", "\r\n") => <undefined>

# this could maybe be made faster with a computed regex?
# [answer: no; circa Python-2.0, Jan 2001]
# new python: 28961/s
# old python: 18307/s
# re: 12820/s
# regex: 14035/s

def find_prefix_at_end(haystack, needle):
    """Return the length of the longest proper prefix of 'needle' that
    'haystack' ends with, or 0 if there is none.

    Only proper prefixes (shorter than 'needle') are tried, longest
    first.  An empty 'needle' yields 0.
    """
    # Clamp at 0: with an empty needle, len(needle) - 1 is -1, which the
    # loop below would return unchanged as a bogus negative match length.
    matched = max(len(needle) - 1, 0)
    while matched and not haystack.endswith(needle[:matched]):
        matched -= 1
    return matched