# Copyright (c) 2003-2005 Maxim Sobolev. All rights reserved.
# Copyright (c) 2006-2014 Sippy Software, Inc. All rights reserved.
#
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation and/or
# other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
from Timeout import Timeout
from threading import Thread, Condition
from errno import EINTR, EPIPE, ENOTCONN, ECONNRESET
from twisted.internet import reactor
from Time.MonoTime import MonoTime
from Math.recfilter import recfilter

from datetime import datetime
import socket
import sys, traceback

# Upper bound on the reconnect depth a single send_raw() call may reach.
_MAX_RECURSE = 10

class _RTPPLWorker(Thread):
    """Daemon thread that executes queued commands over its own stream socket.

    Each worker connects to ``userv.address`` when it is created, then pulls
    work items from ``userv.wi`` (guarded by the ``userv.wi_available``
    condition) until it sees the ``None`` shutdown sentinel.
    """
    userv = None
    s = None

    def __init__(self, userv):
        """Connect to the server described by *userv* and start the thread.

        The connection is made in the caller's thread, so a socket error
        raised by connect() propagates to whoever creates the worker.
        """
        Thread.__init__(self)
        self.userv = userv
        self.setDaemon(True)
        self.connect()
        self.start()

    def connect(self):
        """(Re)create the stream socket and connect it to ``userv.address``.

        Any previously held socket is closed so that repeated reconnects do
        not leak descriptors.
        """
        old_sock = self.s
        self.s = socket.socket(self.userv.family, socket.SOCK_STREAM)
        if old_sock is not None:
            try:
                old_sock.close()
            except socket.error:
                pass
        self.s.connect(self.userv.address)

    def send_raw(self, command, _recurse = 0, stime = None):
        """Send *command* and return ``(reply, rtpc_delay)``.

        A trailing newline is appended to *command* when missing.  The reply
        is whatever a single ``recv(1024)`` returns, stripped of surrounding
        whitespace.  ``rtpc_delay`` is ``stime.offsetFromNow()``, presumably
        the time elapsed since the first attempt (MonoTime is defined
        elsewhere -- confirm).

        *_recurse* and *stime* are internal: they carry the reconnect depth
        and the original start time across retries.

        Raises Exception once the reconnect depth exceeds _MAX_RECURSE, and
        re-raises any socket error that is not handled by retrying.
        """
        if _recurse > _MAX_RECURSE:
            # The address may be a path or a (host, port) tuple, hence the
            # explicit one-element tuple for the % operator.
            raise Exception('Cannot reconnect: %s' % (self.userv.address,))
        if not command.endswith('\n'):
            command += '\n'
        if stime is None:
            stime = MonoTime()
        while True:
            try:
                self.s.send(command)
                break
            except socket.error as why:
                if why.args[0] == EINTR:
                    continue
                elif why.args[0] in (EPIPE, ENOTCONN, ECONNRESET):
                    self.connect()
                    return self.send_raw(command, _recurse + 1, stime)
                raise
        while True:
            try:
                rval = self.s.recv(1024)
                if len(rval) == 0:
                    # Peer closed the connection: allow exactly one more
                    # attempt.  Jumping straight to _MAX_RECURSE keeps the
                    # "single retry" behaviour, while max() guarantees that
                    # a second EOF trips the guard above instead of
                    # recursing without bound.
                    self.connect()
                    return self.send_raw(command, \
                      max(_recurse + 1, _MAX_RECURSE), stime)
                rval = rval.strip()
                break
            except socket.error as why:
                if why.args[0] == EINTR:
                    continue
                elif why.args[0] in (EPIPE, ENOTCONN, ECONNRESET):
                    self.connect()
                    return self.send_raw(command, _recurse + 1, stime)
                raise
        rtpc_delay = stime.offsetFromNow()
        return (rval, rtpc_delay)

    def run(self):
        """Process work items until the ``None`` shutdown sentinel is seen.

        Results and measured delays are handed over to the reactor thread
        with ``reactor.callFromThread``; a failed or empty reply is reported
        to the callback as ``None``.
        """
        while True:
            with self.userv.wi_available:
                while not self.userv.wi:
                    self.userv.wi_available.wait()
                wi = self.userv.wi.pop(0)
                if wi is None:
                    # Shutdown request, relay it further
                    self.userv.wi.append(None)
                    self.userv.wi_available.notify()
            if wi is None:
                break
            command, result_callback, callback_parameters = wi
            try:
                data, rtpc_delay = self.send_raw(command)
                if len(data) == 0:
                    data, rtpc_delay = None, None
            except Exception as e:
                print(e)
                data, rtpc_delay = None, None
            if result_callback is not None:
                reactor.callFromThread(self.dispatch, result_callback, data, callback_parameters)
            if rtpc_delay is not None:
                reactor.callFromThread(self.userv.register_delay, rtpc_delay)
        # The worker is done for good; release its connection.
        try:
            self.s.close()
        except socket.error:
            pass

    def dispatch(self, result_callback, data, callback_parameters):
        """Invoke ``result_callback(data, *callback_parameters)``.

        Exceptions raised by the callback are printed to stdout together
        with a traceback and are not propagated.
        """
        try:
            result_callback(data, *callback_parameters)
        except Exception:
            print('%s %s' % (datetime.now(), 'Rtp_proxy_client_stream: unhandled exception when processing RTPproxy reply'))
            print('-' * 70)
            traceback.print_exc(file = sys.stdout)
            print('-' * 70)
            sys.stdout.flush()

class Rtp_proxy_client_stream(object):
    """Command client that talks to a server over stream sockets.

    Commands are queued in ``wi`` and executed by ``nworkers`` background
    _RTPPLWorker threads, each holding its own connection to ``address``.
    """
    is_local = None
    wi_available = None
    wi = None
    nworkers = None
    workers = None
    delay_flt = None
    family = None

    def __init__(self, global_config, address = '/var/run/rtpproxy.sock', \
      bind_address = None, nworkers = 1, family = socket.AF_UNIX):
        """Connect *nworkers* workers to *address*.

        *global_config* and *bind_address* are accepted but not used here.
        ``is_local`` is set to True exactly when *family* is AF_UNIX.
        """
        self.is_local = (family == socket.AF_UNIX)
        self.family = family
        self.address = address
        self.wi_available = Condition()
        self.wi = []
        self.nworkers = nworkers
        self._start_workers()

    def _start_workers(self):
        """Reset the delay filter and spawn a fresh set of worker threads."""
        # The filter is created first so that it already exists by the time
        # any worker reports a delay.
        self.delay_flt = recfilter(0.95, 0.25)
        self.workers = []
        for i in range(0, self.nworkers):
            self.workers.append(_RTPPLWorker(self))

    def send_command(self, command, result_callback = None, *callback_parameters):
        """Queue *command* for execution by one of the workers.

        When *result_callback* is given, it is later called as
        ``result_callback(data, *callback_parameters)``, where ``data`` is
        the reply, or None if the command failed or the reply was empty.
        """
        if not command.endswith('\n'):
            command += '\n'
        with self.wi_available:
            self.wi.append((command, result_callback, callback_parameters))
            self.wi_available.notify()

    def reconnect(self, address, bind_address = None):
        """Stop the current workers and start new ones connected to *address*.

        *bind_address* is accepted but not used here.
        """
        self.shutdown()
        self.address = address
        self._start_workers()

    def shutdown(self):
        """Ask all workers to exit and wait for them to finish.

        Calling it again after the workers are gone is a no-op.
        """
        if self.workers is None:
            return
        with self.wi_available:
            self.wi.append(None)
            self.wi_available.notify()
        for rworker in self.workers:
            rworker.join()
        self.workers = None
        # The last worker to exit leaves the relayed sentinel in the queue.
        # Remove it, otherwise workers created by a later reconnect() would
        # pick it up and terminate immediately.
        with self.wi_available:
            self.wi[:] = [wi for wi in self.wi if wi is not None]

    def register_delay(self, rtpc_delay):
        """Feed one measured command delay into the delay filter."""
        self.delay_flt.apply(rtpc_delay)

    def get_rtpc_delay(self):
        """Return the delay filter's ``lastval``."""
        return self.delay_flt.lastval

if __name__ == '__main__':
    from twisted.internet import reactor
    def display(*args):
        print(args)
        reactor.crash()
    r = Rtp_proxy_client_stream({'_sip_address':'1.2.3.4'})
    r.send_command('VF 123456', display, 'abcd')
    reactor.run(installSignalHandlers = 1)
    r.shutdown()