1# Copyright (c) 2003-2005 Maxim Sobolev. All rights reserved.
2# Copyright (c) 2006-2014 Sippy Software, Inc. All rights reserved.
3#
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without modification,
7# are permitted provided that the following conditions are met:
8#
9# 1. Redistributions of source code must retain the above copyright notice, this
10# list of conditions and the following disclaimer.
11#
12# 2. Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation and/or
14# other materials provided with the distribution.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
17# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
18# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
20# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
21# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
22# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
23# ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
24# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
25# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26
27from twisted.internet import reactor
28from errno import ECONNRESET, ENOTCONN, ESHUTDOWN, EWOULDBLOCK, ENOBUFS, EAGAIN, \
29  EINTR
30from datetime import datetime
31from time import sleep, time
32from threading import Thread, Condition
33from random import random
34import socket
35import sys, traceback
36
37from Timeout import Timeout
38from Time.MonoTime import MonoTime
39
class AsyncSender(Thread):
    """Daemon worker thread that drains the owning server's outgoing queue.

    The worker waits on ``userv.wi_available`` until ``userv.wi`` is
    non-empty, removes the first work item and transmits it through
    ``userv.skt``.  A work item is either a ``(data, address)`` tuple or
    ``None``; ``None`` is a shutdown request that is put back on the queue
    for the other senders before this thread exits.
    """
    userv = None

    def __init__(self, userv):
        """Remember the owning server and start running immediately.

        userv -- object providing the ``wi``, ``wi_available``, ``uopts``
                 and ``skt`` attributes used by run().
        """
        Thread.__init__(self)
        self.userv = userv
        # Same effect as setDaemon(True), using the attribute form
        self.daemon = True
        self.start()

    def _next_item(self):
        """Block until a work item is queued and return it.

        The condition is held through ``with`` so that it is released even
        if something in the critical section raises.
        """
        userv = self.userv
        with userv.wi_available:
            while len(userv.wi) == 0:
                userv.wi_available.wait()
            wi = userv.wi.pop(0)
            if wi is None:
                # Shutdown request, relay it further
                userv.wi.append(None)
                userv.wi_available.notify()
        return wi

    def _resolve(self, address):
        """Turn ``(host, port)`` into a sockaddr for the server's family.

        Returns None when the host cannot be resolved, in which case the
        caller drops the datagram.
        """
        family = self.userv.uopts.family
        try:
            ai = socket.getaddrinfo(address[0], None, family)
        except Exception:
            return None
        sockaddr = ai[0][4]
        if family == socket.AF_INET:
            return (sockaddr[0], address[1])
        # Any other family is treated as IPv6: the third and fourth members
        # of the resolved sockaddr are carried over unchanged.
        return (sockaddr[0], address[1], sockaddr[2], sockaddr[3])

    def _transmit(self, data, address):
        """Send one datagram, making at most 20 attempts 10ms apart.

        Another attempt is made only after a short write or after
        EWOULDBLOCK, ENOBUFS or EAGAIN; any other socket error drops the
        datagram.
        """
        for _ in range(20):
            try:
                if self.userv.skt.sendto(data, address) == len(data):
                    return
            except socket.error as why:
                # args[0] carries the errno; guard against an empty args
                # tuple so that the worker thread cannot die on IndexError
                code = why.args[0] if why.args else None
                if code not in (EWOULDBLOCK, ENOBUFS, EAGAIN):
                    return
            sleep(0.01)

    def run(self):
        """Process work items until a shutdown request is seen."""
        while True:
            wi = self._next_item()
            if wi is None:
                break
            data, address = wi
            address = self._resolve(address)
            if address is None:
                continue
            self._transmit(data, address)
        # Drop the reference to the server once the thread is done
        self.userv = None
80
class AsyncReceiver(Thread):
    """Daemon worker thread that reads datagrams from the owning server's socket.

    Every non-empty datagram is handed to ``userv.handle_read`` through
    ``reactor.callFromThread`` together with the sender address and a
    MonoTime() stamp taken right after the read.
    """
    userv = None

    def __init__(self, userv):
        """Remember the owning server and start running immediately.

        userv -- object providing the ``skt``, ``uopts`` and ``handle_read``
                 attributes used by run().
        """
        Thread.__init__(self)
        self.userv = userv
        # Same effect as setDaemon(True), using the attribute form
        self.daemon = True
        self.start()

    def run(self):
        """Receive until the socket is shut down or looks closed.

        The loop ends on ECONNRESET, ENOTCONN or ESHUTDOWN, or after 100
        consecutive empty reads.  EINTR restarts the read at once; any
        other exception is reported on stdout and the read is retried
        after one second.
        """
        maxemptydata = 100
        while True:
            try:
                data, address = self.userv.skt.recvfrom(8192)
            except Exception as why:
                if isinstance(why, socket.error):
                    # args[0] carries the errno; guard against an empty
                    # args tuple so that the handler itself cannot raise
                    code = why.args[0] if why.args else None
                    if code in (ECONNRESET, ENOTCONN, ESHUTDOWN):
                        break
                    if code == EINTR:
                        continue
                print('%s %s' % (datetime.now(), 'Udp_server: unhandled exception when receiving incoming data'))
                print('-' * 70)
                traceback.print_exc(file = sys.stdout)
                print('-' * 70)
                sys.stdout.flush()
                sleep(1)
                continue
            if not data:
                # Ugly hack to detect the socket being closed under us on
                # Linux.  The problem is that even a call on a non-closed
                # socket can sometimes return an empty data buffer, which
                # would make AsyncReceiver exit prematurely, so only a long
                # run of empty reads is treated as "closed".
                maxemptydata -= 1
                if maxemptydata == 0:
                    break
                continue
            maxemptydata = 100
            rtime = MonoTime()
            if self.userv.uopts.family == socket.AF_INET6:
                # IPv6 peers are reported as a bracketed host plus port
                address = ('[%s]' % address[0], address[1])
            reactor.callFromThread(self.userv.handle_read, data, address, rtime)
        # Drop the reference to the server once the thread is done
        self.userv = None
124
# Default value of Udp_server_opts.flags: SO_REUSEADDR, OR-ed with
# SO_REUSEPORT when this platform's socket module defines that constant.
_DEFAULT_FLAGS = socket.SO_REUSEADDR | getattr(socket, 'SO_REUSEPORT', 0)
# Default value of Udp_server_opts.nworkers.
_DEFAULT_NWORKERS = 30
129
class Udp_server_opts(object):
    """Settings consumed by Udp_server.

    laddress       -- ``(host, port)`` to bind to, or None for no bind
    data_callback  -- callable invoked for received datagrams
    family         -- socket.AF_INET or socket.AF_INET6
    flags          -- mask tested against SO_REUSEADDR / SO_REUSEPORT
    nworkers       -- number of sender/receiver thread pairs to start
    ploss_out_rate -- probability (0.0..1.0) of dropping an outgoing datagram
    pdelay_out_max -- upper bound of the random delay for outgoing datagrams
    ploss_in_rate  -- probability (0.0..1.0) of dropping an incoming datagram
    pdelay_in_max  -- upper bound of the random delay for incoming datagrams
    """
    laddress = None
    data_callback = None
    family = None
    flags = _DEFAULT_FLAGS
    nworkers = _DEFAULT_NWORKERS
    ploss_out_rate = 0.0
    pdelay_out_max = 0.0
    ploss_in_rate = 0.0
    pdelay_in_max = 0.0

    def __init__(self, laddress, data_callback, family = None, o = None):
        """Build a new option set, or clone an existing one.

        When ``o`` is given every setting is copied from it and the other
        arguments are ignored.  Otherwise, if ``family`` is None it is
        derived from ``laddress``: a host starting with '[' selects
        AF_INET6 and has its first and last characters (the brackets)
        removed, anything else (including a None ``laddress``) selects
        AF_INET.  An explicitly given ``family`` leaves ``laddress``
        untouched.
        """
        if o is not None:
            for name in ('laddress', 'data_callback', 'family', 'nworkers',
              'flags', 'ploss_out_rate', 'pdelay_out_max', 'ploss_in_rate',
              'pdelay_in_max'):
                setattr(self, name, getattr(o, name))
            return
        if family is None:
            if laddress is not None and laddress[0].startswith('['):
                family = socket.AF_INET6
                laddress = (laddress[0][1:-1], laddress[1])
            else:
                family = socket.AF_INET
        self.family = family
        self.laddress = laddress
        self.data_callback = data_callback

    def getCopy(self):
        """Return a new instance of the same class carrying the same settings."""
        return self.__class__(None, None, o = self)
161
class Udp_server(object):
    """UDP endpoint whose socket I/O is performed by worker threads.

    The constructor creates the socket, optionally binds it and starts
    ``uopts.nworkers`` AsyncSender/AsyncReceiver pairs.  Outgoing datagrams
    are queued on ``wi`` (guarded by the ``wi_available`` condition) for the
    senders; incoming ones arrive through handle_read().
    """
    skt = None
    uopts = None
    sendqueue = None
    stats = None
    wi_available = None
    wi = None
    asenders = None
    areceivers = None

    def __init__(self, global_config, uopts):
        """Create the socket and start the worker threads.

        global_config -- accepted for interface compatibility; not used here
        uopts         -- Udp_server_opts; a private copy is taken

        If resolving the local address, setting the socket options or
        bind() fails, the socket is closed before the error is re-raised.
        """
        self.uopts = uopts.getCopy()
        self.skt = socket.socket(self.uopts.family, socket.SOCK_DGRAM)
        if self.uopts.laddress is not None:
            try:
                self._bind_socket()
            except Exception:
                # Do not leak the descriptor when the set-up fails
                self.skt.close()
                self.skt = None
                raise
        self.sendqueue = []
        self.stats = [0, 0, 0]
        self.wi_available = Condition()
        self.wi = []
        self.asenders = []
        self.areceivers = []
        for _ in range(self.uopts.nworkers):
            self.asenders.append(AsyncSender(self))
            self.areceivers.append(AsyncReceiver(self))

    def _bind_socket(self):
        """Apply the address-reuse options selected by uopts.flags and bind."""
        opts = self.uopts
        sockaddr = socket.getaddrinfo(opts.laddress[0], None, opts.family)[0][4]
        if opts.family == socket.AF_INET:
            address = (sockaddr[0], opts.laddress[1])
        else:
            # IPv6 sockaddr: keep the third and fourth members as resolved
            address = (sockaddr[0], opts.laddress[1], sockaddr[2], sockaddr[3])
        if (opts.flags & socket.SO_REUSEADDR) != 0:
            self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        if hasattr(socket, 'SO_REUSEPORT') and \
          (opts.flags & socket.SO_REUSEPORT) != 0:
            self.skt.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        self.skt.bind(address)

    def send_to(self, data, address, delayed = False):
        """Queue ``data`` for transmission to ``address``.

        address -- ``(host, port)`` tuple; on an AF_INET6 server the host
                   has to be in the bracketed ``[addr]`` form
        delayed -- True when re-entered after the simulated delay, which
                   skips the loss/delay simulation

        Raises Exception when ``address`` is not a tuple or, for AF_INET6,
        when the host does not start with '['.
        """
        if not isinstance(address, tuple):
            raise Exception('Invalid address, not a tuple: %s' % str(address))
        if self.uopts.ploss_out_rate > 0.0 and not delayed:
            if random() < self.uopts.ploss_out_rate:
                # Simulated packet loss
                return
        if self.uopts.pdelay_out_max > 0.0 and not delayed:
            # Simulated delay: re-enter later with delayed=True.
            # NOTE(review): relies on Timeout passing the trailing arguments
            # through to the callback -- confirm against Timeout.
            pdelay = self.uopts.pdelay_out_max * random()
            Timeout(self.send_to, pdelay, 1, data, address, True)
            return
        addr, port = address
        if self.uopts.family == socket.AF_INET6:
            if not addr.startswith('['):
                raise Exception('Invalid IPv6 address: %s' % addr)
            # Strip the brackets before the address is resolved by a sender
            address = (addr[1:-1], port)
        with self.wi_available:
            self.wi.append((data, address))
            self.wi_available.notify()

    def handle_read(self, data, address, rtime, delayed = False):
        """Hand a received datagram to ``uopts.data_callback``.

        Nothing happens for empty data or when no callback is set.
        Otherwise ``stats[2]`` is incremented and, unless ``delayed`` is
        True, the incoming loss/delay simulation is applied first.
        Exceptions raised by the callback are reported on stdout and
        swallowed.
        """
        if len(data) == 0 or self.uopts.data_callback is None:
            return
        self.stats[2] += 1
        if self.uopts.ploss_in_rate > 0.0 and not delayed:
            if random() < self.uopts.ploss_in_rate:
                # Simulated packet loss
                return
        if self.uopts.pdelay_in_max > 0.0 and not delayed:
            # Simulated delay: re-enter later with the receive time shifted
            # by the same amount and delayed=True
            pdelay = self.uopts.pdelay_in_max * random()
            Timeout(self.handle_read, pdelay, 1, data, address, rtime + pdelay, True)
            return
        try:
            self.uopts.data_callback(data, address, self, rtime)
        except Exception:
            print('%s %s' % (datetime.now(), 'Udp_server: unhandled exception when processing incoming data'))
            print('-' * 70)
            traceback.print_exc(file = sys.stdout)
            print('-' * 70)
            sys.stdout.flush()

    def shutdown(self):
        """Stop all workers, wait for them and close the socket."""
        try:
            self.skt.shutdown(socket.SHUT_RDWR)
        except socket.error:
            # Best effort only: failures of shutdown() are ignored
            pass
        with self.wi_available:
            # None is the shutdown request; each sender relays it onwards
            self.wi.append(None)
            self.wi_available.notify()
        self.uopts.data_callback = None
        for worker in self.asenders + self.areceivers:
            worker.join()
        self.asenders = None
        self.areceivers = None
        # No worker uses the socket any more: release the descriptor
        try:
            self.skt.close()
        except socket.error:
            pass
250
class self_test(object):
    """Loopback ping/pong check of Udp_server over IPv4 and IPv6.

    For each family a "pong" server sends a ping datagram to a "ping"
    server, which answers with a pong.  The reactor is stopped once
    ``npongs`` pongs have been received; any unexpected payload or peer
    address terminates the process with exit status 1.
    """
    # Kept so that ``self_test.exit`` keeps resolving.  A name bound in the
    # class body is not visible as a bare name inside the methods, so the
    # methods call sys.exit() explicitly.
    from sys import exit
    npongs = 2
    ping_data = 'ping!'
    ping_data6 = 'ping6!'
    pong_laddr = None
    pong_laddr6 = None
    pong_data = 'pong!'
    pong_data6 = 'pong6!'
    ping_laddr = None
    ping_laddr6 = None
    ping_raddr = None
    ping_raddr6 = None
    pong_raddr = None
    pong_raddr6 = None

    def ping_received(self, data, address, udp_server, rtime):
        """Callback of the "ping" servers: verify the ping and answer it."""
        if udp_server.uopts.family == socket.AF_INET:
            print('ping_received')
            if data != self.ping_data or address != self.pong_raddr:
                print('%s %s %s %s' % (data, address, self.ping_data, self.pong_raddr))
                sys.exit(1)
            udp_server.send_to(self.pong_data, address)
        else:
            print('ping_received6')
            if data != self.ping_data6 or address != self.pong_raddr6:
                sys.exit(1)
            udp_server.send_to(self.pong_data6, address)

    def pong_received(self, data, address, udp_server, rtime):
        """Callback of the "pong" servers: verify the pong and count it."""
        if udp_server.uopts.family == socket.AF_INET:
            print('pong_received')
            expected = (self.pong_data, self.ping_raddr)
        else:
            print('pong_received6')
            expected = (self.pong_data6, self.ping_raddr6)
        if data != expected[0] or address != expected[1]:
            sys.exit(1)
        self.npongs -= 1
        if self.npongs == 0:
            # Both families answered: let run() proceed to the shutdown
            reactor.stop()

    def run(self):
        """Start the four servers, send both pings and run the reactor."""
        local_host = '127.0.0.1'
        local_host6 = '[::1]'
        remote_host = local_host
        remote_host6 = local_host6
        self.ping_laddr = (local_host, 12345)
        self.pong_laddr = (local_host, 54321)
        self.ping_laddr6 = (local_host6, 12345)
        self.pong_laddr6 = (local_host6, 54321)
        self.ping_raddr = (remote_host, 12345)
        self.pong_raddr = (remote_host, 54321)
        self.ping_raddr6 = (remote_host6, 12345)
        self.pong_raddr6 = (remote_host6, 54321)
        uopts_ping = Udp_server_opts(self.ping_laddr, self.ping_received)
        uopts_ping6 = Udp_server_opts(self.ping_laddr6, self.ping_received)
        uopts_pong = Udp_server_opts(self.pong_laddr, self.pong_received)
        uopts_pong6 = Udp_server_opts(self.pong_laddr6, self.pong_received)
        udp_server_ping = Udp_server({}, uopts_ping)
        udp_server_pong = Udp_server({}, uopts_pong)
        udp_server_pong.send_to(self.ping_data, self.ping_laddr)
        udp_server_ping6 = Udp_server({}, uopts_ping6)
        udp_server_pong6 = Udp_server({}, uopts_pong6)
        udp_server_pong6.send_to(self.ping_data6, self.ping_laddr6)
        # Returns after pong_received() has stopped the reactor
        reactor.run()
        udp_server_ping.shutdown()
        udp_server_pong.shutdown()
        udp_server_ping6.shutdown()
        udp_server_pong6.shutdown()
321
if __name__ == '__main__':
    # Executed as a script: run the loopback ping/pong self test
    tester = self_test()
    tester.run()
324