# Copyright (c) 2003-2005 Maxim Sobolev. All rights reserved.
# Copyright (c) 2006-2014 Sippy Software, Inc. All rights reserved.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation and/or
# other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from Timeout import Timeout
from Udp_server import Udp_server, Udp_server_opts
from Time.MonoTime import MonoTime
from Math.recfilter import recfilter
from Rtp_proxy_cmd import Rtp_proxy_cmd

from time import time
from hashlib import md5
from random import random


def getnretrans(first_rert, timeout):
    """Return how many transmissions fit into `timeout`.

    The first interval is `first_rert`; every following interval is twice
    the previous one.  The count is the number of intervals whose running
    sum does not exceed `timeout`.

    Raises ValueError when `first_rert` is not positive: with a zero or
    negative interval the remaining timeout never shrinks and the loop
    below would never terminate.
    """
    if first_rert <= 0:
        raise ValueError('getnretrans(%r, %r): first_rert must be positive' %
                         (first_rert, timeout))
    n = 0
    while True:
        timeout -= first_rert
        if timeout < 0:
            break
        first_rert *= 2.0
        n += 1
    return n


class Rtp_proxy_client_udp(object):
    """Send cookie-prefixed commands through a Udp_server and match replies.

    Every command gets a unique cookie prepended.  The request is kept in
    `pending_requests` until a reply carrying the same cookie arrives or
    the retransmission budget runs out; the retransmission interval is
    doubled after every attempt.
    """

    # cookie -> (next_retr, triesleft, timer, command, result_callback,
    #            stime, callback_parameters)
    pending_requests = None
    is_local = False
    # Udp_server instance used for I/O; None after shutdown()
    worker = None
    # Udp_server_opts the worker is (re)created from
    uopts = None
    global_config = None
    # recfilter fed with (rtime - stime) for each reply; its lastval seeds
    # the initial retransmission interval
    delay_flt = None
    # Copied verbatim into uopts by __init__
    ploss_out_rate = 0.0
    pdelay_out_max = 0.0

    def __init__(self, global_config, address, bind_address = None, family = None, nworkers = None):
        """Create the UDP worker bound to `bind_address`.

        `address` is the destination handed to the worker's send_to() for
        every command.  `family` is passed through to Udp_server_opts;
        `nworkers`, when given, overrides the option object's `nworkers`.
        """
        self.address = address
        self.is_local = False
        self.uopts = Udp_server_opts(bind_address, self.process_reply, family)
        self.uopts.flags = 0
        self.uopts.ploss_out_rate = self.ploss_out_rate
        self.uopts.pdelay_out_max = self.pdelay_out_max
        if nworkers is not None:
            self.uopts.nworkers = nworkers
        self.worker = Udp_server(global_config, self.uopts)
        self.pending_requests = {}
        self.global_config = global_config
        self.delay_flt = recfilter(0.95, 0.25)

    def send_command(self, command, result_callback = None, *callback_parameters):
        """Send `command` and register it as pending.

        `command` is either an Rtp_proxy_cmd or a string.  When a reply
        arrives `result_callback(result, *callback_parameters)` is invoked;
        when all tries are exhausted it is invoked with None as the result.

        The overall time budget is 3.0, except 10.0 for commands of type
        'I' and 1.0 for commands of type 'G'.  Unless the Rtp_proxy_cmd
        carries its own `nretr`, the number of tries is derived from that
        budget by getnretrans().
        """
        # hashlib needs bytes on Python 3; the text is plain ASCII digits.
        seed = str(random()) + str(time())
        cookie = md5(seed.encode('ascii')).hexdigest()
        next_retr = self.delay_flt.lastval * 4.0
        rtime = 3.0
        if isinstance(command, Rtp_proxy_cmd):
            if command.type == 'I':
                rtime = 10.0
            elif command.type == 'G':
                rtime = 1.0
            nretr = command.nretr
            command = str(command)
        else:
            if command.startswith('I'):
                rtime = 10.0
            elif command.startswith('G'):
                rtime = 1.0
            nretr = None
        if nretr is None:
            nretr = getnretrans(next_retr, rtime)
        command = '%s %s' % (cookie, command)
        timer = Timeout(self.retransmit, next_retr, 1, cookie)
        stime = MonoTime()
        self.worker.send_to(command, self.address)
        # The transmission above consumes one try.
        nretr -= 1
        self.pending_requests[cookie] = (next_retr, nretr, timer, command, result_callback, stime, callback_parameters)

    def retransmit(self, cookie):
        """Timer callback: resend the pending command or give up.

        When no tries are left, or the worker has been shut down, the
        request is dropped, go_offline() is called and the result callback
        (if any) receives None.  Otherwise the interval is doubled, a new
        timer is armed and the command is sent again.
        """
        next_retr, triesleft, timer, command, result_callback, stime, callback_parameters = self.pending_requests[cookie]
        #print 'command to %s timeout %s cookie %s triesleft %d' % (str(self.address), command, cookie, triesleft)
        if triesleft <= 0 or self.worker is None:
            del self.pending_requests[cookie]
            # NOTE(review): go_offline() is not defined in this class;
            # presumably a subclass provides it -- confirm.
            self.go_offline()
            if result_callback is not None:
                result_callback(None, *callback_parameters)
            return
        next_retr *= 2
        timer = Timeout(self.retransmit, next_retr, 1, cookie)
        # Restart the clock so the delay is measured from the latest send.
        stime = MonoTime()
        self.worker.send_to(command, self.address)
        triesleft -= 1
        self.pending_requests[cookie] = (next_retr, triesleft, timer, command, result_callback, stime, callback_parameters)

    def process_reply(self, data, address, worker, rtime):
        """Udp_server data callback: dispatch a reply to its requester.

        `data` is expected to be "<cookie> <result>".  Replies that cannot
        be split that way are reported and dropped; replies whose cookie
        is not pending are silently ignored.  `rtime - stime` is fed into
        the delay filter after the result callback has run.
        """
        try:
            cookie, result = data.split(None, 1)
        except Exception:
            # Best effort: any malformed payload is reported and dropped,
            # but KeyboardInterrupt/SystemExit are no longer swallowed.
            print('Rtp_proxy_client_udp.process_reply(): invalid response %s' % data)
            return
        parameters = self.pending_requests.pop(cookie, None)
        if parameters is None:
            return
        next_retr, triesleft, timer, command, result_callback, stime, callback_parameters = parameters
        timer.cancel()
        if result_callback is not None:
            result_callback(result.strip(), *callback_parameters)
        self.delay_flt.apply(rtime - stime)
        #print 'Rtp_proxy_client_udp.process_reply(): delay %f' % (rtime - stime)

    def reconnect(self, address, bind_address = None):
        """Switch to a new destination and, if needed, a new local address.

        The worker is recreated only when `bind_address` differs from the
        one currently stored in the options.  The delay filter is reset in
        either case.
        """
        self.address = address
        if bind_address != self.uopts.laddress:
            self.uopts.laddress = bind_address
            # The worker is None once shutdown() has been called.
            if self.worker is not None:
                self.worker.shutdown()
            self.worker = Udp_server(self.global_config, self.uopts)
        self.delay_flt = recfilter(0.95, 0.25)

    def shutdown(self):
        """Shut the worker down; calling it again is a no-op."""
        if self.worker is not None:
            self.worker.shutdown()
        self.worker = None

    def get_rtpc_delay(self):
        """Return the current value of the delay filter."""
        return self.delay_flt.lastval


class selftest(object):
    """Manual smoke test against an RTP proxy on 127.0.0.1:22226.

    Requires twisted and the `sockstat` utility.
    """

    def gotreply(self, *args):
        """Print the callback arguments and stop the reactor."""
        from twisted.internet import reactor
        # print() with a single argument behaves the same on Python 2 and 3.
        print(args)
        reactor.crash()

    def run(self):
        """Send one command, then reconnect twice and send again each time."""
        import os
        from twisted.internet import reactor
        global_config = {}
        global_config['my_pid'] = os.getpid()
        rtpc = Rtp_proxy_client_udp(global_config, ('127.0.0.1', 22226), None)
        os.system('sockstat | grep -w %d' % global_config['my_pid'])
        rtpc.send_command('Ib', self.gotreply)
        reactor.run()
        rtpc.reconnect(('localhost', 22226), ('0.0.0.0', 34222))
        os.system('sockstat | grep -w %d' % global_config['my_pid'])
        rtpc.send_command('V', self.gotreply)
        reactor.run()
        rtpc.reconnect(('localhost', 22226), ('127.0.0.1', 57535))
        os.system('sockstat | grep -w %d' % global_config['my_pid'])
        rtpc.send_command('V', self.gotreply)
        reactor.run()
        rtpc.shutdown()


if __name__ == '__main__':
    selftest().run()