1# Copyright (c) 2003-2005 Maxim Sobolev. All rights reserved.
2# Copyright (c) 2006-2014 Sippy Software, Inc. All rights reserved.
3#
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9# 1. Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# 2. Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation and/or
14# other materials provided with the distribution.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
20# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
23# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27from Timeout import Timeout
28from threading import Thread, Condition
29from errno import EINTR, EPIPE, ENOTCONN, ECONNRESET
30from twisted.internet import reactor
31from Time.MonoTime import MonoTime
32from Math.recfilter import recfilter
33
34from datetime import datetime
35import socket
36import sys, traceback
37
38_MAX_RECURSE = 10
39
40class _RTPPLWorker(Thread):
41    userv = None
42
43    def __init__(self, userv):
44        Thread.__init__(self)
45        self.userv = userv
46        self.setDaemon(True)
47        self.connect()
48        self.start()
49
50    def connect(self):
51        self.s = socket.socket(self.userv.family, socket.SOCK_STREAM)
52        self.s.connect(self.userv.address)
53
54    def send_raw(self, command, _recurse = 0, stime = None):
55        if _recurse > _MAX_RECURSE:
56            raise Exception('Cannot reconnect: %s', self.userv.address)
57        if not command.endswith('\n'):
58            command += '\n'
59        #print '%s.send_raw(%s)' % (id(self), command)
60        if stime == None:
61            stime = MonoTime()
62        while True:
63            try:
64                self.s.send(command)
65                break
66            except socket.error, why:
67                if why[0] == EINTR:
68                    continue
69                elif why[0] in (EPIPE, ENOTCONN, ECONNRESET):
70                    self.connect()
71                    return self.send_raw(command, _recurse + 1, stime)
72                raise why
73        while True:
74            try:
75                rval = self.s.recv(1024)
76                if len(rval) == 0:
77                    self.connect()
78                    return self.send_raw(command, _MAX_RECURSE, stime)
79                rval = rval.strip()
80                break
81            except socket.error, why:
82                if why[0] == EINTR:
83                    continue
84                elif why[0] in (EPIPE, ENOTCONN, ECONNRESET):
85                    self.connect()
86                    return self.send_raw(command, _recurse + 1, stime)
87                raise why
88        rtpc_delay = stime.offsetFromNow()
89        return (rval, rtpc_delay)
90
91    def run(self):
92        while True:
93            self.userv.wi_available.acquire()
94            while len(self.userv.wi) == 0:
95                self.userv.wi_available.wait()
96            wi = self.userv.wi.pop(0)
97            if wi == None:
98                # Shutdown request, relay it further
99                self.userv.wi.append(None)
100                self.userv.wi_available.notify()
101            self.userv.wi_available.release()
102            if wi == None:
103                break
104            command, result_callback, callback_parameters = wi
105            try:
106                data, rtpc_delay = self.send_raw(command)
107                if len(data) == 0:
108                    data, rtpc_delay = None, None
109            except Exception, e:
110                print e
111                data, rtpc_delay = None, None
112            if result_callback != None:
113                reactor.callFromThread(self.dispatch, result_callback, data, callback_parameters)
114            if rtpc_delay != None:
115                reactor.callFromThread(self.userv.register_delay, rtpc_delay)
116
117    def dispatch(self, result_callback, data, callback_parameters):
118        try:
119            result_callback(data, *callback_parameters)
120        except:
121            print datetime.now(), 'Rtp_proxy_client_stream: unhandled exception when processing RTPproxy reply'
122            print '-' * 70
123            traceback.print_exc(file = sys.stdout)
124            print '-' * 70
125            sys.stdout.flush()
126
127class Rtp_proxy_client_stream(object):
128    is_local = None
129    wi_available = None
130    wi = None
131    nworkers = None
132    workers = None
133    delay_flt = None
134    family = None
135
136    def __init__(self, global_config, address = '/var/run/rtpproxy.sock', \
137      bind_address = None, nworkers = 1, family = socket.AF_UNIX):
138        if family == socket.AF_UNIX:
139            self.is_local = True
140        else:
141            self.is_local = False
142        self.family = family
143        self.address = address
144        self.wi_available = Condition()
145        self.wi = []
146        self.nworkers = nworkers
147        self.workers = []
148        for i in range(0, self.nworkers):
149            self.workers.append(_RTPPLWorker(self))
150        self.delay_flt = recfilter(0.95, 0.25)
151
152    def send_command(self, command, result_callback = None, *callback_parameters):
153        if not command.endswith('\n'):
154            command += '\n'
155        self.wi_available.acquire()
156        self.wi.append((command, result_callback, callback_parameters))
157        self.wi_available.notify()
158        self.wi_available.release()
159
160    def reconnect(self, address, bind_address = None):
161        self.shutdown()
162        self.address = address
163        self.workers = []
164        for i in range(0, self.nworkers):
165            self.workers.append(_RTPPLWorker(self))
166        self.delay_flt = recfilter(0.95, 0.25)
167
168    def shutdown(self):
169        self.wi_available.acquire()
170        self.wi.append(None)
171        self.wi_available.notify()
172        self.wi_available.release()
173        for rworker in self.workers:
174            rworker.join()
175        self.workers = None
176
177    def register_delay(self, rtpc_delay):
178        self.delay_flt.apply(rtpc_delay)
179
180    def get_rtpc_delay(self):
181        return self.delay_flt.lastval
182
183if __name__ == '__main__':
184    from twisted.internet import reactor
185    def display(*args):
186        print args
187        reactor.crash()
188    r = Rtp_proxy_client_stream({'_sip_address':'1.2.3.4'})
189    r.send_command('VF 123456', display, 'abcd')
190    reactor.run(installSignalHandlers = 1)
191    r.shutdown()
192