1# Copyright (c) 2003-2005 Maxim Sobolev. All rights reserved.
2# Copyright (c) 2006-2014 Sippy Software, Inc. All rights reserved.
3#
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9# 1. Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# 2. Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation and/or
14# other materials provided with the distribution.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
20# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
23# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27from Timeout import Timeout
28from Udp_server import Udp_server, Udp_server_opts
29from Time.MonoTime import MonoTime
30from Math.recfilter import recfilter
31from Rtp_proxy_cmd import Rtp_proxy_cmd
32
33from time import time
34from hashlib import md5
35from random import random
36
37def getnretrans(first_rert, timeout):
38    n = 0
39    while True:
40        timeout -= first_rert
41        if timeout < 0:
42            break
43        first_rert *= 2.0
44        n += 1
45    return n
46
47class Rtp_proxy_client_udp(object):
48    pending_requests = None
49    is_local = False
50    worker = None
51    uopts = None
52    global_config = None
53    delay_flt = None
54    ploss_out_rate = 0.0
55    pdelay_out_max = 0.0
56
57    def __init__(self, global_config, address, bind_address = None, family = None, nworkers = None):
58        self.address = address
59        self.is_local = False
60        self.uopts = Udp_server_opts(bind_address, self.process_reply, family)
61        self.uopts.flags = 0
62        self.uopts.ploss_out_rate = self.ploss_out_rate
63        self.uopts.pdelay_out_max = self.pdelay_out_max
64        if nworkers != None:
65            self.uopts.nworkers = nworkers
66        self.worker = Udp_server(global_config, self.uopts)
67        self.pending_requests = {}
68        self.global_config = global_config
69        self.delay_flt = recfilter(0.95, 0.25)
70
71    def send_command(self, command, result_callback = None, *callback_parameters):
72        cookie = md5(str(random()) + str(time())).hexdigest()
73        next_retr = self.delay_flt.lastval * 4.0
74        rtime = 3.0
75        if isinstance(command, Rtp_proxy_cmd):
76            if command.type == 'I':
77                rtime = 10.0
78            if command.type == 'G':
79                rtime = 1.0
80            nretr = command.nretr
81            command = str(command)
82        else:
83            if command.startswith('I'):
84                rtime = 10.0
85            elif command.startswith('G'):
86                rtime = 1.0
87            nretr = None
88        if nretr == None:
89            nretr = getnretrans(next_retr, rtime)
90        command = '%s %s' % (cookie, command)
91        timer = Timeout(self.retransmit, next_retr, 1, cookie)
92        stime = MonoTime()
93        self.worker.send_to(command, self.address)
94        nretr -= 1
95        self.pending_requests[cookie] = (next_retr, nretr, timer, command, result_callback, stime, callback_parameters)
96
97    def retransmit(self, cookie):
98        next_retr, triesleft, timer, command, result_callback, stime, callback_parameters = self.pending_requests[cookie]
99        #print 'command to %s timeout %s cookie %s triesleft %d' % (str(self.address), command, cookie, triesleft)
100        if triesleft <= 0 or self.worker == None:
101            del self.pending_requests[cookie]
102            self.go_offline()
103            if result_callback != None:
104                result_callback(None, *callback_parameters)
105            return
106        next_retr *= 2
107        timer = Timeout(self.retransmit, next_retr, 1, cookie)
108        stime = MonoTime()
109        self.worker.send_to(command, self.address)
110        triesleft -= 1
111        self.pending_requests[cookie] = (next_retr, triesleft, timer, command, result_callback, stime, callback_parameters)
112
113    def process_reply(self, data, address, worker, rtime):
114        try:
115            cookie, result = data.split(None, 1)
116        except:
117            print('Rtp_proxy_client_udp.process_reply(): invalid response %s' % data)
118            return
119        parameters = self.pending_requests.pop(cookie, None)
120        if parameters == None:
121            return
122        next_retr, triesleft, timer, command, result_callback, stime, callback_parameters = parameters
123        timer.cancel()
124        if result_callback != None:
125            result_callback(result.strip(), *callback_parameters)
126        self.delay_flt.apply(rtime - stime)
127        #print 'Rtp_proxy_client_udp.process_reply(): delay %f' % (rtime - stime)
128
129    def reconnect(self, address, bind_address = None):
130        self.address = address
131        if bind_address != self.uopts.laddress:
132            self.uopts.laddress = bind_address
133            self.worker.shutdown()
134            self.worker = Udp_server(self.global_config, self.uopts)
135            self.delay_flt = recfilter(0.95, 0.25)
136
137    def shutdown(self):
138        self.worker.shutdown()
139        self.worker = None
140
141    def get_rtpc_delay(self):
142        return self.delay_flt.lastval
143
144class selftest(object):
145    def gotreply(self, *args):
146        from twisted.internet import reactor
147        print args
148        reactor.crash()
149
150    def run(self):
151        import os
152        from twisted.internet import reactor
153        global_config = {}
154        global_config['my_pid'] = os.getpid()
155        rtpc = Rtp_proxy_client_udp(global_config, ('127.0.0.1', 22226), None)
156        os.system('sockstat | grep -w %d' % global_config['my_pid'])
157        rtpc.send_command('Ib', self.gotreply)
158        reactor.run()
159        rtpc.reconnect(('localhost', 22226), ('0.0.0.0', 34222))
160        os.system('sockstat | grep -w %d' % global_config['my_pid'])
161        rtpc.send_command('V', self.gotreply)
162        reactor.run()
163        rtpc.reconnect(('localhost', 22226), ('127.0.0.1', 57535))
164        os.system('sockstat | grep -w %d' % global_config['my_pid'])
165        rtpc.send_command('V', self.gotreply)
166        reactor.run()
167        rtpc.shutdown()
168
169if __name__ == '__main__':
170    selftest().run()
171