1 /*
2  * Copyright (c) 2014 Sippy Software, Inc., http://www.sippysoft.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  */
27 
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <errno.h>
31 #include <pthread.h>
32 #include <sched.h>
33 #include <signal.h>
34 #include <stdint.h>
35 #include <stdlib.h>
36 #include <string.h>
37 
38 #include "rtpp_log.h"
39 #include "rtpp_cfg_stable.h"
40 #include "rtpp_defines.h"
41 #include "rtp.h"
42 #include "rtp_packet.h"
43 #include "rtpp_wi.h"
44 #include "rtpp_wi_private.h"
45 #include "rtpp_types.h"
46 #include "rtpp_refcnt.h"
47 #include "rtpp_log_obj.h"
48 #include "rtpp_queue.h"
49 #include "rtpp_network.h"
50 #include "rtpp_netio_async.h"
51 #include "rtpp_time.h"
52 #include "rtpp_mallocs.h"
53 #include "rtpp_debug.h"
54 #ifdef RTPP_DEBUG
55 #include "rtpp_math.h"
56 #endif
57 
58 struct sthread_args {
59     struct rtpp_queue *out_q;
60     struct rtpp_log *glog;
61     int dmode;
62 #if RTPP_DEBUG_timers
63     struct recfilter average_load;
64 #endif
65     struct rtpp_wi *sigterm;
66 };
67 
68 #define SEND_THREADS 1
69 
70 struct rtpp_anetio_cf {
71     pthread_t thread_id[SEND_THREADS];
72     struct sthread_args args[SEND_THREADS];
73 };
74 
75 #define RTPP_ANETIO_MAX_RETRY 3
76 
77 static void
rtpp_anetio_sthread(struct sthread_args * args)78 rtpp_anetio_sthread(struct sthread_args *args)
79 {
80     int n, nsend, i, send_errno, nretry;
81     struct rtpp_wi *wi, *wis[100];
82 #if RTPP_DEBUG_timers
83     double tp[3], runtime, sleeptime;
84     long run_n;
85 
86     runtime = sleeptime = 0.0;
87     run_n = 0;
88     tp[0] = getdtime();
89 #endif
90     for (;;) {
91         nsend = rtpp_queue_get_items(args->out_q, wis, 100, 0);
92 #if RTPP_DEBUG_timers
93         tp[1] = getdtime();
94 #endif
95 
96         for (i = 0; i < nsend; i++) {
97 	    wi = wis[i];
98             if (wi->wi_type == RTPP_WI_TYPE_SGNL) {
99                 rtpp_wi_free(wi);
100                 goto out;
101             }
102             nretry = 0;
103             do {
104                 n = sendto(wi->sock, wi->msg, wi->msg_len, wi->flags,
105                   wi->sendto, wi->tolen);
106                 send_errno = (n < 0) ? errno : 0;
107 #if RTPP_DEBUG_netio >= 1
108                 if (wi->debug != 0) {
109                     char daddr[MAX_AP_STRBUF];
110 
111                     addrport2char_r(wi->sendto, daddr, sizeof(daddr), ':');
112                     if (n < 0) {
113                         RTPP_ELOG(wi->log, RTPP_LOG_DBUG,
114                           "sendto(%d, %p, %lld, %d, %p (%s), %d) = %d",
115                           wi->sock, wi->msg, (long long)wi->msg_len, wi->flags,
116                           wi->sendto, daddr, wi->tolen, n);
117                     } else if (n < wi->msg_len) {
118                         RTPP_LOG(wi->log, RTPP_LOG_DBUG,
119                           "sendto(%d, %p, %lld, %d, %p (%s), %d) = %d: short write",
120                           wi->sock, wi->msg, (long long)wi->msg_len, wi->flags,
121                           wi->sendto, daddr, wi->tolen, n);
122 #if RTPP_DEBUG_netio >= 2
123                     } else {
124                         RTPP_LOG(wi->log, RTPP_LOG_DBUG,
125                           "sendto(%d, %p, %d, %d, %p (%s), %d) = %d",
126                           wi->sock, wi->msg, wi->msg_len, wi->flags, wi->sendto, daddr,
127                           wi->tolen, n);
128 #endif
129                     }
130                 }
131 #endif
132                 if (n >= 0) {
133                     wi->nsend--;
134                 } else {
135                     /* "EPERM" is Linux thing, yield and retry */
136                     if ((send_errno == EPERM || send_errno == ENOBUFS)
137                       && nretry < RTPP_ANETIO_MAX_RETRY) {
138                         sched_yield();
139                         nretry++;
140                     } else {
141                         break;
142                     }
143                 }
144             } while (wi->nsend > 0);
145             rtpp_wi_free(wi);
146         }
147 #if RTPP_DEBUG_timers
148         sleeptime += tp[1] - tp[0];
149         tp[0] = getdtime();
150         runtime += tp[0] - tp[1];
151         if ((run_n % 10000) == 0) {
152             RTPP_LOG(args->glog, RTPP_LOG_DBUG, "rtpp_anetio_sthread(%p): run %ld aload = %f filtered = %f", \
153               args, run_n, runtime / (runtime + sleeptime), args->average_load.lastval);
154         }
155         if (runtime + sleeptime > 1.0) {
156             recfilter_apply(&args->average_load, runtime / (runtime + sleeptime));
157             runtime = sleeptime = 0.0;
158         }
159         run_n += 1;
160 #endif
161     }
162 out:
163     return;
164 }
165 
166 int
rtpp_anetio_sendto(struct rtpp_anetio_cf * netio_cf,int sock,const void * msg,size_t msg_len,int flags,const struct sockaddr * sendto,socklen_t tolen)167 rtpp_anetio_sendto(struct rtpp_anetio_cf *netio_cf, int sock, const void *msg, \
168   size_t msg_len, int flags, const struct sockaddr *sendto, socklen_t tolen)
169 {
170     struct rtpp_wi *wi;
171 
172     wi = rtpp_wi_malloc(sock, msg, msg_len, flags, sendto, tolen);
173     if (wi == NULL) {
174         return (-1);
175     }
176 #if RTPP_DEBUG_netio >= 1
177     wi->debug = 1;
178     wi->log = netio_cf->args[0].glog;
179     CALL_SMETHOD(wi->log->rcnt, incref);
180 #if RTPP_DEBUG_netio >= 2
181     RTPP_LOG(netio_cf->args[0].glog, RTPP_LOG_DBUG, "malloc(%d, %p, %d, %d, %p, %d) = %p",
182       sock, msg, msg_len, flags, sendto, tolen, wi);
183     RTPP_LOG(netio_cf->args[0].glog, RTPP_LOG_DBUG, "sendto(%d, %p, %d, %d, %p, %d)",
184       wi->sock, wi->msg, wi->msg_len, wi->flags, wi->sendto, wi->tolen);
185 #endif
186 #endif
187     rtpp_queue_put_item(wi, netio_cf->args[0].out_q);
188     return (0);
189 }
190 
191 void
rtpp_anetio_pump(struct rtpp_anetio_cf * netio_cf)192 rtpp_anetio_pump(struct rtpp_anetio_cf *netio_cf)
193 {
194 
195     rtpp_queue_pump(netio_cf->args[0].out_q);
196 }
197 
198 void
rtpp_anetio_pump_q(struct sthread_args * sender)199 rtpp_anetio_pump_q(struct sthread_args *sender)
200 {
201 
202     rtpp_queue_pump(sender->out_q);
203 }
204 
205 int
rtpp_anetio_send_pkt(struct sthread_args * sender,int sock,const struct sockaddr * sendto,socklen_t tolen,struct rtp_packet * pkt,struct rtpp_refcnt * sock_rcnt,struct rtpp_log * plog)206 rtpp_anetio_send_pkt(struct sthread_args *sender, int sock, \
207   const struct sockaddr *sendto, socklen_t tolen, struct rtp_packet *pkt,
208   struct rtpp_refcnt *sock_rcnt, struct rtpp_log *plog)
209 {
210     struct rtpp_wi *wi;
211     int nsend;
212 
213     if (sender->dmode != 0 && pkt->size < LBR_THRS) {
214         nsend = 2;
215     } else {
216         nsend = 1;
217     }
218 
219     wi = rtpp_wi_malloc_pkt(sock, pkt, sendto, tolen, nsend, sock_rcnt);
220     if (wi == NULL) {
221         rtp_packet_free(pkt);
222         return (-1);
223     }
224     /*
225      * rtpp_wi_malloc_pkt() consumes pkt and returns wi, so no need to
226      * call rtp_packet_free() here.
227      */
228 #if RTPP_DEBUG_netio >= 2
229     wi->debug = 1;
230     if (plog == NULL) {
231         plog = sender->glog;
232     }
233     CALL_SMETHOD(plog->rcnt, incref);
234     wi->log = plog;
235     RTPP_LOG(plog, RTPP_LOG_DBUG, "send_pkt(%d, %p, %d, %d, %p, %d)",
236       wi->sock, wi->msg, wi->msg_len, wi->flags, wi->sendto, wi->tolen);
237 #endif
238     rtpp_queue_put_item(wi, sender->out_q);
239     return (0);
240 }
241 
242 int
rtpp_anetio_send_pkt_na(struct sthread_args * sender,int sock,struct rtpp_netaddr * sendto,struct rtp_packet * pkt,struct rtpp_refcnt * sock_rcnt,struct rtpp_log * plog)243 rtpp_anetio_send_pkt_na(struct sthread_args *sender, int sock, \
244   struct rtpp_netaddr *sendto, struct rtp_packet *pkt,
245   struct rtpp_refcnt *sock_rcnt, struct rtpp_log *plog)
246 {
247     struct rtpp_wi *wi;
248     int nsend;
249 
250     if (sender->dmode != 0 && pkt->size < LBR_THRS) {
251         nsend = 2;
252     } else {
253         nsend = 1;
254     }
255 
256     wi = rtpp_wi_malloc_pkt_na(sock, pkt, sendto, nsend, sock_rcnt);
257     if (wi == NULL) {
258         rtp_packet_free(pkt);
259         return (-1);
260     }
261     /*
262      * rtpp_wi_malloc_pkt() consumes pkt and returns wi, so no need to
263      * call rtp_packet_free() here.
264      */
265 #if RTPP_DEBUG_netio >= 2
266     wi->debug = 1;
267     if (plog == NULL) {
268         plog = sender->glog;
269     }
270     CALL_SMETHOD(plog->rcnt, incref);
271     wi->log = plog;
272     RTPP_LOG(plog, RTPP_LOG_DBUG, "send_pkt(%d, %p, %d, %d, %p, %d)",
273       wi->sock, wi->msg, wi->msg_len, wi->flags, wi->sendto, wi->tolen);
274 #endif
275     rtpp_queue_put_item(wi, sender->out_q);
276     return (0);
277 }
278 
279 struct sthread_args *
rtpp_anetio_pick_sender(struct rtpp_anetio_cf * netio_cf)280 rtpp_anetio_pick_sender(struct rtpp_anetio_cf *netio_cf)
281 {
282     int min_len, i, l;
283     struct sthread_args *sender;
284 
285     sender = &netio_cf->args[0];
286     min_len = rtpp_queue_get_length(sender->out_q);
287     if (min_len == 0) {
288         return (sender);
289     }
290     for (i = 1; i < SEND_THREADS; i++) {
291         l = rtpp_queue_get_length(netio_cf->args[i].out_q);
292         if (l < min_len) {
293             sender = &netio_cf->args[i];
294             min_len = l;
295         }
296     }
297     return (sender);
298 }
299 
300 struct rtpp_anetio_cf *
rtpp_netio_async_init(struct cfg * cf,int qlen)301 rtpp_netio_async_init(struct cfg *cf, int qlen)
302 {
303     struct rtpp_anetio_cf *netio_cf;
304     int i, ri;
305 
306     netio_cf = rtpp_zmalloc(sizeof(*netio_cf));
307     if (netio_cf == NULL)
308         return (NULL);
309 
310     for (i = 0; i < SEND_THREADS; i++) {
311         netio_cf->args[i].out_q = rtpp_queue_init(qlen, "RTPP->NET%.2d", i);
312         if (netio_cf->args[i].out_q == NULL) {
313             for (ri = i - 1; ri >= 0; ri--) {
314                 rtpp_queue_destroy(netio_cf->args[ri].out_q);
315                 CALL_SMETHOD(netio_cf->args[ri].glog->rcnt, decref);
316             }
317             goto e0;
318         }
319         CALL_SMETHOD(cf->stable->glog->rcnt, incref);
320         netio_cf->args[i].glog = cf->stable->glog;
321         netio_cf->args[i].dmode = cf->stable->dmode;
322 #if RTPP_DEBUG_timers
323         recfilter_init(&netio_cf->args[i].average_load, 0.9, 0.0, 0);
324 #endif
325     }
326 
327     for (i = 0; i < SEND_THREADS; i++) {
328         netio_cf->args[i].sigterm = rtpp_wi_malloc_sgnl(SIGTERM, NULL, 0);
329         if (netio_cf->args[i].sigterm == NULL) {
330             for (ri = i - 1; ri >= 0; ri--) {
331                 rtpp_wi_free(netio_cf->args[ri].sigterm);
332             }
333             goto e1;
334         }
335     }
336 
337     cf->stable->rtpp_netio_cf = netio_cf;
338     for (i = 0; i < SEND_THREADS; i++) {
339         if (pthread_create(&(netio_cf->thread_id[i]), NULL, (void *(*)(void *))&rtpp_anetio_sthread, &netio_cf->args[i]) != 0) {
340              for (ri = i - 1; ri >= 0; ri--) {
341                  rtpp_queue_put_item(netio_cf->args[ri].sigterm, netio_cf->args[ri].out_q);
342                  pthread_join(netio_cf->thread_id[ri], NULL);
343              }
344              for (ri = i; ri < SEND_THREADS; ri++) {
345                  rtpp_wi_free(netio_cf->args[ri].sigterm);
346              }
347              goto e1;
348         }
349     }
350 
351     return (netio_cf);
352 
353 #if 0
354 e2:
355     for (i = 0; i < SEND_THREADS; i++) {
356         rtpp_wi_free(netio_cf->args[i].sigterm);
357     }
358 #endif
359 e1:
360     for (i = 0; i < SEND_THREADS; i++) {
361         rtpp_queue_destroy(netio_cf->args[i].out_q);
362         CALL_SMETHOD(netio_cf->args[i].glog->rcnt, decref);
363     }
364 e0:
365     free(netio_cf);
366     return (NULL);
367 }
368 
369 void
rtpp_netio_async_destroy(struct rtpp_anetio_cf * netio_cf)370 rtpp_netio_async_destroy(struct rtpp_anetio_cf *netio_cf)
371 {
372     int i;
373 
374     for (i = 0; i < SEND_THREADS; i++) {
375         rtpp_queue_put_item(netio_cf->args[i].sigterm, netio_cf->args[i].out_q);
376     }
377     for (i = 0; i < SEND_THREADS; i++) {
378         pthread_join(netio_cf->thread_id[i], NULL);
379         rtpp_queue_destroy(netio_cf->args[i].out_q);
380         CALL_SMETHOD(netio_cf->args[i].glog->rcnt, decref);
381     }
382     free(netio_cf);
383 }
384