1 /*
2 * tvheadend, TCP common functions
3 * Copyright (C) 2007 Andreas Öman
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include <pthread.h>
20 #include <netdb.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <poll.h>
24 #include <assert.h>
25 #include <stdio.h>
26 #include <unistd.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <stdarg.h>
30 #include <fcntl.h>
31 #include <errno.h>
32 #include <signal.h>
33 #include <netinet/in.h>
34 #include <netinet/ip.h>
35 #include <netinet/tcp.h>
36 #include <arpa/inet.h>
37
38 #include "tvheadend.h"
39 #include "tcp.h"
40 #include "tvhpoll.h"
41 #include "notify.h"
42 #include "access.h"
43 #include "dvr/dvr.h"
44 #define COMPAT_IPTOS
45 #include "compat.h"
46
47 #if ENABLE_LIBSYSTEMD_DAEMON
48 #include <systemd/sd-daemon.h>
49 #endif
50
51 int tcp_preferred_address_family = AF_INET;
52 int tcp_server_running;
53 th_pipe_t tcp_server_pipe;
54
/**
 * Apply a DSCP value to a socket through the IP_TOS socket option.
 *
 * @param sockfd     socket to configure
 * @param dscp       requested value; only the bits covered by
 *                   IPTOS_DSCP_MASK are passed on
 * @param errbuf     optional buffer receiving an error description
 * @param errbufsize size of errbuf in bytes
 * @return 0 on success, -1 when setsockopt() fails
 */
int
socket_set_dscp(int sockfd, uint32_t dscp, char *errbuf, size_t errbufsize)
{
  int tos = dscp & IPTOS_DSCP_MASK;

  if (setsockopt(sockfd, IPPROTO_IP, IP_TOS, &tos, sizeof(tos)) >= 0)
    return 0;

  /* The message is only produced when the caller supplied room for it */
  if (errbuf != NULL && errbufsize != 0)
    snprintf(errbuf, errbufsize, "IP_TOS failed: %s", strerror(errno));
  return -1;
}
72
/*
 * Wait until a non-blocking connect() on fd completes.
 *
 * Returns 0 when the poll reported the socket (the caller must still
 * inspect SO_ERROR), 1 when the wait timed out (the next address may be
 * tried) and -1 when the application is shutting down or polling failed.
 * errbuf must be a valid, non-empty buffer.
 */
static int
tcp_connect_wait(int fd, const char *hostname, int timeout,
                 char *errbuf, size_t errbufsize)
{
  tvhpoll_event_t ev;
  tvhpoll_t *efd;
  int r, res;

  efd = tvhpoll_create(1);
  memset(&ev, 0, sizeof(ev));
  ev.events   = TVHPOLL_OUT;
  ev.fd       = fd;
  ev.data.ptr = &fd;
  tvhpoll_add(efd, &ev, 1);

  /* minimal timeout is one second */
  if (timeout < 1)
    timeout = 1;

  while (1) {
    if (!tvheadend_is_running()) {
      errbuf[0] = '\0';
      res = -1;
      break;
    }

    r = tvhpoll_wait(efd, &ev, 1, timeout * 1000);
    if (r > 0) {
      res = 0;
      break;
    }

    if (r == 0) { /* Timeout */
      snprintf(errbuf, errbufsize, "Connection attempt to '%s' timed out", hostname);
      res = 1;
      break;
    }

    if (!ERRNO_AGAIN(errno)) {
      snprintf(errbuf, errbufsize, "poll() error: %s", strerror(errno));
      res = -1;
      break;
    }
  }

  tvhpoll_destroy(efd);
  return res;
}

/**
 * Open a TCP connection to hostname:port.
 *
 * Every IPv4/IPv6 address returned by getaddrinfo() is tried in order
 * until one connects. When bindaddr is a non-empty numeric address the
 * socket is bound to it first and only addresses of the same family are
 * tried.
 *
 * @param hostname   host name or numeric address to connect to
 * @param port       TCP port
 * @param bindaddr   optional local address (NULL or "" for none)
 * @param errbuf     receives a description of the last failure; may be
 *                   NULL when the caller is not interested
 * @param errbufsize size of errbuf in bytes
 * @param timeout    seconds to wait for each attempt (at least one second
 *                   is used); a negative value returns without waiting
 *                   for the connection to complete
 * @return connected descriptor, switched back to blocking mode with
 *         SO_KEEPALIVE enabled, or -1 on failure
 */
int
tcp_connect(const char *hostname, int port, const char *bindaddr,
            char *errbuf, size_t errbufsize, int timeout)
{
  int fd = -1, r, res, err, one = 1;
  struct addrinfo *ai, *rai, hints;
  struct sockaddr_storage bindip;
  char portstr[6], dummy[1];
  socklen_t errlen;

  /* Fall back to a private buffer so the code below can write freely */
  if (errbuf == NULL || errbufsize == 0) {
    errbuf = dummy;
    errbufsize = sizeof(dummy);
  }
  errbuf[0] = '\0';

  memset(&bindip, 0, sizeof(bindip));
  bindip.ss_family = AF_UNSPEC;
  if (bindaddr && bindaddr[0] != '\0') {
    if (tcp_get_ip_from_str(bindaddr, &bindip) == NULL) {
      snprintf(errbuf, errbufsize, "Cannot bind to addr '%s'", bindaddr);
      return -1;
    }
  }

  snprintf(portstr, sizeof(portstr), "%u", (unsigned int)port);
  memset(&hints, 0, sizeof(struct addrinfo));
  hints.ai_family = AF_UNSPEC;
  /* Without a socket type hint the same address may be returned once
   * per socket type and would then be tried repeatedly */
  hints.ai_socktype = SOCK_STREAM;
  res = getaddrinfo(hostname, portstr, &hints, &ai);
  if (res != 0) {
    snprintf(errbuf, errbufsize, "%s", gai_strerror(res));
    return -1;
  }

  for (rai = ai; rai != NULL; rai = rai->ai_next) {

    /* Drop the socket left over from a failed attempt */
    if (fd >= 0) {
      close(fd);
      fd = -1;
    }

    if (bindip.ss_family == AF_UNSPEC) {
      if (rai->ai_family != AF_INET && rai->ai_family != AF_INET6)
        continue;
    } else if (rai->ai_family != bindip.ss_family) {
      continue;
    }

    fd = tvh_socket(rai->ai_family, SOCK_STREAM, 0);
    if (fd < 0) {
      snprintf(errbuf, errbufsize, "Unable to create socket: %s",
               strerror(errno));
      continue;
    }

    /**
     * Switch to nonblocking
     */
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);

    if (bindip.ss_family != AF_UNSPEC) {
      if (bind(fd, (struct sockaddr *)&bindip, IP_IN_ADDRLEN(bindip)) < 0) {
        snprintf(errbuf, errbufsize, "Cannot bind to IPv%s addr '%s:%i': %s",
                 bindip.ss_family == AF_INET6 ? "6" : "4",
                 bindaddr, htons(IP_PORT(bindip)),
                 strerror(errno));
        goto error;
      }
    }

    r = connect(fd, rai->ai_addr, rai->ai_addrlen);

    if (r >= 0) {
      err = 0;
    } else if (errno != EINPROGRESS) {
      err = errno;
    } else if (timeout < 0) {
      /* timeout < 0 - do not wait at all */
      err = 0;
    } else {
      r = tcp_connect_wait(fd, hostname, timeout, errbuf, errbufsize);
      if (r < 0)
        goto error;
      if (r > 0)
        continue;
      /* Fetch the outcome of the asynchronous connect */
      err = 0;
      errlen = sizeof(err);
      if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen) < 0)
        err = errno;
    }

    if (err != 0) {
      snprintf(errbuf, errbufsize, "%s", strerror(err));
      continue;
    }

    /* Connected (or still in progress when no wait was requested) */
    fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK);

    /* Set the keep-alive active */
    setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&one, sizeof(one));

    freeaddrinfo(ai);
    return fd;
  }

  /* No address left; keep the message of the last failed attempt if any */
  if (errbuf[0] == '\0')
    snprintf(errbuf, errbufsize, "Invalid or unresolved hostname '%s'", hostname);

error:
  if (fd >= 0)
    close(fd);
  freeaddrinfo(ai);
  return -1;
}
221
222
223 /**
224 *
225 */
226 int
tcp_write_queue(int fd,htsbuf_queue_t * q)227 tcp_write_queue(int fd, htsbuf_queue_t *q)
228 {
229 htsbuf_data_t *hd;
230 int l, r = 0;
231 void *p;
232
233 while((hd = TAILQ_FIRST(&q->hq_q)) != NULL) {
234 if (!r) {
235 l = hd->hd_data_len - hd->hd_data_off;
236 p = hd->hd_data + hd->hd_data_off;
237 r = tvh_write(fd, p, l);
238 }
239 htsbuf_data_free(q, hd);
240 }
241 q->hq_size = 0;
242 return r;
243 }
244
245
246 /**
247 *
248 */
249 static int
tcp_fill_htsbuf_from_fd(int fd,htsbuf_queue_t * hq)250 tcp_fill_htsbuf_from_fd(int fd, htsbuf_queue_t *hq)
251 {
252 htsbuf_data_t *hd = TAILQ_LAST(&hq->hq_q, htsbuf_data_queue);
253 int c, r;
254
255 if(hd != NULL) {
256 /* Fill out any previous buffer */
257 c = hd->hd_data_size - hd->hd_data_len;
258
259 if(c > 0) {
260
261 do {
262 r = read(fd, hd->hd_data + hd->hd_data_len, c);
263 } while (r < 0 && ERRNO_AGAIN(errno));
264 if(r < 1)
265 return -1;
266
267 hd->hd_data_len += r;
268 hq->hq_size += r;
269 return 0;
270 }
271 }
272
273 hd = malloc(sizeof(htsbuf_data_t));
274
275 hd->hd_data_size = 1000;
276 hd->hd_data = malloc(hd->hd_data_size);
277
278 do {
279 r = read(fd, hd->hd_data, hd->hd_data_size);
280 } while (r < 0 && ERRNO_AGAIN(errno));
281 if(r < 1) {
282 free(hd->hd_data);
283 free(hd);
284 return -1;
285 }
286 hd->hd_data_len = r;
287 hd->hd_data_off = 0;
288 TAILQ_INSERT_TAIL(&hq->hq_q, hd, hd_link);
289 hq->hq_size += r;
290 return 0;
291 }
292
293
294 /**
295 *
296 */
297 char *
tcp_read_line(int fd,htsbuf_queue_t * spill)298 tcp_read_line(int fd, htsbuf_queue_t *spill)
299 {
300 int len;
301 char *buf;
302
303 do {
304 len = htsbuf_find(spill, 0xa);
305
306 if(len == -1) {
307 if(tcp_fill_htsbuf_from_fd(fd, spill) < 0)
308 return NULL;
309 }
310 } while (len == -1);
311
312 buf = malloc(len+1);
313
314 htsbuf_read(spill, buf, len);
315 buf[len] = 0;
316 while(len > 0 && buf[len - 1] < 32)
317 buf[--len] = 0;
318 htsbuf_drop(spill, 1); /* Drop the \n */
319 return buf;
320 }
321
322
323
324 /**
325 *
326 */
327 int
tcp_read_data(int fd,char * buf,const size_t bufsize,htsbuf_queue_t * spill)328 tcp_read_data(int fd, char *buf, const size_t bufsize, htsbuf_queue_t *spill)
329 {
330 int x, tot = htsbuf_read(spill, buf, bufsize);
331
332 if(tot == bufsize)
333 return 0;
334
335 x = recv(fd, buf + tot, bufsize - tot, MSG_WAITALL);
336 if(x != bufsize - tot)
337 return -1;
338
339 return 0;
340 }
341
/**
 * Receive exactly len bytes from a socket using MSG_WAITALL.
 *
 * @param fd  socket to read from
 * @param buf destination buffer
 * @param len number of bytes wanted
 * @return 0 on success, the errno value when recv() fails, or ECONNRESET
 *         when fewer than len bytes were delivered
 */
int
tcp_read(int fd, void *buf, size_t len)
{
  const int got = recv(fd, buf, len, MSG_WAITALL);

  if (got == -1)
    return errno;

  return got == len ? 0 : ECONNRESET;
}
357
/**
 * Receive exactly len bytes, waiting at most timeout for each piece.
 *
 * The timeout is handed to poll() before every recv(), so it limits the
 * wait for each chunk rather than the whole transfer.
 *
 * @param fd      socket to read from
 * @param buf     destination buffer
 * @param len     number of bytes wanted
 * @param timeout poll() timeout, must be positive
 * @return 0 on success, ETIMEDOUT when poll() expires, ECONNRESET when
 *         the peer closed the connection or the application is shutting
 *         down, otherwise the errno of the failing call
 */
int
tcp_read_timeout(int fd, void *buf, size_t len, int timeout)
{
  struct pollfd pfd;
  int n, done = 0;

  assert(timeout > 0);

  pfd.fd      = fd;
  pfd.events  = POLLIN;
  pfd.revents = 0;

  while (done != len) {

    n = poll(&pfd, 1, timeout);
    if (n == 0)
      return ETIMEDOUT;
    if (n == -1) {
      if (!tvheadend_is_running())
        return ECONNRESET;
      if (ERRNO_AGAIN(errno))
        continue;
      return errno;
    }

    n = recv(fd, (char *)buf + done, len - done, MSG_DONTWAIT);
    if (n == -1) {
      if (ERRNO_AGAIN(errno))
        continue;
      return errno;
    }

    /* Zero bytes means the peer closed the connection */
    if (n == 0)
      return ECONNRESET;

    done += n;
  }

  return 0;
}
402
403 /**
404 *
405 */
406 int
tcp_socket_dead(int fd)407 tcp_socket_dead(int fd)
408 {
409 int err = 0;
410 socklen_t errlen = sizeof(err);
411
412 if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen))
413 return -errno;
414 if (err)
415 return -err;
416 #ifdef PLATFORM_FREEBSD
417 if (recv(fd, NULL, 0, MSG_PEEK | MSG_DONTWAIT) < 0)
418 return -errno;
419 #else
420 if (recv(fd, NULL, 0, MSG_PEEK | MSG_DONTWAIT) == 0)
421 return -EIO;
422 #endif
423 return 0;
424 }
425
/**
 * Convert the address stored in a socket address to its text form.
 *
 * @param sa     address to convert (AF_INET or AF_INET6)
 * @param dst    destination buffer
 * @param maxlen size of dst in bytes
 * @return dst on success; NULL when an argument is NULL, when the family
 *         is unsupported (dst then holds "Unknown AF") or when the text
 *         does not fit (dst is then set to an empty string)
 */
char *
tcp_get_str_from_ip(const struct sockaddr_storage *sa, char *dst, size_t maxlen)
{
  const void *addr;

  if (sa == NULL || dst == NULL)
    return NULL;

  switch (sa->ss_family) {
  case AF_INET:
    addr = &((const struct sockaddr_in *)sa)->sin_addr;
    break;
  case AF_INET6:
    addr = &((const struct sockaddr_in6 *)sa)->sin6_addr;
    break;
  default:
    snprintf(dst, maxlen, "%s", "Unknown AF");
    return NULL;
  }

  if (inet_ntop(sa->ss_family, addr, dst, (socklen_t)maxlen) == NULL) {
    /* dst content is unspecified after a failure, make it a valid string */
    if (maxlen > 0)
      dst[0] = '\0';
    return NULL;
  }

  return dst;
}
450
/**
 * Parse a numeric IPv4 or IPv6 address into a socket address.
 *
 * A string containing ':' is parsed as IPv6, otherwise one containing '.'
 * is parsed as IPv4. Only the family and the address fields of sa are
 * written; the family is stored even when parsing then fails.
 *
 * @param src text to parse
 * @param sa  destination
 * @return sa on success, NULL when an argument is NULL or src is not a
 *         valid numeric address
 */
struct sockaddr_storage *
tcp_get_ip_from_str(const char *src, struct sockaddr_storage *sa)
{
  int family;
  void *addr;

  if (src == NULL || sa == NULL)
    return NULL;

  if (strchr(src, ':') != NULL) {
    family = AF_INET6;
    addr   = &((struct sockaddr_in6 *)sa)->sin6_addr;
  } else if (strchr(src, '.') != NULL) {
    family = AF_INET;
    addr   = &((struct sockaddr_in *)sa)->sin_addr;
  } else {
    return NULL;
  }

  sa->ss_family = family;
  return inet_pton(family, src, addr) == 1 ? sa : NULL;
}
474
475 /**
476 *
477 */
478 static tvhpoll_t *tcp_server_poll;
479 static uint32_t tcp_server_launch_id;
480
481 typedef struct tcp_server {
482 int serverfd;
483 struct sockaddr_storage bound;
484 tcp_server_ops_t ops;
485 void *opaque;
486 LIST_ENTRY(tcp_server) link;
487 } tcp_server_t;
488
489 typedef struct tcp_server_launch {
490 pthread_t tid;
491 uint32_t id;
492 int fd;
493 tcp_server_ops_t ops;
494 void *opaque;
495 char *representative;
496 void (*status) (void *opaque, htsmsg_t *m);
497 struct sockaddr_storage peer;
498 struct sockaddr_storage self;
499 time_t started;
500 LIST_ENTRY(tcp_server_launch) link;
501 LIST_ENTRY(tcp_server_launch) alink;
502 LIST_ENTRY(tcp_server_launch) jlink;
503 } tcp_server_launch_t;
504
505 static LIST_HEAD(, tcp_server) tcp_server_delete_list = { 0 };
506 static LIST_HEAD(, tcp_server_launch) tcp_server_launches = { 0 };
507 static LIST_HEAD(, tcp_server_launch) tcp_server_active = { 0 };
508 static LIST_HEAD(, tcp_server_launch) tcp_server_join = { 0 };
509
510 /**
511 *
512 */
513 uint32_t
tcp_connection_count(access_t * aa)514 tcp_connection_count(access_t *aa)
515 {
516 tcp_server_launch_t *tsl;
517 uint32_t used = 0;
518
519 lock_assert(&global_lock);
520
521 if (aa == NULL)
522 return 0;
523
524 LIST_FOREACH(tsl, &tcp_server_active, alink)
525 if (!strcmp(aa->aa_representative ?: "", tsl->representative ?: ""))
526 used++;
527 return used;
528 }
529
530 /**
531 *
532 */
533 void *
tcp_connection_launch(int fd,void (* status)(void * opaque,htsmsg_t * m),access_t * aa)534 tcp_connection_launch
535 (int fd, void (*status) (void *opaque, htsmsg_t *m), access_t *aa)
536 {
537 tcp_server_launch_t *tsl, *res;
538 uint32_t used, used2;
539 int64_t started = mclk();
540 int c1, c2;
541
542 lock_assert(&global_lock);
543
544 assert(status);
545
546 if (aa == NULL)
547 return NULL;
548
549 try_again:
550 res = NULL;
551 used = 0;
552 LIST_FOREACH(tsl, &tcp_server_active, alink) {
553 if (tsl->fd == fd) {
554 res = tsl;
555 if (!aa->aa_conn_limit && !aa->aa_conn_limit_streaming)
556 break;
557 continue;
558 }
559 if (!strcmp(aa->aa_representative ?: "", tsl->representative ?: ""))
560 used++;
561 }
562 if (res == NULL)
563 return NULL;
564
565 if (aa->aa_conn_limit || aa->aa_conn_limit_streaming) {
566 used2 = aa->aa_conn_limit ? dvr_usage_count(aa) : 0;
567 /* the rule is: allow if one condition is OK */
568 c1 = aa->aa_conn_limit ? used + used2 >= aa->aa_conn_limit : -1;
569 c2 = aa->aa_conn_limit_streaming ? used >= aa->aa_conn_limit_streaming : -1;
570
571 if (c1 && c2) {
572 if (started + sec2mono(3) < mclk()) {
573 tvherror(LS_TCP, "multiple connections are not allowed for user '%s' from '%s' "
574 "(limit %u, streaming limit %u, active streaming %u, DVR %u)",
575 aa->aa_username ?: "", aa->aa_representative ?: "",
576 aa->aa_conn_limit, aa->aa_conn_limit_streaming,
577 used, used2);
578 return NULL;
579 }
580 pthread_mutex_unlock(&global_lock);
581 tvh_safe_usleep(250000);
582 pthread_mutex_lock(&global_lock);
583 if (tvheadend_is_running())
584 goto try_again;
585 return NULL;
586 }
587 }
588
589 res->representative = aa->aa_representative ? strdup(aa->aa_representative) : NULL;
590 res->status = status;
591 LIST_INSERT_HEAD(&tcp_server_launches, res, link);
592 notify_reload("connections");
593 return res;
594 }
595
596 /**
597 *
598 */
599 void
tcp_connection_land(void * tcp_id)600 tcp_connection_land(void *tcp_id)
601 {
602 tcp_server_launch_t *tsl = tcp_id;
603
604 lock_assert(&global_lock);
605
606 if (tsl == NULL)
607 return;
608
609 LIST_REMOVE(tsl, link);
610 notify_reload("connections");
611
612 free(tsl->representative);
613 tsl->representative = NULL;
614 }
615
616 /**
617 *
618 */
619 void
tcp_connection_cancel(uint32_t id)620 tcp_connection_cancel(uint32_t id)
621 {
622 tcp_server_launch_t *tsl;
623
624 lock_assert(&global_lock);
625
626 LIST_FOREACH(tsl, &tcp_server_active, alink)
627 if (tsl->id == id) {
628 if (tsl->ops.cancel)
629 tsl->ops.cancel(tsl->opaque);
630 break;
631 }
632 }
633
634 /*
635 *
636 */
637 static void *
tcp_server_start(void * aux)638 tcp_server_start(void *aux)
639 {
640 tcp_server_launch_t *tsl = aux;
641 struct timeval to;
642 int val;
643 char c = 'J';
644
645 val = 1;
646 setsockopt(tsl->fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val));
647
648 #ifdef TCP_KEEPIDLE
649 val = 30;
650 setsockopt(tsl->fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val));
651 #endif
652
653 #ifdef TCP_KEEPINVL
654 val = 15;
655 setsockopt(tsl->fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val));
656 #endif
657
658 #ifdef TCP_KEEPCNT
659 val = 5;
660 setsockopt(tsl->fd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val));
661 #endif
662
663 val = 1;
664 setsockopt(tsl->fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
665
666 to.tv_sec = 30;
667 to.tv_usec = 0;
668 setsockopt(tsl->fd, SOL_SOCKET, SO_SNDTIMEO, &to, sizeof(to));
669
670 /* Start */
671 time(&tsl->started);
672 pthread_mutex_lock(&global_lock);
673 tsl->id = ++tcp_server_launch_id;
674 if (!tsl->id) tsl->id = ++tcp_server_launch_id;
675 tsl->ops.start(tsl->fd, &tsl->opaque, &tsl->peer, &tsl->self);
676
677 /* Stop */
678 if (tsl->ops.stop) tsl->ops.stop(tsl->opaque);
679 LIST_REMOVE(tsl, alink);
680 LIST_INSERT_HEAD(&tcp_server_join, tsl, jlink);
681 pthread_mutex_unlock(&global_lock);
682 if (atomic_get(&tcp_server_running))
683 tvh_write(tcp_server_pipe.wr, &c, 1);
684 return NULL;
685 }
686
687
688 /**
689 *
690 */
691 static void *
tcp_server_loop(void * aux)692 tcp_server_loop(void *aux)
693 {
694 int r;
695 tvhpoll_event_t ev;
696 tcp_server_t *ts;
697 tcp_server_launch_t *tsl;
698 socklen_t slen;
699 char c;
700
701 while(atomic_get(&tcp_server_running)) {
702 r = tvhpoll_wait(tcp_server_poll, &ev, 1, -1);
703 if(r < 0) {
704 if (ERRNO_AGAIN(errno))
705 continue;
706 tvherror(LS_TCP, "tcp_server_loop: tvhpoll_wait: %s", strerror(errno));
707 continue;
708 }
709
710 if (r == 0) continue;
711
712 if (ev.data.ptr == &tcp_server_pipe) {
713 r = read(tcp_server_pipe.rd, &c, 1);
714 if (r > 0) {
715 next:
716 pthread_mutex_lock(&global_lock);
717 while ((tsl = LIST_FIRST(&tcp_server_join)) != NULL) {
718 LIST_REMOVE(tsl, jlink);
719 pthread_mutex_unlock(&global_lock);
720 pthread_join(tsl->tid, NULL);
721 free(tsl);
722 goto next;
723 }
724 while ((ts = LIST_FIRST(&tcp_server_delete_list)) != NULL) {
725 LIST_REMOVE(ts, link);
726 free(ts);
727 }
728 pthread_mutex_unlock(&global_lock);
729 }
730 continue;
731 }
732
733 ts = ev.data.ptr;
734
735 if(ev.events & TVHPOLL_HUP) {
736 close(ts->serverfd);
737 free(ts);
738 continue;
739 }
740
741 if(ev.events & TVHPOLL_IN) {
742 tsl = malloc(sizeof(tcp_server_launch_t));
743 tsl->ops = ts->ops;
744 tsl->opaque = ts->opaque;
745 tsl->status = NULL;
746 tsl->representative = NULL;
747 slen = sizeof(struct sockaddr_storage);
748
749 tsl->fd = accept(ts->serverfd,
750 (struct sockaddr *)&tsl->peer, &slen);
751 if(tsl->fd == -1) {
752 perror("accept");
753 free(tsl);
754 sleep(1);
755 continue;
756 }
757
758 slen = sizeof(struct sockaddr_storage);
759 if(getsockname(tsl->fd, (struct sockaddr *)&tsl->self, &slen)) {
760 close(tsl->fd);
761 free(tsl);
762 continue;
763 }
764
765 pthread_mutex_lock(&global_lock);
766 LIST_INSERT_HEAD(&tcp_server_active, tsl, alink);
767 pthread_mutex_unlock(&global_lock);
768 tvhthread_create(&tsl->tid, NULL, tcp_server_start, tsl, "tcp-start");
769 }
770 }
771 tvhtrace(LS_TCP, "server thread finished");
772 return NULL;
773 }
774
775
776
777 /**
778 *
779 */
780 #if ENABLE_LIBSYSTEMD_DAEMON
tcp_server_create_new(int subsystem,const char * name,const char * bindaddr,int port,tcp_server_ops_t * ops,void * opaque)781 static void *tcp_server_create_new
782 #else
783 void *tcp_server_create
784 #endif
785 (int subsystem, const char *name, const char *bindaddr,
786 int port, tcp_server_ops_t *ops, void *opaque)
787 {
788 int fd, x;
789 tcp_server_t *ts;
790 struct addrinfo hints, *res, *ressave, *use = NULL;
791 struct sockaddr_storage bound;
792 char port_buf[6], buf[50];
793 int one = 1;
794 int zero = 0;
795
796 snprintf(port_buf, 6, "%d", port);
797
798 memset(&hints, 0, sizeof(struct addrinfo));
799 hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
800 if (bindaddr != NULL)
801 hints.ai_flags |= AI_NUMERICHOST;
802 hints.ai_family = AF_UNSPEC;
803 hints.ai_socktype = SOCK_STREAM;
804
805 x = getaddrinfo(bindaddr, port_buf, &hints, &res);
806
807 if(x != 0) {
808 tvherror(LS_TCP, "getaddrinfo: %s: %s", bindaddr != NULL ? bindaddr : "*",
809 x == EAI_SYSTEM ? strerror(errno) : gai_strerror(x));
810 return NULL;
811 }
812
813 ressave = res;
814 while(res) {
815 if(res->ai_family == tcp_preferred_address_family) {
816 use = res;
817 break;
818 } else if(use == NULL) {
819 use = res;
820 }
821 res = res->ai_next;
822 }
823
824 fd = tvh_socket(use->ai_family, use->ai_socktype, use->ai_protocol);
825 if(fd == -1) {
826 freeaddrinfo(ressave);
827 return NULL;
828 }
829
830 if(use->ai_family == AF_INET6)
831 setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(int));
832
833 setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int));
834
835 assert(use->ai_addrlen <= sizeof(bound));
836 memset(&bound, 0, sizeof(bound));
837 memcpy(&bound, use->ai_addr, use->ai_addrlen);
838
839 x = bind(fd, use->ai_addr, use->ai_addrlen);
840 freeaddrinfo(ressave);
841
842 if(x != 0)
843 {
844 tvherror(LS_TCP, "bind: %s:%i: %s", bindaddr != NULL ? bindaddr : "*", port, strerror(errno));
845 close(fd);
846 return NULL;
847 }
848
849 listen(fd, 511);
850
851 ts = malloc(sizeof(tcp_server_t));
852 ts->serverfd = fd;
853 ts->bound = bound;
854 ts->ops = *ops;
855 ts->opaque = opaque;
856
857 tcp_get_str_from_ip(&bound, buf, sizeof(buf));
858 tvhinfo(subsystem, "Starting %s server %s:%d", name, buf, htons(IP_PORT(bound)));
859
860 return ts;
861 }
862
#if ENABLE_LIBSYSTEMD_DAEMON
/**
 * Create a TCP server, reusing a socket passed in by systemd when one
 * matches the requested address and port.
 *
 * The descriptors reported by sd_listen_fds() are compared with the
 * requested bind address (any address when bindaddr is NULL) and port.
 * Without a match a new socket is created by tcp_server_create_new().
 *
 * @param subsystem log subsystem for the start message
 * @param name      server name used in the start message
 * @param bindaddr  numeric address to bind to, NULL for any address
 * @param port      TCP port to listen on
 * @param ops       connection callbacks (copied)
 * @param opaque    pointer stored with the server
 * @return server handle, or NULL on failure
 */
void *
tcp_server_create
  (int subsystem, const char *name, const char *bindaddr,
   int port, tcp_server_ops_t *ops, void *opaque)
{
  int sd_fds_num, i, fd = -1;
  struct sockaddr_storage bound;
  tcp_server_t *ts;
  struct in_addr addr4;
  struct in6_addr addr6;
  char buf[50];
  int have4, have6;
  int found = 0;

  sd_fds_num = sd_listen_fds(0);
  /* bindaddr can be valid for one family only; remember which parse
   * succeeded so an unset address is never compared */
  have4 = inet_pton(AF_INET, bindaddr ?: "0.0.0.0", &addr4) == 1;
  have6 = inet_pton(AF_INET6, bindaddr ?: "::", &addr6) == 1;

  for (i = 0; i < sd_fds_num && !found; i++) {
    struct sockaddr_in *s_addr4;
    struct sockaddr_in6 *s_addr6;
    socklen_t s_len;

    /* Check which of the systemd-managed descriptors
     * corresponds to the requested server (if any) */
    fd = SD_LISTEN_FDS_START + i;
    memset(&bound, 0, sizeof(bound));
    s_len = sizeof(bound);
    if (getsockname(fd, (struct sockaddr *) &bound, &s_len) != 0) {
      tvherror(LS_TCP, "getsockname failed: %s", strerror(errno));
      continue;
    }
    switch (bound.ss_family) {
      case AF_INET:
        s_addr4 = (struct sockaddr_in *) &bound;
        if (have4
            && addr4.s_addr == s_addr4->sin_addr.s_addr
            && htons(port) == s_addr4->sin_port)
          found = 1;
        break;
      case AF_INET6:
        s_addr6 = (struct sockaddr_in6 *) &bound;
        if (have6
            && memcmp(addr6.s6_addr, s_addr6->sin6_addr.s6_addr, 16) == 0
            && htons(port) == s_addr6->sin6_port)
          found = 1;
        break;
      default:
        break;
    }
  }

  if (found) {
    /* use the systemd provided socket */
    ts = calloc(1, sizeof(*ts));
    if (ts == NULL) {
      tvherror(LS_TCP, "out of memory");
      return NULL;
    }
    ts->serverfd = fd;
    ts->bound    = bound;
    ts->ops      = *ops;
    ts->opaque   = opaque;
    tcp_get_str_from_ip(&bound, buf, sizeof(buf));
    tvhinfo(subsystem, "Starting %s server %s:%d (systemd)", name, buf, ntohs(IP_PORT(bound)));
  } else {
    /* no systemd-managed socket found, create a new one */
    tvhinfo(LS_TCP, "No systemd socket: creating a new one");
    ts = tcp_server_create_new(subsystem, name, bindaddr, port, ops, opaque);
  }

  return ts;
}
#endif
934
935 /**
936 *
937 */
tcp_server_register(void * server)938 void tcp_server_register(void *server)
939 {
940 tcp_server_t *ts = server;
941 tvhpoll_event_t ev;
942
943 if (ts == NULL)
944 return;
945
946 memset(&ev, 0, sizeof(ev));
947
948 ev.fd = ts->serverfd;
949 ev.events = TVHPOLL_IN;
950 ev.data.ptr = ts;
951 tvhpoll_add(tcp_server_poll, &ev, 1);
952 }
953
954 /**
955 *
956 */
957 void
tcp_server_delete(void * server)958 tcp_server_delete(void *server)
959 {
960 tcp_server_t *ts = server;
961 tvhpoll_event_t ev;
962 char c = 'D';
963
964 if (server == NULL)
965 return;
966
967 memset(&ev, 0, sizeof(ev));
968 ev.fd = ts->serverfd;
969 ev.events = TVHPOLL_IN;
970 ev.data.ptr = ts;
971 tvhpoll_rem(tcp_server_poll, &ev, 1);
972 close(ts->serverfd);
973 ts->serverfd = -1;
974 LIST_INSERT_HEAD(&tcp_server_delete_list, ts, link);
975 tvh_write(tcp_server_pipe.wr, &c, 1);
976 }
977
978 /**
979 *
980 */
981 int
tcp_default_ip_addr(struct sockaddr_storage * deflt,int family)982 tcp_default_ip_addr ( struct sockaddr_storage *deflt, int family )
983 {
984
985 struct sockaddr_storage ss;
986 socklen_t ss_len;
987 int sock;
988
989 memset(&ss, 0, sizeof(ss));
990 ss.ss_family = family == PF_UNSPEC ? tcp_preferred_address_family : family;
991 if (inet_pton(ss.ss_family,
992 ss.ss_family == AF_INET ?
993 /* Google name servers */
994 "8.8.8.8" : "2001:4860:4860::8888",
995 IP_IN_ADDR(ss)) <= 0)
996 return -1;
997
998 IP_PORT_SET(ss, htons(53));
999
1000 sock = tvh_socket(ss.ss_family, SOCK_STREAM, 0);
1001 if (sock < 0)
1002 return -1;
1003
1004 if (connect(sock, (struct sockaddr *)&ss, IP_IN_ADDRLEN(ss)) < 0) {
1005 close(sock);
1006 return -1;
1007 }
1008
1009 ss_len = sizeof(ss);
1010 if (getsockname(sock, (struct sockaddr *)&ss, &ss_len) < 0) {
1011 close(sock);
1012 return -1;
1013 }
1014
1015 if (ss.ss_family == AF_INET)
1016 IP_AS_V4(ss, port) = 0;
1017 else
1018 IP_AS_V6(ss, port) = 0;
1019
1020 memset(deflt, 0, sizeof(*deflt));
1021 memcpy(deflt, &ss, ss_len);
1022
1023 close(sock);
1024 return 0;
1025 }
1026
1027 /**
1028 *
1029 */
1030 int
tcp_server_bound(void * server,struct sockaddr_storage * bound,int family)1031 tcp_server_bound ( void *server, struct sockaddr_storage *bound, int family )
1032 {
1033 tcp_server_t *ts = server;
1034 int i, len, port;
1035 uint8_t *ptr;
1036
1037 if (server == NULL) {
1038 memset(bound, 0, sizeof(*bound));
1039 return 0;
1040 }
1041
1042 len = IP_IN_ADDRLEN(ts->bound);
1043 ptr = (uint8_t *)IP_IN_ADDR(ts->bound);
1044 for (i = 0; i < len; i++)
1045 if (ptr[0])
1046 break;
1047 if (i < len) {
1048 *bound = ts->bound;
1049 return 0;
1050 }
1051 port = IP_PORT(ts->bound);
1052
1053 /* no bind address was set, try to find one */
1054 if (tcp_default_ip_addr(bound, family) < 0)
1055 return -1;
1056 if (bound->ss_family == AF_INET)
1057 IP_AS_V4(*bound, port) = port;
1058 else
1059 IP_AS_V6(*bound, port) = port;
1060 return 0;
1061 }
1062
1063 /**
1064 *
1065 */
1066 int
tcp_server_onall(void * server)1067 tcp_server_onall ( void *server )
1068 {
1069 tcp_server_t *ts = server;
1070 int i, len;
1071 uint8_t *ptr;
1072
1073 if (server == NULL) return 0;
1074
1075 len = IP_IN_ADDRLEN(ts->bound);
1076 ptr = (uint8_t *)IP_IN_ADDR(ts->bound);
1077 for (i = 0; i < len; i++)
1078 if (ptr[0])
1079 break;
1080 return i >= len;
1081 }
1082
1083 /*
1084 * Connections status
1085 */
1086 htsmsg_t *
tcp_server_connections(void)1087 tcp_server_connections ( void )
1088 {
1089 tcp_server_launch_t *tsl;
1090 lock_assert(&global_lock);
1091 htsmsg_t *l, *e, *m;
1092 char buf[1024];
1093 int c = 0;
1094
1095 /* Build list */
1096 l = htsmsg_create_list();
1097 LIST_FOREACH(tsl, &tcp_server_launches, link) {
1098 if (!tsl->status) continue;
1099 c++;
1100 e = htsmsg_create_map();
1101 htsmsg_add_u32(e, "id", tsl->id);
1102 tcp_get_str_from_ip(&tsl->self, buf, sizeof(buf));
1103 htsmsg_add_str(e, "server", buf);
1104 htsmsg_add_u32(e, "server_port", ntohs(IP_PORT(tsl->self)));
1105 tcp_get_str_from_ip(&tsl->peer, buf, sizeof(buf));
1106 htsmsg_add_str(e, "peer", buf);
1107 htsmsg_add_u32(e, "peer_port", ntohs(IP_PORT(tsl->peer)));
1108 htsmsg_add_s64(e, "started", tsl->started);
1109 tsl->status(tsl->opaque, e);
1110 htsmsg_add_msg(l, NULL, e);
1111 }
1112
1113 /* Output */
1114 m = htsmsg_create_map();
1115 htsmsg_add_msg(m, "entries", l);
1116 htsmsg_add_u32(m, "totalCount", c);
1117 return m;
1118 }
1119
1120 /**
1121 *
1122 */
1123 pthread_t tcp_server_tid;
1124
1125 void
tcp_server_preinit(int opt_ipv6)1126 tcp_server_preinit(int opt_ipv6)
1127 {
1128 if(opt_ipv6)
1129 tcp_preferred_address_family = AF_INET6;
1130 }
1131
1132 void
tcp_server_init(void)1133 tcp_server_init(void)
1134 {
1135 tvhpoll_event_t ev;
1136 tvh_pipe(O_NONBLOCK, &tcp_server_pipe);
1137 tcp_server_poll = tvhpoll_create(10);
1138
1139 memset(&ev, 0, sizeof(ev));
1140 ev.fd = tcp_server_pipe.rd;
1141 ev.events = TVHPOLL_IN;
1142 ev.data.ptr = &tcp_server_pipe;
1143 tvhpoll_add(tcp_server_poll, &ev, 1);
1144
1145 atomic_set(&tcp_server_running, 1);
1146 tvhthread_create(&tcp_server_tid, NULL, tcp_server_loop, NULL, "tcp-loop");
1147 }
1148
1149 void
tcp_server_done(void)1150 tcp_server_done(void)
1151 {
1152 tcp_server_t *ts;
1153 tcp_server_launch_t *tsl;
1154 char c = 'E';
1155 int64_t t;
1156
1157 atomic_set(&tcp_server_running, 0);
1158 tvh_write(tcp_server_pipe.wr, &c, 1);
1159
1160 pthread_mutex_lock(&global_lock);
1161 LIST_FOREACH(tsl, &tcp_server_active, alink) {
1162 if (tsl->ops.cancel)
1163 tsl->ops.cancel(tsl->opaque);
1164 if (tsl->fd >= 0)
1165 shutdown(tsl->fd, SHUT_RDWR);
1166 pthread_kill(tsl->tid, SIGTERM);
1167 }
1168 pthread_mutex_unlock(&global_lock);
1169
1170 pthread_join(tcp_server_tid, NULL);
1171 tvh_pipe_close(&tcp_server_pipe);
1172 tvhpoll_destroy(tcp_server_poll);
1173
1174 pthread_mutex_lock(&global_lock);
1175 t = mclk();
1176 while (LIST_FIRST(&tcp_server_active) != NULL) {
1177 if (t + sec2mono(5) < mclk())
1178 tvhtrace(LS_TCP, "tcp server %p active too long", LIST_FIRST(&tcp_server_active));
1179 pthread_mutex_unlock(&global_lock);
1180 tvh_safe_usleep(20000);
1181 pthread_mutex_lock(&global_lock);
1182 }
1183 while ((tsl = LIST_FIRST(&tcp_server_join)) != NULL) {
1184 LIST_REMOVE(tsl, jlink);
1185 pthread_mutex_unlock(&global_lock);
1186 pthread_join(tsl->tid, NULL);
1187 free(tsl);
1188 pthread_mutex_lock(&global_lock);
1189 }
1190 while ((ts = LIST_FIRST(&tcp_server_delete_list)) != NULL) {
1191 LIST_REMOVE(ts, link);
1192 free(ts);
1193 }
1194 pthread_mutex_unlock(&global_lock);
1195 }
1196