1# Copyright (c) 2003-2005 Maxim Sobolev. All rights reserved. 2# Copyright (c) 2006-2014 Sippy Software, Inc. All rights reserved. 3# 4# All rights reserved. 5# 6# Redistribution and use in source and binary forms, with or without modification, 7# are permitted provided that the following conditions are met: 8# 9# 1. Redistributions of source code must retain the above copyright notice, this 10# list of conditions and the following disclaimer. 11# 12# 2. Redistributions in binary form must reproduce the above copyright notice, 13# this list of conditions and the following disclaimer in the documentation and/or 14# other materials provided with the distribution. 15# 16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND 17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 18# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE 19# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR 20# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES 21# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 22# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON 23# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 24# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS 25# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 27from twisted.internet import reactor 28from errno import ECONNRESET, ENOTCONN, ESHUTDOWN, EWOULDBLOCK, ENOBUFS, EAGAIN, \ 29 EINTR 30from datetime import datetime 31from time import sleep, time 32from threading import Thread, Condition 33from random import random 34import socket 35import sys, traceback 36 37from Timeout import Timeout 38from Time.MonoTime import MonoTime 39 40class AsyncSender(Thread): 41 userv = None 42 43 def __init__(self, userv): 44 Thread.__init__(self) 45 self.userv = userv 46 self.setDaemon(True) 47 self.start() 48 49 def run(self): 50 while True: 51 self.userv.wi_available.acquire() 52 while len(self.userv.wi) == 0: 53 self.userv.wi_available.wait() 54 wi = self.userv.wi.pop(0) 55 if wi == None: 56 # Shutdown request, relay it further 57 self.userv.wi.append(None) 58 self.userv.wi_available.notify() 59 self.userv.wi_available.release() 60 if wi == None: 61 break 62 data, address = wi 63 try: 64 ai = socket.getaddrinfo(address[0], None, self.userv.uopts.family) 65 except: 66 continue 67 if self.userv.uopts.family == socket.AF_INET: 68 address = (ai[0][4][0], address[1]) 69 else: 70 address = (ai[0][4][0], address[1], ai[0][4][2], ai[0][4][3]) 71 for i in range(0, 20): 72 try: 73 if self.userv.skt.sendto(data, address) == len(data): 74 break 75 except socket.error, why: 76 if why[0] not in (EWOULDBLOCK, ENOBUFS, EAGAIN): 77 break 78 sleep(0.01) 79 self.userv = None 80 81class AsyncReceiver(Thread): 82 userv = None 83 84 def __init__(self, userv): 85 Thread.__init__(self) 86 self.userv = userv 87 self.setDaemon(True) 88 self.start() 89 90 def run(self): 91 maxemptydata = 100 92 while True: 93 try: 94 data, address = self.userv.skt.recvfrom(8192) 95 if not data: 96 # Ugly hack to detect socket being closed under us on Linux. 97 # The problem is that even call on non-closed socket can 98 # sometimes return empty data buffer, making AsyncReceiver 99 # to exit prematurely. 100 maxemptydata -= 1 101 if maxemptydata == 0: 102 break 103 continue 104 else: 105 maxemptydata = 100 106 except Exception, why: 107 if isinstance(why, socket.error) and why[0] in (ECONNRESET, ENOTCONN, ESHUTDOWN): 108 break 109 if isinstance(why, socket.error) and why[0] in (EINTR,): 110 continue 111 else: 112 print datetime.now(), 'Udp_server: unhandled exception when receiving incoming data' 113 print '-' * 70 114 traceback.print_exc(file = sys.stdout) 115 print '-' * 70 116 sys.stdout.flush() 117 sleep(1) 118 continue 119 rtime = MonoTime() 120 if self.userv.uopts.family == socket.AF_INET6: 121 address = ('[%s]' % address[0], address[1]) 122 reactor.callFromThread(self.userv.handle_read, data, address, rtime) 123 self.userv = None 124 125_DEFAULT_FLAGS = socket.SO_REUSEADDR 126if hasattr(socket, 'SO_REUSEPORT'): 127 _DEFAULT_FLAGS |= socket.SO_REUSEPORT 128_DEFAULT_NWORKERS = 30 129 130class Udp_server_opts(object): 131 laddress = None 132 data_callback = None 133 family = None 134 flags = _DEFAULT_FLAGS 135 nworkers = _DEFAULT_NWORKERS 136 ploss_out_rate = 0.0 137 pdelay_out_max = 0.0 138 ploss_in_rate = 0.0 139 pdelay_in_max = 0.0 140 141 def __init__(self, laddress, data_callback, family = None, o = None): 142 if o == None: 143 if family == None: 144 if laddress != None and laddress[0].startswith('['): 145 family = socket.AF_INET6 146 laddress = (laddress[0][1:-1], laddress[1]) 147 else: 148 family = socket.AF_INET 149 self.family = family 150 self.laddress = laddress 151 self.data_callback = data_callback 152 else: 153 self.laddress, self.data_callback, self.family, self.nworkers, self.flags, \ 154 self.ploss_out_rate, self.pdelay_out_max, self.ploss_in_rate, \ 155 self.pdelay_in_max = o.laddress, o.data_callback, o.family, \ 156 o.nworkers, o.flags, o.ploss_out_rate, o.pdelay_out_max, o.ploss_in_rate, \ 157 o.pdelay_in_max 158 159 def getCopy(self): 160 return self.__class__(None, None, o = self) 161 162class Udp_server(object): 163 skt = None 164 uopts = None 165 sendqueue = None 166 stats = None 167 wi_available = None 168 wi = None 169 asenders = None 170 areceivers = None 171 172 def __init__(self, global_config, uopts): 173 self.uopts = uopts.getCopy() 174 self.skt = socket.socket(self.uopts.family, socket.SOCK_DGRAM) 175 if self.uopts.laddress != None: 176 ai = socket.getaddrinfo(self.uopts.laddress[0], None, self.uopts.family) 177 if self.uopts.family == socket.AF_INET: 178 address = (ai[0][4][0], self.uopts.laddress[1]) 179 else: 180 address = (ai[0][4][0], self.uopts.laddress[1], ai[0][4][2], ai[0][4][3]) 181 if (self.uopts.flags & socket.SO_REUSEADDR) != 0: 182 self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 183 if hasattr(socket, 'SO_REUSEPORT') and \ 184 (self.uopts.flags & socket.SO_REUSEPORT) != 0: 185 self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) 186 self.skt.bind(address) 187 self.sendqueue = [] 188 self.stats = [0, 0, 0] 189 self.wi_available = Condition() 190 self.wi = [] 191 self.asenders = [] 192 self.areceivers = [] 193 for i in range(0, self.uopts.nworkers): 194 self.asenders.append(AsyncSender(self)) 195 self.areceivers.append(AsyncReceiver(self)) 196 197 def send_to(self, data, address, delayed = False): 198 if not isinstance(address, tuple): 199 raise Exception('Invalid address, not a tuple: %s' % str(address)) 200 if self.uopts.ploss_out_rate > 0.0 and not delayed: 201 if random() < self.uopts.ploss_out_rate: 202 return 203 if self.uopts.pdelay_out_max > 0.0 and not delayed: 204 pdelay = self.uopts.pdelay_out_max * random() 205 Timeout(self.send_to, pdelay, 1, data, address, True) 206 return 207 addr, port = address 208 if self.uopts.family == socket.AF_INET6: 209 if not addr.startswith('['): 210 raise Exception('Invalid IPv6 address: %s' % addr) 211 address = (addr[1:-1], port) 212 self.wi_available.acquire() 213 self.wi.append((data, address)) 214 self.wi_available.notify() 215 self.wi_available.release() 216 217 def handle_read(self, data, address, rtime, delayed = False): 218 if len(data) > 0 and self.uopts.data_callback != None: 219 self.stats[2] += 1 220 if self.uopts.ploss_in_rate > 0.0 and not delayed: 221 if random() < self.uopts.ploss_in_rate: 222 return 223 if self.uopts.pdelay_in_max > 0.0 and not delayed: 224 pdelay = self.uopts.pdelay_in_max * random() 225 Timeout(self.handle_read, pdelay, 1, data, address, rtime + pdelay, True) 226 return 227 try: 228 self.uopts.data_callback(data, address, self, rtime) 229 except: 230 print datetime.now(), 'Udp_server: unhandled exception when processing incoming data' 231 print '-' * 70 232 traceback.print_exc(file = sys.stdout) 233 print '-' * 70 234 sys.stdout.flush() 235 236 def shutdown(self): 237 try: 238 self.skt.shutdown(socket.SHUT_RDWR) 239 except: 240 pass 241 self.wi_available.acquire() 242 self.wi.append(None) 243 self.wi_available.notify() 244 self.wi_available.release() 245 self.uopts.data_callback = None 246 for worker in self.asenders + self.areceivers: 247 worker.join() 248 self.asenders = None 249 self.areceivers = None 250 251class self_test(object): 252 from sys import exit 253 npongs = 2 254 ping_data = 'ping!' 255 ping_data6 = 'ping6!' 256 pong_laddr = None 257 pong_laddr6 = None 258 pong_data = 'pong!' 259 pong_data6 = 'pong6!' 260 ping_laddr = None 261 ping_laddr6 = None 262 ping_raddr = None 263 ping_raddr6 = None 264 pong_raddr = None 265 pong_raddr6 = None 266 267 def ping_received(self, data, address, udp_server, rtime): 268 if udp_server.uopts.family == socket.AF_INET: 269 print 'ping_received' 270 if data != self.ping_data or address != self.pong_raddr: 271 print data, address, self.ping_data, self.pong_raddr 272 exit(1) 273 udp_server.send_to(self.pong_data, address) 274 else: 275 print 'ping_received6' 276 if data != self.ping_data6 or address != self.pong_raddr6: 277 exit(1) 278 udp_server.send_to(self.pong_data6, address) 279 280 def pong_received(self, data, address, udp_server, rtime): 281 if udp_server.uopts.family == socket.AF_INET: 282 print 'pong_received' 283 if data != self.pong_data or address != self.ping_raddr: 284 exit(1) 285 else: 286 print 'pong_received6' 287 if data != self.pong_data6 or address != self.ping_raddr6: 288 exit(1) 289 self.npongs -= 1 290 if self.npongs == 0: 291 reactor.stop() 292 293 def run(self): 294 local_host = '127.0.0.1' 295 local_host6 = '[::1]' 296 remote_host = local_host 297 remote_host6 = local_host6 298 self.ping_laddr = (local_host, 12345) 299 self.pong_laddr = (local_host, 54321) 300 self.ping_laddr6 = (local_host6, 12345) 301 self.pong_laddr6 = (local_host6, 54321) 302 self.ping_raddr = (remote_host, 12345) 303 self.pong_raddr = (remote_host, 54321) 304 self.ping_raddr6 = (remote_host6, 12345) 305 self.pong_raddr6 = (remote_host6, 54321) 306 uopts_ping = Udp_server_opts(self.ping_laddr, self.ping_received) 307 uopts_ping6 = Udp_server_opts(self.ping_laddr6, self.ping_received) 308 uopts_pong = Udp_server_opts(self.pong_laddr, self.pong_received) 309 uopts_pong6 = Udp_server_opts(self.pong_laddr6, self.pong_received) 310 udp_server_ping = Udp_server({}, uopts_ping) 311 udp_server_pong = Udp_server({}, uopts_pong) 312 udp_server_pong.send_to(self.ping_data, self.ping_laddr) 313 udp_server_ping6 = Udp_server({}, uopts_ping6) 314 udp_server_pong6 = Udp_server({}, uopts_pong6) 315 udp_server_pong6.send_to(self.ping_data6, self.ping_laddr6) 316 reactor.run() 317 udp_server_ping.shutdown() 318 udp_server_pong.shutdown() 319 udp_server_ping6.shutdown() 320 udp_server_pong6.shutdown() 321 322if __name__ == '__main__': 323 self_test().run() 324