1 /*
2 * Copyright (c) 2014 Sippy Software, Inc., http://www.sippysoft.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 */
27
28 #include <sys/types.h>
29 #include <sys/socket.h>
30 #include <errno.h>
31 #include <pthread.h>
32 #include <sched.h>
33 #include <signal.h>
34 #include <stdint.h>
35 #include <stdlib.h>
36 #include <string.h>
37
38 #include "rtpp_log.h"
39 #include "rtpp_cfg_stable.h"
40 #include "rtpp_defines.h"
41 #include "rtp.h"
42 #include "rtp_packet.h"
43 #include "rtpp_wi.h"
44 #include "rtpp_wi_private.h"
45 #include "rtpp_types.h"
46 #include "rtpp_refcnt.h"
47 #include "rtpp_log_obj.h"
48 #include "rtpp_queue.h"
49 #include "rtpp_network.h"
50 #include "rtpp_netio_async.h"
51 #include "rtpp_time.h"
52 #include "rtpp_mallocs.h"
53 #include "rtpp_debug.h"
54 #ifdef RTPP_DEBUG
55 #include "rtpp_math.h"
56 #endif
57
/*
 * State handed to one sender thread: the queue it drains plus the settings
 * and bookkeeping it needs while doing so.
 */
struct sthread_args {
    /* Work items waiting to be processed by this sender. */
    struct rtpp_queue *out_q;
    /* Log handle; a reference is taken on it when the sender is set up. */
    struct rtpp_log *glog;
    /*
     * Copied from cf->stable->dmode at init; when non-zero, packets smaller
     * than LBR_THRS are queued with a send count of 2 instead of 1.
     */
    int dmode;
#if RTPP_DEBUG_timers
    /* Filtered ratio of run time to (run time + sleep time). */
    struct recfilter average_load;
#endif
    /* Pre-allocated SIGTERM signal item, queued to make the thread exit. */
    struct rtpp_wi *sigterm;
};
67
68 #define SEND_THREADS 1
69
70 struct rtpp_anetio_cf {
71 pthread_t thread_id[SEND_THREADS];
72 struct sthread_args args[SEND_THREADS];
73 };
74
75 #define RTPP_ANETIO_MAX_RETRY 3
76
77 static void
rtpp_anetio_sthread(struct sthread_args * args)78 rtpp_anetio_sthread(struct sthread_args *args)
79 {
80 int n, nsend, i, send_errno, nretry;
81 struct rtpp_wi *wi, *wis[100];
82 #if RTPP_DEBUG_timers
83 double tp[3], runtime, sleeptime;
84 long run_n;
85
86 runtime = sleeptime = 0.0;
87 run_n = 0;
88 tp[0] = getdtime();
89 #endif
90 for (;;) {
91 nsend = rtpp_queue_get_items(args->out_q, wis, 100, 0);
92 #if RTPP_DEBUG_timers
93 tp[1] = getdtime();
94 #endif
95
96 for (i = 0; i < nsend; i++) {
97 wi = wis[i];
98 if (wi->wi_type == RTPP_WI_TYPE_SGNL) {
99 rtpp_wi_free(wi);
100 goto out;
101 }
102 nretry = 0;
103 do {
104 n = sendto(wi->sock, wi->msg, wi->msg_len, wi->flags,
105 wi->sendto, wi->tolen);
106 send_errno = (n < 0) ? errno : 0;
107 #if RTPP_DEBUG_netio >= 1
108 if (wi->debug != 0) {
109 char daddr[MAX_AP_STRBUF];
110
111 addrport2char_r(wi->sendto, daddr, sizeof(daddr), ':');
112 if (n < 0) {
113 RTPP_ELOG(wi->log, RTPP_LOG_DBUG,
114 "sendto(%d, %p, %lld, %d, %p (%s), %d) = %d",
115 wi->sock, wi->msg, (long long)wi->msg_len, wi->flags,
116 wi->sendto, daddr, wi->tolen, n);
117 } else if (n < wi->msg_len) {
118 RTPP_LOG(wi->log, RTPP_LOG_DBUG,
119 "sendto(%d, %p, %lld, %d, %p (%s), %d) = %d: short write",
120 wi->sock, wi->msg, (long long)wi->msg_len, wi->flags,
121 wi->sendto, daddr, wi->tolen, n);
122 #if RTPP_DEBUG_netio >= 2
123 } else {
124 RTPP_LOG(wi->log, RTPP_LOG_DBUG,
125 "sendto(%d, %p, %d, %d, %p (%s), %d) = %d",
126 wi->sock, wi->msg, wi->msg_len, wi->flags, wi->sendto, daddr,
127 wi->tolen, n);
128 #endif
129 }
130 }
131 #endif
132 if (n >= 0) {
133 wi->nsend--;
134 } else {
135 /* "EPERM" is Linux thing, yield and retry */
136 if ((send_errno == EPERM || send_errno == ENOBUFS)
137 && nretry < RTPP_ANETIO_MAX_RETRY) {
138 sched_yield();
139 nretry++;
140 } else {
141 break;
142 }
143 }
144 } while (wi->nsend > 0);
145 rtpp_wi_free(wi);
146 }
147 #if RTPP_DEBUG_timers
148 sleeptime += tp[1] - tp[0];
149 tp[0] = getdtime();
150 runtime += tp[0] - tp[1];
151 if ((run_n % 10000) == 0) {
152 RTPP_LOG(args->glog, RTPP_LOG_DBUG, "rtpp_anetio_sthread(%p): run %ld aload = %f filtered = %f", \
153 args, run_n, runtime / (runtime + sleeptime), args->average_load.lastval);
154 }
155 if (runtime + sleeptime > 1.0) {
156 recfilter_apply(&args->average_load, runtime / (runtime + sleeptime));
157 runtime = sleeptime = 0.0;
158 }
159 run_n += 1;
160 #endif
161 }
162 out:
163 return;
164 }
165
166 int
rtpp_anetio_sendto(struct rtpp_anetio_cf * netio_cf,int sock,const void * msg,size_t msg_len,int flags,const struct sockaddr * sendto,socklen_t tolen)167 rtpp_anetio_sendto(struct rtpp_anetio_cf *netio_cf, int sock, const void *msg, \
168 size_t msg_len, int flags, const struct sockaddr *sendto, socklen_t tolen)
169 {
170 struct rtpp_wi *wi;
171
172 wi = rtpp_wi_malloc(sock, msg, msg_len, flags, sendto, tolen);
173 if (wi == NULL) {
174 return (-1);
175 }
176 #if RTPP_DEBUG_netio >= 1
177 wi->debug = 1;
178 wi->log = netio_cf->args[0].glog;
179 CALL_SMETHOD(wi->log->rcnt, incref);
180 #if RTPP_DEBUG_netio >= 2
181 RTPP_LOG(netio_cf->args[0].glog, RTPP_LOG_DBUG, "malloc(%d, %p, %d, %d, %p, %d) = %p",
182 sock, msg, msg_len, flags, sendto, tolen, wi);
183 RTPP_LOG(netio_cf->args[0].glog, RTPP_LOG_DBUG, "sendto(%d, %p, %d, %d, %p, %d)",
184 wi->sock, wi->msg, wi->msg_len, wi->flags, wi->sendto, wi->tolen);
185 #endif
186 #endif
187 rtpp_queue_put_item(wi, netio_cf->args[0].out_q);
188 return (0);
189 }
190
191 void
rtpp_anetio_pump(struct rtpp_anetio_cf * netio_cf)192 rtpp_anetio_pump(struct rtpp_anetio_cf *netio_cf)
193 {
194
195 rtpp_queue_pump(netio_cf->args[0].out_q);
196 }
197
198 void
rtpp_anetio_pump_q(struct sthread_args * sender)199 rtpp_anetio_pump_q(struct sthread_args *sender)
200 {
201
202 rtpp_queue_pump(sender->out_q);
203 }
204
205 int
rtpp_anetio_send_pkt(struct sthread_args * sender,int sock,const struct sockaddr * sendto,socklen_t tolen,struct rtp_packet * pkt,struct rtpp_refcnt * sock_rcnt,struct rtpp_log * plog)206 rtpp_anetio_send_pkt(struct sthread_args *sender, int sock, \
207 const struct sockaddr *sendto, socklen_t tolen, struct rtp_packet *pkt,
208 struct rtpp_refcnt *sock_rcnt, struct rtpp_log *plog)
209 {
210 struct rtpp_wi *wi;
211 int nsend;
212
213 if (sender->dmode != 0 && pkt->size < LBR_THRS) {
214 nsend = 2;
215 } else {
216 nsend = 1;
217 }
218
219 wi = rtpp_wi_malloc_pkt(sock, pkt, sendto, tolen, nsend, sock_rcnt);
220 if (wi == NULL) {
221 rtp_packet_free(pkt);
222 return (-1);
223 }
224 /*
225 * rtpp_wi_malloc_pkt() consumes pkt and returns wi, so no need to
226 * call rtp_packet_free() here.
227 */
228 #if RTPP_DEBUG_netio >= 2
229 wi->debug = 1;
230 if (plog == NULL) {
231 plog = sender->glog;
232 }
233 CALL_SMETHOD(plog->rcnt, incref);
234 wi->log = plog;
235 RTPP_LOG(plog, RTPP_LOG_DBUG, "send_pkt(%d, %p, %d, %d, %p, %d)",
236 wi->sock, wi->msg, wi->msg_len, wi->flags, wi->sendto, wi->tolen);
237 #endif
238 rtpp_queue_put_item(wi, sender->out_q);
239 return (0);
240 }
241
242 int
rtpp_anetio_send_pkt_na(struct sthread_args * sender,int sock,struct rtpp_netaddr * sendto,struct rtp_packet * pkt,struct rtpp_refcnt * sock_rcnt,struct rtpp_log * plog)243 rtpp_anetio_send_pkt_na(struct sthread_args *sender, int sock, \
244 struct rtpp_netaddr *sendto, struct rtp_packet *pkt,
245 struct rtpp_refcnt *sock_rcnt, struct rtpp_log *plog)
246 {
247 struct rtpp_wi *wi;
248 int nsend;
249
250 if (sender->dmode != 0 && pkt->size < LBR_THRS) {
251 nsend = 2;
252 } else {
253 nsend = 1;
254 }
255
256 wi = rtpp_wi_malloc_pkt_na(sock, pkt, sendto, nsend, sock_rcnt);
257 if (wi == NULL) {
258 rtp_packet_free(pkt);
259 return (-1);
260 }
261 /*
262 * rtpp_wi_malloc_pkt() consumes pkt and returns wi, so no need to
263 * call rtp_packet_free() here.
264 */
265 #if RTPP_DEBUG_netio >= 2
266 wi->debug = 1;
267 if (plog == NULL) {
268 plog = sender->glog;
269 }
270 CALL_SMETHOD(plog->rcnt, incref);
271 wi->log = plog;
272 RTPP_LOG(plog, RTPP_LOG_DBUG, "send_pkt(%d, %p, %d, %d, %p, %d)",
273 wi->sock, wi->msg, wi->msg_len, wi->flags, wi->sendto, wi->tolen);
274 #endif
275 rtpp_queue_put_item(wi, sender->out_q);
276 return (0);
277 }
278
279 struct sthread_args *
rtpp_anetio_pick_sender(struct rtpp_anetio_cf * netio_cf)280 rtpp_anetio_pick_sender(struct rtpp_anetio_cf *netio_cf)
281 {
282 int min_len, i, l;
283 struct sthread_args *sender;
284
285 sender = &netio_cf->args[0];
286 min_len = rtpp_queue_get_length(sender->out_q);
287 if (min_len == 0) {
288 return (sender);
289 }
290 for (i = 1; i < SEND_THREADS; i++) {
291 l = rtpp_queue_get_length(netio_cf->args[i].out_q);
292 if (l < min_len) {
293 sender = &netio_cf->args[i];
294 min_len = l;
295 }
296 }
297 return (sender);
298 }
299
300 struct rtpp_anetio_cf *
rtpp_netio_async_init(struct cfg * cf,int qlen)301 rtpp_netio_async_init(struct cfg *cf, int qlen)
302 {
303 struct rtpp_anetio_cf *netio_cf;
304 int i, ri;
305
306 netio_cf = rtpp_zmalloc(sizeof(*netio_cf));
307 if (netio_cf == NULL)
308 return (NULL);
309
310 for (i = 0; i < SEND_THREADS; i++) {
311 netio_cf->args[i].out_q = rtpp_queue_init(qlen, "RTPP->NET%.2d", i);
312 if (netio_cf->args[i].out_q == NULL) {
313 for (ri = i - 1; ri >= 0; ri--) {
314 rtpp_queue_destroy(netio_cf->args[ri].out_q);
315 CALL_SMETHOD(netio_cf->args[ri].glog->rcnt, decref);
316 }
317 goto e0;
318 }
319 CALL_SMETHOD(cf->stable->glog->rcnt, incref);
320 netio_cf->args[i].glog = cf->stable->glog;
321 netio_cf->args[i].dmode = cf->stable->dmode;
322 #if RTPP_DEBUG_timers
323 recfilter_init(&netio_cf->args[i].average_load, 0.9, 0.0, 0);
324 #endif
325 }
326
327 for (i = 0; i < SEND_THREADS; i++) {
328 netio_cf->args[i].sigterm = rtpp_wi_malloc_sgnl(SIGTERM, NULL, 0);
329 if (netio_cf->args[i].sigterm == NULL) {
330 for (ri = i - 1; ri >= 0; ri--) {
331 rtpp_wi_free(netio_cf->args[ri].sigterm);
332 }
333 goto e1;
334 }
335 }
336
337 cf->stable->rtpp_netio_cf = netio_cf;
338 for (i = 0; i < SEND_THREADS; i++) {
339 if (pthread_create(&(netio_cf->thread_id[i]), NULL, (void *(*)(void *))&rtpp_anetio_sthread, &netio_cf->args[i]) != 0) {
340 for (ri = i - 1; ri >= 0; ri--) {
341 rtpp_queue_put_item(netio_cf->args[ri].sigterm, netio_cf->args[ri].out_q);
342 pthread_join(netio_cf->thread_id[ri], NULL);
343 }
344 for (ri = i; ri < SEND_THREADS; ri++) {
345 rtpp_wi_free(netio_cf->args[ri].sigterm);
346 }
347 goto e1;
348 }
349 }
350
351 return (netio_cf);
352
353 #if 0
354 e2:
355 for (i = 0; i < SEND_THREADS; i++) {
356 rtpp_wi_free(netio_cf->args[i].sigterm);
357 }
358 #endif
359 e1:
360 for (i = 0; i < SEND_THREADS; i++) {
361 rtpp_queue_destroy(netio_cf->args[i].out_q);
362 CALL_SMETHOD(netio_cf->args[i].glog->rcnt, decref);
363 }
364 e0:
365 free(netio_cf);
366 return (NULL);
367 }
368
369 void
rtpp_netio_async_destroy(struct rtpp_anetio_cf * netio_cf)370 rtpp_netio_async_destroy(struct rtpp_anetio_cf *netio_cf)
371 {
372 int i;
373
374 for (i = 0; i < SEND_THREADS; i++) {
375 rtpp_queue_put_item(netio_cf->args[i].sigterm, netio_cf->args[i].out_q);
376 }
377 for (i = 0; i < SEND_THREADS; i++) {
378 pthread_join(netio_cf->thread_id[i], NULL);
379 rtpp_queue_destroy(netio_cf->args[i].out_q);
380 CALL_SMETHOD(netio_cf->args[i].glog->rcnt, decref);
381 }
382 free(netio_cf);
383 }
384