1 /*
2  *  tvheadend, TCP common functions
3  *  Copyright (C) 2007 Andreas Öman
4  *
5  *  This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation, either version 3 of the License, or
8  *  (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include <pthread.h>
20 #include <netdb.h>
21 #include <sys/types.h>
22 #include <sys/socket.h>
23 #include <poll.h>
24 #include <assert.h>
25 #include <stdio.h>
26 #include <unistd.h>
27 #include <stdlib.h>
28 #include <string.h>
29 #include <stdarg.h>
30 #include <fcntl.h>
31 #include <errno.h>
32 #include <signal.h>
33 #include <netinet/in.h>
34 #include <netinet/ip.h>
35 #include <netinet/tcp.h>
36 #include <arpa/inet.h>
37 
38 #include "tvheadend.h"
39 #include "tcp.h"
40 #include "tvhpoll.h"
41 #include "notify.h"
42 #include "access.h"
43 #include "dvr/dvr.h"
44 #define COMPAT_IPTOS
45 #include "compat.h"
46 
47 #if ENABLE_LIBSYSTEMD_DAEMON
48 #include <systemd/sd-daemon.h>
49 #endif
50 
51 int tcp_preferred_address_family = AF_INET;
52 int tcp_server_running;
53 th_pipe_t tcp_server_pipe;
54 
55 /**
56  *
57  */
58 int
socket_set_dscp(int sockfd,uint32_t dscp,char * errbuf,size_t errbufsize)59 socket_set_dscp(int sockfd, uint32_t dscp, char *errbuf, size_t errbufsize)
60 {
61   int r, v;
62 
63   v = dscp & IPTOS_DSCP_MASK;
64   r = setsockopt(sockfd, IPPROTO_IP, IP_TOS, &v, sizeof(v));
65   if (r < 0) {
66     if (errbuf && errbufsize)
67       snprintf(errbuf, errbufsize, "IP_TOS failed: %s", strerror(errno));
68     return -1;
69   }
70   return 0;
71 }
72 
73 /**
74  *
75  */
76 int
tcp_connect(const char * hostname,int port,const char * bindaddr,char * errbuf,size_t errbufsize,int timeout)77 tcp_connect(const char *hostname, int port, const char *bindaddr,
78             char *errbuf, size_t errbufsize, int timeout)
79 {
80   int fd = -1, r, res, err;
81   struct addrinfo *ai, *rai = NULL, hints;
82   struct sockaddr_storage bindip;
83   char portstr[6];
84   socklen_t errlen = sizeof(err);
85 
86   errbuf[0] = '\0';
87 
88   memset(&bindip, 0, sizeof(bindip));
89   bindip.ss_family = AF_UNSPEC;
90   if (bindaddr && bindaddr[0] != '\0') {
91     if (tcp_get_ip_from_str(bindaddr, &bindip) == NULL) {
92       snprintf(errbuf, errbufsize, "Cannot bind to addr '%s'", bindaddr);
93       return -1;
94     }
95   }
96 
97   snprintf(portstr, 6, "%u", port);
98   memset(&hints, 0, sizeof(struct addrinfo));
99   hints.ai_family = AF_UNSPEC;
100   res = getaddrinfo(hostname, portstr, &hints, &ai);
101   if (res != 0) {
102     snprintf(errbuf, errbufsize, "%s", gai_strerror(res));
103     return -1;
104   }
105 
106 again:
107   if (fd >= 0) {
108     close(fd);
109     fd = -1;
110   }
111   rai = rai == NULL ? ai : rai->ai_next;
112   if (rai == NULL) {
113     if (errbuf[0] == '\0')
114       snprintf(errbuf, errbufsize, "Invalid or unresolved hostname '%s'", hostname);
115     goto error;
116   }
117 
118   if (bindip.ss_family == AF_UNSPEC) {
119     if (rai->ai_family != AF_INET && rai->ai_family != AF_INET6)
120       goto again;
121   } else if (rai->ai_family != bindip.ss_family) {
122     goto again;
123   }
124 
125   fd = tvh_socket(rai->ai_family, SOCK_STREAM, 0);
126   if(fd < 0) {
127     snprintf(errbuf, errbufsize, "Unable to create socket: %s",
128 	     strerror(errno));
129     goto again;
130   }
131 
132   /**
133    * Switch to nonblocking
134    */
135   fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK);
136 
137   if (bindip.ss_family != AF_UNSPEC) {
138     if (bind(fd, (struct sockaddr *)&bindip, IP_IN_ADDRLEN(bindip)) < 0) {
139       snprintf(errbuf, errbufsize, "Cannot bind to IPv%s addr '%s:%i': %s",
140                                    bindip.ss_family == AF_INET6 ? "6" : "4",
141                                    bindaddr, htons(IP_PORT(bindip)),
142                                    strerror(errno));
143       goto error;
144     }
145   }
146 
147   r = connect(fd, rai->ai_addr, rai->ai_addrlen);
148 
149   if(r < 0) {
150     /* timeout < 0 - do not wait at all */
151     if(errno == EINPROGRESS && timeout < 0) {
152       err = 0;
153     } else if(errno == EINPROGRESS) {
154       tvhpoll_event_t ev;
155       tvhpoll_t *efd;
156 
157       efd = tvhpoll_create(1);
158       memset(&ev, 0, sizeof(ev));
159       ev.events   = TVHPOLL_OUT;
160       ev.fd       = fd;
161       ev.data.ptr = &fd;
162       tvhpoll_add(efd, &ev, 1);
163 
164       /* minimal timeout is one second */
165       if (timeout < 1)
166         timeout = 1;
167 
168       while (1) {
169         if (!tvheadend_is_running()) {
170           errbuf[0] = '\0';
171           tvhpoll_destroy(efd);
172           goto error;
173         }
174 
175         r = tvhpoll_wait(efd, &ev, 1, timeout * 1000);
176         if (r > 0)
177           break;
178 
179         if (r == 0) { /* Timeout */
180           snprintf(errbuf, errbufsize, "Connection attempt to '%s' timed out", hostname);
181           tvhpoll_destroy(efd);
182           goto again;
183         }
184 
185         if (!ERRNO_AGAIN(errno)) {
186           snprintf(errbuf, errbufsize, "poll() error: %s", strerror(errno));
187           tvhpoll_destroy(efd);
188           goto error;
189         }
190       }
191 
192       tvhpoll_destroy(efd);
193       getsockopt(fd, SOL_SOCKET, SO_ERROR, (void *)&err, &errlen);
194     } else {
195       err = errno;
196     }
197   } else {
198     err = 0;
199   }
200 
201   if(err != 0) {
202     snprintf(errbuf, errbufsize, "%s", strerror(err));
203     goto again;
204   }
205 
206   fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK);
207 
208   /* Set the keep-alive active */
209   err = 1;
210   setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, (void *)&err, errlen);
211 
212   freeaddrinfo(ai);
213   return fd;
214 
215 error:
216   if (fd >= 0)
217     close(fd);
218   freeaddrinfo(ai);
219   return -1;
220 }
221 
222 
223 /**
224  *
225  */
226 int
tcp_write_queue(int fd,htsbuf_queue_t * q)227 tcp_write_queue(int fd, htsbuf_queue_t *q)
228 {
229   htsbuf_data_t *hd;
230   int l, r = 0;
231   void *p;
232 
233   while((hd = TAILQ_FIRST(&q->hq_q)) != NULL) {
234     if (!r) {
235       l = hd->hd_data_len - hd->hd_data_off;
236       p = hd->hd_data + hd->hd_data_off;
237       r = tvh_write(fd, p, l);
238     }
239     htsbuf_data_free(q, hd);
240   }
241   q->hq_size = 0;
242   return r;
243 }
244 
245 
246 /**
247  *
248  */
249 static int
tcp_fill_htsbuf_from_fd(int fd,htsbuf_queue_t * hq)250 tcp_fill_htsbuf_from_fd(int fd, htsbuf_queue_t *hq)
251 {
252   htsbuf_data_t *hd = TAILQ_LAST(&hq->hq_q, htsbuf_data_queue);
253   int c, r;
254 
255   if(hd != NULL) {
256     /* Fill out any previous buffer */
257     c = hd->hd_data_size - hd->hd_data_len;
258 
259     if(c > 0) {
260 
261       do {
262         r = read(fd, hd->hd_data + hd->hd_data_len, c);
263       } while (r < 0 && ERRNO_AGAIN(errno));
264       if(r < 1)
265 	return -1;
266 
267       hd->hd_data_len += r;
268       hq->hq_size += r;
269       return 0;
270     }
271   }
272 
273   hd = malloc(sizeof(htsbuf_data_t));
274 
275   hd->hd_data_size = 1000;
276   hd->hd_data = malloc(hd->hd_data_size);
277 
278   do {
279     r = read(fd, hd->hd_data, hd->hd_data_size);
280   } while (r < 0 && ERRNO_AGAIN(errno));
281   if(r < 1) {
282     free(hd->hd_data);
283     free(hd);
284     return -1;
285   }
286   hd->hd_data_len = r;
287   hd->hd_data_off = 0;
288   TAILQ_INSERT_TAIL(&hq->hq_q, hd, hd_link);
289   hq->hq_size += r;
290   return 0;
291 }
292 
293 
294 /**
295  *
296  */
297 char *
tcp_read_line(int fd,htsbuf_queue_t * spill)298 tcp_read_line(int fd, htsbuf_queue_t *spill)
299 {
300   int len;
301   char *buf;
302 
303   do {
304     len = htsbuf_find(spill, 0xa);
305 
306     if(len == -1) {
307       if(tcp_fill_htsbuf_from_fd(fd, spill) < 0)
308         return NULL;
309     }
310   } while (len == -1);
311 
312   buf = malloc(len+1);
313 
314   htsbuf_read(spill, buf, len);
315   buf[len] = 0;
316   while(len > 0 && buf[len - 1] < 32)
317     buf[--len] = 0;
318   htsbuf_drop(spill, 1); /* Drop the \n */
319   return buf;
320 }
321 
322 
323 
324 /**
325  *
326  */
327 int
tcp_read_data(int fd,char * buf,const size_t bufsize,htsbuf_queue_t * spill)328 tcp_read_data(int fd, char *buf, const size_t bufsize, htsbuf_queue_t *spill)
329 {
330   int x, tot = htsbuf_read(spill, buf, bufsize);
331 
332   if(tot == bufsize)
333     return 0;
334 
335   x = recv(fd, buf + tot, bufsize - tot, MSG_WAITALL);
336   if(x != bufsize - tot)
337     return -1;
338 
339   return 0;
340 }
341 
342 /**
343  *
344  */
345 int
tcp_read(int fd,void * buf,size_t len)346 tcp_read(int fd, void *buf, size_t len)
347 {
348   int x = recv(fd, buf, len, MSG_WAITALL);
349 
350   if(x == -1)
351     return errno;
352   if(x != len)
353     return ECONNRESET;
354   return 0;
355 
356 }
357 
358 /**
359  *
360  */
361 int
tcp_read_timeout(int fd,void * buf,size_t len,int timeout)362 tcp_read_timeout(int fd, void *buf, size_t len, int timeout)
363 {
364   int x, tot = 0;
365   struct pollfd fds;
366 
367   assert(timeout > 0);
368 
369   fds.fd = fd;
370   fds.events = POLLIN;
371   fds.revents = 0;
372 
373   while(tot != len) {
374 
375     x = poll(&fds, 1, timeout);
376     if(x == 0)
377       return ETIMEDOUT;
378     if(x == -1) {
379       if (!tvheadend_is_running())
380         return ECONNRESET;
381       if (ERRNO_AGAIN(errno))
382         continue;
383       return errno;
384     }
385 
386     x = recv(fd, buf + tot, len - tot, MSG_DONTWAIT);
387     if(x == -1) {
388       if(ERRNO_AGAIN(errno))
389         continue;
390       return errno;
391     }
392 
393     if(x == 0)
394       return ECONNRESET;
395 
396     tot += x;
397   }
398 
399   return 0;
400 
401 }
402 
403 /**
404  *
405  */
406 int
tcp_socket_dead(int fd)407 tcp_socket_dead(int fd)
408 {
409   int err = 0;
410   socklen_t errlen = sizeof(err);
411 
412   if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &err, &errlen))
413     return -errno;
414   if (err)
415     return -err;
416 #ifdef PLATFORM_FREEBSD
417   if (recv(fd, NULL, 0, MSG_PEEK | MSG_DONTWAIT) < 0)
418     return -errno;
419 #else
420   if (recv(fd, NULL, 0, MSG_PEEK | MSG_DONTWAIT) == 0)
421     return -EIO;
422 #endif
423   return 0;
424 }
425 
426 /**
427  *
428  */
429 char *
tcp_get_str_from_ip(const struct sockaddr_storage * sa,char * dst,size_t maxlen)430 tcp_get_str_from_ip(const struct sockaddr_storage *sa, char *dst, size_t maxlen)
431 {
432   if (sa == NULL || dst == NULL)
433     return NULL;
434 
435   switch(sa->ss_family)
436   {
437     case AF_INET:
438       inet_ntop(AF_INET, &(((struct sockaddr_in*)sa)->sin_addr), dst, maxlen);
439       break;
440     case AF_INET6:
441       inet_ntop(AF_INET6, &(((struct sockaddr_in6*)sa)->sin6_addr), dst, maxlen);
442       break;
443     default:
444       strlcpy(dst, "Unknown AF", maxlen);
445       return NULL;
446   }
447 
448   return dst;
449 }
450 
451 /**
452  *
453  */
454 struct sockaddr_storage *
tcp_get_ip_from_str(const char * src,struct sockaddr_storage * sa)455 tcp_get_ip_from_str(const char *src, struct sockaddr_storage *sa)
456 {
457   if (sa == NULL || src == NULL)
458     return NULL;
459 
460   if (strstr(src, ":")) {
461     sa->ss_family = AF_INET6;
462     if (inet_pton(AF_INET6, src, &(((struct sockaddr_in6*)sa)->sin6_addr)) != 1)
463       return NULL;
464   } else if (strstr(src, ".")) {
465     sa->ss_family = AF_INET;
466     if (inet_pton(AF_INET, src, &(((struct sockaddr_in*)sa)->sin_addr)) != 1)
467       return NULL;
468   } else {
469     return NULL;
470   }
471 
472   return sa;
473 }
474 
475 /**
476  *
477  */
478 static tvhpoll_t *tcp_server_poll;
479 static uint32_t tcp_server_launch_id;
480 
481 typedef struct tcp_server {
482   int serverfd;
483   struct sockaddr_storage bound;
484   tcp_server_ops_t ops;
485   void *opaque;
486   LIST_ENTRY(tcp_server) link;
487 } tcp_server_t;
488 
489 typedef struct tcp_server_launch {
490   pthread_t tid;
491   uint32_t id;
492   int fd;
493   tcp_server_ops_t ops;
494   void *opaque;
495   char *representative;
496   void (*status) (void *opaque, htsmsg_t *m);
497   struct sockaddr_storage peer;
498   struct sockaddr_storage self;
499   time_t started;
500   LIST_ENTRY(tcp_server_launch) link;
501   LIST_ENTRY(tcp_server_launch) alink;
502   LIST_ENTRY(tcp_server_launch) jlink;
503 } tcp_server_launch_t;
504 
505 static LIST_HEAD(, tcp_server) tcp_server_delete_list = { 0 };
506 static LIST_HEAD(, tcp_server_launch) tcp_server_launches = { 0 };
507 static LIST_HEAD(, tcp_server_launch) tcp_server_active = { 0 };
508 static LIST_HEAD(, tcp_server_launch) tcp_server_join = { 0 };
509 
510 /**
511  *
512  */
513 uint32_t
tcp_connection_count(access_t * aa)514 tcp_connection_count(access_t *aa)
515 {
516   tcp_server_launch_t *tsl;
517   uint32_t used = 0;
518 
519   lock_assert(&global_lock);
520 
521   if (aa == NULL)
522     return 0;
523 
524   LIST_FOREACH(tsl, &tcp_server_active, alink)
525     if (!strcmp(aa->aa_representative ?: "", tsl->representative ?: ""))
526       used++;
527   return used;
528 }
529 
530 /**
531  *
532  */
533 void *
tcp_connection_launch(int fd,void (* status)(void * opaque,htsmsg_t * m),access_t * aa)534 tcp_connection_launch
535   (int fd, void (*status) (void *opaque, htsmsg_t *m), access_t *aa)
536 {
537   tcp_server_launch_t *tsl, *res;
538   uint32_t used, used2;
539   int64_t started = mclk();
540   int c1, c2;
541 
542   lock_assert(&global_lock);
543 
544   assert(status);
545 
546   if (aa == NULL)
547     return NULL;
548 
549 try_again:
550   res = NULL;
551   used = 0;
552   LIST_FOREACH(tsl, &tcp_server_active, alink) {
553     if (tsl->fd == fd) {
554       res = tsl;
555       if (!aa->aa_conn_limit && !aa->aa_conn_limit_streaming)
556         break;
557       continue;
558     }
559     if (!strcmp(aa->aa_representative ?: "", tsl->representative ?: ""))
560       used++;
561   }
562   if (res == NULL)
563     return NULL;
564 
565   if (aa->aa_conn_limit || aa->aa_conn_limit_streaming) {
566     used2 = aa->aa_conn_limit ? dvr_usage_count(aa) : 0;
567     /* the rule is: allow if one condition is OK */
568     c1 = aa->aa_conn_limit ? used + used2 >= aa->aa_conn_limit : -1;
569     c2 = aa->aa_conn_limit_streaming ? used >= aa->aa_conn_limit_streaming : -1;
570 
571     if (c1 && c2) {
572       if (started + sec2mono(3) < mclk()) {
573         tvherror(LS_TCP, "multiple connections are not allowed for user '%s' from '%s' "
574                         "(limit %u, streaming limit %u, active streaming %u, DVR %u)",
575                  aa->aa_username ?: "", aa->aa_representative ?: "",
576                  aa->aa_conn_limit, aa->aa_conn_limit_streaming,
577                  used, used2);
578         return NULL;
579       }
580       pthread_mutex_unlock(&global_lock);
581       tvh_safe_usleep(250000);
582       pthread_mutex_lock(&global_lock);
583       if (tvheadend_is_running())
584         goto try_again;
585       return NULL;
586     }
587   }
588 
589   res->representative = aa->aa_representative ? strdup(aa->aa_representative) : NULL;
590   res->status = status;
591   LIST_INSERT_HEAD(&tcp_server_launches, res, link);
592   notify_reload("connections");
593   return res;
594 }
595 
596 /**
597  *
598  */
599 void
tcp_connection_land(void * tcp_id)600 tcp_connection_land(void *tcp_id)
601 {
602   tcp_server_launch_t *tsl = tcp_id;
603 
604   lock_assert(&global_lock);
605 
606   if (tsl == NULL)
607     return;
608 
609   LIST_REMOVE(tsl, link);
610   notify_reload("connections");
611 
612   free(tsl->representative);
613   tsl->representative = NULL;
614 }
615 
616 /**
617  *
618  */
619 void
tcp_connection_cancel(uint32_t id)620 tcp_connection_cancel(uint32_t id)
621 {
622   tcp_server_launch_t *tsl;
623 
624   lock_assert(&global_lock);
625 
626   LIST_FOREACH(tsl, &tcp_server_active, alink)
627     if (tsl->id == id) {
628       if (tsl->ops.cancel)
629         tsl->ops.cancel(tsl->opaque);
630       break;
631     }
632 }
633 
634 /*
635  *
636  */
637 static void *
tcp_server_start(void * aux)638 tcp_server_start(void *aux)
639 {
640   tcp_server_launch_t *tsl = aux;
641   struct timeval to;
642   int val;
643   char c = 'J';
644 
645   val = 1;
646   setsockopt(tsl->fd, SOL_SOCKET, SO_KEEPALIVE, &val, sizeof(val));
647 
648 #ifdef TCP_KEEPIDLE
649   val = 30;
650   setsockopt(tsl->fd, IPPROTO_TCP, TCP_KEEPIDLE, &val, sizeof(val));
651 #endif
652 
653 #ifdef TCP_KEEPINVL
654   val = 15;
655   setsockopt(tsl->fd, IPPROTO_TCP, TCP_KEEPINTVL, &val, sizeof(val));
656 #endif
657 
658 #ifdef TCP_KEEPCNT
659   val = 5;
660   setsockopt(tsl->fd, IPPROTO_TCP, TCP_KEEPCNT, &val, sizeof(val));
661 #endif
662 
663   val = 1;
664   setsockopt(tsl->fd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
665 
666   to.tv_sec  = 30;
667   to.tv_usec =  0;
668   setsockopt(tsl->fd, SOL_SOCKET, SO_SNDTIMEO, &to, sizeof(to));
669 
670   /* Start */
671   time(&tsl->started);
672   pthread_mutex_lock(&global_lock);
673   tsl->id = ++tcp_server_launch_id;
674   if (!tsl->id) tsl->id = ++tcp_server_launch_id;
675   tsl->ops.start(tsl->fd, &tsl->opaque, &tsl->peer, &tsl->self);
676 
677   /* Stop */
678   if (tsl->ops.stop) tsl->ops.stop(tsl->opaque);
679   LIST_REMOVE(tsl, alink);
680   LIST_INSERT_HEAD(&tcp_server_join, tsl, jlink);
681   pthread_mutex_unlock(&global_lock);
682   if (atomic_get(&tcp_server_running))
683     tvh_write(tcp_server_pipe.wr, &c, 1);
684   return NULL;
685 }
686 
687 
688 /**
689  *
690  */
691 static void *
tcp_server_loop(void * aux)692 tcp_server_loop(void *aux)
693 {
694   int r;
695   tvhpoll_event_t ev;
696   tcp_server_t *ts;
697   tcp_server_launch_t *tsl;
698   socklen_t slen;
699   char c;
700 
701   while(atomic_get(&tcp_server_running)) {
702     r = tvhpoll_wait(tcp_server_poll, &ev, 1, -1);
703     if(r < 0) {
704       if (ERRNO_AGAIN(errno))
705         continue;
706       tvherror(LS_TCP, "tcp_server_loop: tvhpoll_wait: %s", strerror(errno));
707       continue;
708     }
709 
710     if (r == 0) continue;
711 
712     if (ev.data.ptr == &tcp_server_pipe) {
713       r = read(tcp_server_pipe.rd, &c, 1);
714       if (r > 0) {
715 next:
716         pthread_mutex_lock(&global_lock);
717         while ((tsl = LIST_FIRST(&tcp_server_join)) != NULL) {
718           LIST_REMOVE(tsl, jlink);
719           pthread_mutex_unlock(&global_lock);
720           pthread_join(tsl->tid, NULL);
721           free(tsl);
722           goto next;
723         }
724         while ((ts = LIST_FIRST(&tcp_server_delete_list)) != NULL) {
725           LIST_REMOVE(ts, link);
726           free(ts);
727         }
728         pthread_mutex_unlock(&global_lock);
729       }
730       continue;
731     }
732 
733     ts = ev.data.ptr;
734 
735     if(ev.events & TVHPOLL_HUP) {
736       close(ts->serverfd);
737       free(ts);
738       continue;
739     }
740 
741     if(ev.events & TVHPOLL_IN) {
742       tsl = malloc(sizeof(tcp_server_launch_t));
743       tsl->ops            = ts->ops;
744       tsl->opaque         = ts->opaque;
745       tsl->status         = NULL;
746       tsl->representative = NULL;
747       slen = sizeof(struct sockaddr_storage);
748 
749       tsl->fd = accept(ts->serverfd,
750                        (struct sockaddr *)&tsl->peer, &slen);
751       if(tsl->fd == -1) {
752      	perror("accept");
753      	free(tsl);
754      	sleep(1);
755      	continue;
756       }
757 
758       slen = sizeof(struct sockaddr_storage);
759       if(getsockname(tsl->fd, (struct sockaddr *)&tsl->self, &slen)) {
760         close(tsl->fd);
761         free(tsl);
762         continue;
763       }
764 
765       pthread_mutex_lock(&global_lock);
766       LIST_INSERT_HEAD(&tcp_server_active, tsl, alink);
767       pthread_mutex_unlock(&global_lock);
768       tvhthread_create(&tsl->tid, NULL, tcp_server_start, tsl, "tcp-start");
769     }
770   }
771   tvhtrace(LS_TCP, "server thread finished");
772   return NULL;
773 }
774 
775 
776 
777 /**
778  *
779  */
780 #if ENABLE_LIBSYSTEMD_DAEMON
tcp_server_create_new(int subsystem,const char * name,const char * bindaddr,int port,tcp_server_ops_t * ops,void * opaque)781 static void *tcp_server_create_new
782 #else
783 void *tcp_server_create
784 #endif
785   (int subsystem, const char *name, const char *bindaddr,
786    int port, tcp_server_ops_t *ops, void *opaque)
787 {
788   int fd, x;
789   tcp_server_t *ts;
790   struct addrinfo hints, *res, *ressave, *use = NULL;
791   struct sockaddr_storage bound;
792   char port_buf[6], buf[50];
793   int one = 1;
794   int zero = 0;
795 
796   snprintf(port_buf, 6, "%d", port);
797 
798   memset(&hints, 0, sizeof(struct addrinfo));
799   hints.ai_flags = AI_PASSIVE | AI_NUMERICSERV;
800   if (bindaddr != NULL)
801       hints.ai_flags |= AI_NUMERICHOST;
802   hints.ai_family = AF_UNSPEC;
803   hints.ai_socktype = SOCK_STREAM;
804 
805   x = getaddrinfo(bindaddr, port_buf, &hints, &res);
806 
807   if(x != 0) {
808     tvherror(LS_TCP, "getaddrinfo: %s: %s", bindaddr != NULL ? bindaddr : "*",
809              x == EAI_SYSTEM ? strerror(errno) : gai_strerror(x));
810     return NULL;
811   }
812 
813   ressave = res;
814   while(res) {
815     if(res->ai_family == tcp_preferred_address_family) {
816       use = res;
817       break;
818     } else if(use == NULL) {
819       use = res;
820     }
821     res = res->ai_next;
822   }
823 
824   fd = tvh_socket(use->ai_family, use->ai_socktype, use->ai_protocol);
825   if(fd == -1) {
826     freeaddrinfo(ressave);
827     return NULL;
828   }
829 
830   if(use->ai_family == AF_INET6)
831     setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &zero, sizeof(int));
832 
833   setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(int));
834 
835   assert(use->ai_addrlen <= sizeof(bound));
836   memset(&bound, 0, sizeof(bound));
837   memcpy(&bound, use->ai_addr, use->ai_addrlen);
838 
839   x = bind(fd, use->ai_addr, use->ai_addrlen);
840   freeaddrinfo(ressave);
841 
842   if(x != 0)
843   {
844     tvherror(LS_TCP, "bind: %s:%i: %s", bindaddr != NULL ? bindaddr : "*", port, strerror(errno));
845     close(fd);
846     return NULL;
847   }
848 
849   listen(fd, 511);
850 
851   ts = malloc(sizeof(tcp_server_t));
852   ts->serverfd = fd;
853   ts->bound  = bound;
854   ts->ops    = *ops;
855   ts->opaque = opaque;
856 
857   tcp_get_str_from_ip(&bound, buf, sizeof(buf));
858   tvhinfo(subsystem, "Starting %s server %s:%d", name, buf, htons(IP_PORT(bound)));
859 
860   return ts;
861 }
862 
863 #if ENABLE_LIBSYSTEMD_DAEMON
864 /**
865  *
866  */
867 void *
tcp_server_create(int subsystem,const char * name,const char * bindaddr,int port,tcp_server_ops_t * ops,void * opaque)868 tcp_server_create
869   (int subsystem, const char *name, const char *bindaddr,
870    int port, tcp_server_ops_t *ops, void *opaque)
871 {
872   int sd_fds_num, i, fd;
873   struct sockaddr_storage bound;
874   tcp_server_t *ts;
875   struct in_addr addr4;
876   struct in6_addr addr6;
877   char buf[50];
878   int found = 0;
879 
880   sd_fds_num = sd_listen_fds(0);
881   inet_pton(AF_INET, bindaddr ?: "0.0.0.0", &addr4);
882   inet_pton(AF_INET6, bindaddr ?: "::", &addr6);
883 
884   for (i = 0; i < sd_fds_num && !found; i++) {
885     struct sockaddr_in *s_addr4;
886     struct sockaddr_in6 *s_addr6;
887     socklen_t s_len;
888 
889     /* Check which of the systemd-managed descriptors
890      * corresponds to the requested server (if any) */
891     fd = SD_LISTEN_FDS_START + i;
892     memset(&bound, 0, sizeof(bound));
893     s_len = sizeof(bound);
894     if (getsockname(fd, (struct sockaddr *) &bound, &s_len) != 0) {
895       tvherror(LS_TCP, "getsockname failed: %s", strerror(errno));
896       continue;
897     }
898     switch (bound.ss_family) {
899       case AF_INET:
900         s_addr4 = (struct sockaddr_in *) &bound;
901         if (addr4.s_addr == s_addr4->sin_addr.s_addr
902             && htons(port) == s_addr4->sin_port)
903           found = 1;
904         break;
905       case AF_INET6:
906         s_addr6 = (struct sockaddr_in6 *) &bound;
907         if (memcmp(addr6.s6_addr, s_addr6->sin6_addr.s6_addr, 16) == 0
908             && htons(port) == s_addr6->sin6_port)
909           found = 1;
910         break;
911       default:
912         break;
913     }
914   }
915 
916   if (found) {
917     /* use the systemd provided socket */
918     ts = malloc(sizeof(tcp_server_t));
919     ts->serverfd = fd;
920     ts->bound  = bound;
921     ts->ops    = *ops;
922     ts->opaque = opaque;
923     tcp_get_str_from_ip(&bound, buf, sizeof(buf));
924     tvhinfo(subsystem, "Starting %s server %s:%d (systemd)", name, buf, htons(IP_PORT(bound)));
925   } else {
926     /* no systemd-managed socket found, create a new one */
927     tvhinfo(LS_TCP, "No systemd socket: creating a new one");
928     ts = tcp_server_create_new(subsystem, name, bindaddr, port, ops, opaque);
929   }
930 
931   return ts;
932 }
933 #endif
934 
935 /**
936  *
937  */
tcp_server_register(void * server)938 void tcp_server_register(void *server)
939 {
940   tcp_server_t *ts = server;
941   tvhpoll_event_t ev;
942 
943   if (ts == NULL)
944     return;
945 
946   memset(&ev, 0, sizeof(ev));
947 
948   ev.fd       = ts->serverfd;
949   ev.events   = TVHPOLL_IN;
950   ev.data.ptr = ts;
951   tvhpoll_add(tcp_server_poll, &ev, 1);
952 }
953 
954 /**
955  *
956  */
957 void
tcp_server_delete(void * server)958 tcp_server_delete(void *server)
959 {
960   tcp_server_t *ts = server;
961   tvhpoll_event_t ev;
962   char c = 'D';
963 
964   if (server == NULL)
965     return;
966 
967   memset(&ev, 0, sizeof(ev));
968   ev.fd       = ts->serverfd;
969   ev.events   = TVHPOLL_IN;
970   ev.data.ptr = ts;
971   tvhpoll_rem(tcp_server_poll, &ev, 1);
972   close(ts->serverfd);
973   ts->serverfd = -1;
974   LIST_INSERT_HEAD(&tcp_server_delete_list, ts, link);
975   tvh_write(tcp_server_pipe.wr, &c, 1);
976 }
977 
978 /**
979  *
980  */
981 int
tcp_default_ip_addr(struct sockaddr_storage * deflt,int family)982 tcp_default_ip_addr ( struct sockaddr_storage *deflt, int family )
983 {
984 
985   struct sockaddr_storage ss;
986   socklen_t ss_len;
987   int sock;
988 
989   memset(&ss, 0, sizeof(ss));
990   ss.ss_family = family == PF_UNSPEC ? tcp_preferred_address_family : family;
991   if (inet_pton(ss.ss_family,
992                 ss.ss_family == AF_INET ?
993                   /* Google name servers */
994                   "8.8.8.8" : "2001:4860:4860::8888",
995                 IP_IN_ADDR(ss)) <= 0)
996     return -1;
997 
998   IP_PORT_SET(ss, htons(53));
999 
1000   sock = tvh_socket(ss.ss_family, SOCK_STREAM, 0);
1001   if (sock < 0)
1002     return -1;
1003 
1004   if (connect(sock, (struct sockaddr *)&ss, IP_IN_ADDRLEN(ss)) < 0) {
1005     close(sock);
1006     return -1;
1007   }
1008 
1009   ss_len = sizeof(ss);
1010   if (getsockname(sock, (struct sockaddr *)&ss, &ss_len) < 0) {
1011     close(sock);
1012     return -1;
1013   }
1014 
1015   if (ss.ss_family == AF_INET)
1016     IP_AS_V4(ss, port) = 0;
1017   else
1018     IP_AS_V6(ss, port) = 0;
1019 
1020   memset(deflt, 0, sizeof(*deflt));
1021   memcpy(deflt, &ss, ss_len);
1022 
1023   close(sock);
1024   return 0;
1025 }
1026 
1027 /**
1028  *
1029  */
1030 int
tcp_server_bound(void * server,struct sockaddr_storage * bound,int family)1031 tcp_server_bound ( void *server, struct sockaddr_storage *bound, int family )
1032 {
1033   tcp_server_t *ts = server;
1034   int i, len, port;
1035   uint8_t *ptr;
1036 
1037   if (server == NULL) {
1038     memset(bound, 0, sizeof(*bound));
1039     return 0;
1040   }
1041 
1042   len = IP_IN_ADDRLEN(ts->bound);
1043   ptr = (uint8_t *)IP_IN_ADDR(ts->bound);
1044   for (i = 0; i < len; i++)
1045     if (ptr[0])
1046       break;
1047   if (i < len) {
1048     *bound = ts->bound;
1049     return 0;
1050   }
1051   port = IP_PORT(ts->bound);
1052 
1053   /* no bind address was set, try to find one */
1054   if (tcp_default_ip_addr(bound, family) < 0)
1055     return -1;
1056   if (bound->ss_family == AF_INET)
1057     IP_AS_V4(*bound, port) = port;
1058   else
1059     IP_AS_V6(*bound, port) = port;
1060   return 0;
1061 }
1062 
1063 /**
1064  *
1065  */
1066 int
tcp_server_onall(void * server)1067 tcp_server_onall ( void *server )
1068 {
1069   tcp_server_t *ts = server;
1070   int i, len;
1071   uint8_t *ptr;
1072 
1073   if (server == NULL) return 0;
1074 
1075   len = IP_IN_ADDRLEN(ts->bound);
1076   ptr = (uint8_t *)IP_IN_ADDR(ts->bound);
1077   for (i = 0; i < len; i++)
1078     if (ptr[0])
1079       break;
1080   return i >= len;
1081 }
1082 
1083 /*
1084  * Connections status
1085  */
1086 htsmsg_t *
tcp_server_connections(void)1087 tcp_server_connections ( void )
1088 {
1089   tcp_server_launch_t *tsl;
1090   lock_assert(&global_lock);
1091   htsmsg_t *l, *e, *m;
1092   char buf[1024];
1093   int c = 0;
1094 
1095   /* Build list */
1096   l = htsmsg_create_list();
1097   LIST_FOREACH(tsl, &tcp_server_launches, link) {
1098     if (!tsl->status) continue;
1099     c++;
1100     e = htsmsg_create_map();
1101     htsmsg_add_u32(e, "id", tsl->id);
1102     tcp_get_str_from_ip(&tsl->self, buf, sizeof(buf));
1103     htsmsg_add_str(e, "server", buf);
1104     htsmsg_add_u32(e, "server_port", ntohs(IP_PORT(tsl->self)));
1105     tcp_get_str_from_ip(&tsl->peer, buf, sizeof(buf));
1106     htsmsg_add_str(e, "peer", buf);
1107     htsmsg_add_u32(e, "peer_port", ntohs(IP_PORT(tsl->peer)));
1108     htsmsg_add_s64(e, "started", tsl->started);
1109     tsl->status(tsl->opaque, e);
1110     htsmsg_add_msg(l, NULL, e);
1111   }
1112 
1113   /* Output */
1114   m = htsmsg_create_map();
1115   htsmsg_add_msg(m, "entries", l);
1116   htsmsg_add_u32(m, "totalCount", c);
1117   return m;
1118 }
1119 
1120 /**
1121  *
1122  */
1123 pthread_t tcp_server_tid;
1124 
1125 void
tcp_server_preinit(int opt_ipv6)1126 tcp_server_preinit(int opt_ipv6)
1127 {
1128   if(opt_ipv6)
1129     tcp_preferred_address_family = AF_INET6;
1130 }
1131 
1132 void
tcp_server_init(void)1133 tcp_server_init(void)
1134 {
1135   tvhpoll_event_t ev;
1136   tvh_pipe(O_NONBLOCK, &tcp_server_pipe);
1137   tcp_server_poll = tvhpoll_create(10);
1138 
1139   memset(&ev, 0, sizeof(ev));
1140   ev.fd       = tcp_server_pipe.rd;
1141   ev.events   = TVHPOLL_IN;
1142   ev.data.ptr = &tcp_server_pipe;
1143   tvhpoll_add(tcp_server_poll, &ev, 1);
1144 
1145   atomic_set(&tcp_server_running, 1);
1146   tvhthread_create(&tcp_server_tid, NULL, tcp_server_loop, NULL, "tcp-loop");
1147 }
1148 
1149 void
tcp_server_done(void)1150 tcp_server_done(void)
1151 {
1152   tcp_server_t *ts;
1153   tcp_server_launch_t *tsl;
1154   char c = 'E';
1155   int64_t t;
1156 
1157   atomic_set(&tcp_server_running, 0);
1158   tvh_write(tcp_server_pipe.wr, &c, 1);
1159 
1160   pthread_mutex_lock(&global_lock);
1161   LIST_FOREACH(tsl, &tcp_server_active, alink) {
1162     if (tsl->ops.cancel)
1163       tsl->ops.cancel(tsl->opaque);
1164     if (tsl->fd >= 0)
1165       shutdown(tsl->fd, SHUT_RDWR);
1166     pthread_kill(tsl->tid, SIGTERM);
1167   }
1168   pthread_mutex_unlock(&global_lock);
1169 
1170   pthread_join(tcp_server_tid, NULL);
1171   tvh_pipe_close(&tcp_server_pipe);
1172   tvhpoll_destroy(tcp_server_poll);
1173 
1174   pthread_mutex_lock(&global_lock);
1175   t = mclk();
1176   while (LIST_FIRST(&tcp_server_active) != NULL) {
1177     if (t + sec2mono(5) < mclk())
1178       tvhtrace(LS_TCP, "tcp server %p active too long", LIST_FIRST(&tcp_server_active));
1179     pthread_mutex_unlock(&global_lock);
1180     tvh_safe_usleep(20000);
1181     pthread_mutex_lock(&global_lock);
1182   }
1183   while ((tsl = LIST_FIRST(&tcp_server_join)) != NULL) {
1184     LIST_REMOVE(tsl, jlink);
1185     pthread_mutex_unlock(&global_lock);
1186     pthread_join(tsl->tid, NULL);
1187     free(tsl);
1188     pthread_mutex_lock(&global_lock);
1189   }
1190   while ((ts = LIST_FIRST(&tcp_server_delete_list)) != NULL) {
1191     LIST_REMOVE(ts, link);
1192     free(ts);
1193   }
1194   pthread_mutex_unlock(&global_lock);
1195 }
1196