1 /*
2  *  Tvheadend - SAT-IP server - RTP part
3  *
4  *  Copyright (C) 2015 Jaroslav Kysela
5  *
6  *  This program is free software: you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation, either version 3 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include <signal.h>
21 #include <ctype.h>
22 #include "tvheadend.h"
23 #include "config.h"
24 #include "input.h"
25 #include "streaming.h"
26 #include "satip/server.h"
27 #include <netinet/ip.h>
28 #if ENABLE_ANDROID
29 #include <sys/socket.h>
30 #endif
31 #define COMPAT_IPTOS
32 #include "compat.h"
33 
/* Number of RTP packets (iovec slots) batched for one UDP multisend call */
#define RTP_PACKETS 128
/* UDP RTP packet size: 7 TS packets of 188 bytes plus the 12-byte RTP header */
#define RTP_PAYLOAD (7*188+12)
/* TCP packet size limits: TS data + 12-byte RTP header + 4-byte '$' framing */
#define RTP_TCP_MIN_PAYLOAD (7*188+12+4)   /* fit ethernet packet */
#define RTP_TCP_MAX_PAYLOAD (348*188+12+4) /* cca 64kB */
/* Size of the buffer used to build one RTCP message */
#define RTCP_PAYLOAD (1420)

/* Socket send buffer size (SO_SNDBUF) requested for TCP streaming */
#define RTP_TCP_BUFFER_SIZE (64*1024*1024)
/* Bytes subtracted from the socket send buffer size when capping the TCP payload */
#define RTP_TCP_BUFFER_ROOM (2048)
42 
/*
 * One PMT PID whose sections are parsed and rewritten before being
 * forwarded to the client (see satip_rtp_pmt_cb).
 */
typedef struct satip_rtp_table {
  TAILQ_ENTRY(satip_rtp_table) link;  /* entry in satip_rtp_session.pmt_tables */
  mpegts_psi_table_t tbl;             /* PSI parser state for this PID */
  int pid;                            /* PID carrying the PMT */
  int remove_mark;                    /* set by satip_rtp_update_pmt_pids for entries to drop */
} satip_rtp_table_t;
49 
/*
 * State of one RTP streaming session (one client, UDP or interleaved TCP).
 */
typedef struct satip_rtp_session {
  TAILQ_ENTRY(satip_rtp_session) link;  /* entry in satip_rtp_sessions */
  pthread_t tid;                        /* streaming thread (satip_rtp_thread) */
  struct sockaddr_storage peer;         /* client address */
  struct sockaddr_storage peer2;        /* copy of peer; for UDP the port is set to port + 1 (RTCP target) */
  int port;                             /* client RTP port, or RTSP_TCP_DATA for TCP streaming */
  th_subscription_t *subs;              /* subscription used for output byte accounting */
  streaming_queue_t *sq;                /* input queue; set to NULL by satip_rtp_close to stop the thread */
  int fd_rtp;                           /* socket used for RTP data */
  int fd_rtcp;                          /* socket used for RTCP (UDP mode) */
  int frontend;                         /* frontend number reported in the status string */
  int source;                           /* source number reported in the status string */
  int allow_data;                       /* accessed atomically; TS data is forwarded only when non-zero */
  int disable_rtcp;                     /* set when an RTCP send fails with ECONNREFUSED */
  dvb_mux_conf_t dmc;                   /* mux parameters reported in the status string */
  mpegts_apids_t pids;                  /* PID filter applied to the incoming TS */
  TAILQ_HEAD(, satip_rtp_table) pmt_tables; /* PMT PIDs that are rewritten */
  udp_multisend_t um;                   /* UDP multisend state */
  struct iovec *um_iovec;               /* packet buffers set up by udp_multisend_init */
  struct iovec tcp_data;                /* TCP packet currently being assembled */
  uint32_t tcp_payload;                 /* size of one complete TCP packet */
  uint32_t tcp_buffer_size;             /* socket send buffer size as read back from the socket */
  int um_packet;                        /* index of the um_iovec slot currently being filled */
  uint16_t seq;                         /* RTP sequence number */
  signal_status_t sig;                  /* last signal status received (or synthesized) */
  int sig_lock;                         /* accessed atomically; set once TS data was received */
  pthread_mutex_t lock;                 /* taken around accesses to sig, pids and pmt_tables */
  http_connection_t *hc;                /* HTTP connection used for TCP streaming */
  sbuf_t table_data;                    /* output of the PMT rewrite, waiting to be sent */
  void (*no_data_cb)(void *opaque);     /* called on SMT_NOSTART / SMT_EXIT (may be NULL) */
  void *no_data_opaque;                 /* argument passed to no_data_cb */
} satip_rtp_session_t;
82 
/* Taken around accesses to the session list (satip_rtp_sessions) */
static pthread_mutex_t satip_rtp_lock;
/* RTCP sender thread handle */
static pthread_t satip_rtcp_tid;
/* Accessed atomically; non-zero while the RTCP thread should keep running */
static int satip_rtcp_run;
/* All active RTP sessions */
static TAILQ_HEAD(, satip_rtp_session) satip_rtp_sessions;
87 
88 static void
satip_rtp_pmt_cb(mpegts_psi_table_t * mt,const uint8_t * buf,int len)89 satip_rtp_pmt_cb(mpegts_psi_table_t *mt, const uint8_t *buf, int len)
90 {
91   satip_rtp_session_t *rtp;
92   uint8_t out[1024], *ob;
93   // uint16_t sid, pid;
94   int l, ol;
95 
96   memcpy(out, buf, ol = 3);
97   buf += ol;
98   len -= ol;
99 
100   // sid = (buf[0] << 8) | buf[1];
101   l = (buf[7] & 0x0f) << 8 | buf[8];
102 
103   if (l > len - 9)
104     return;
105 
106   rtp = (satip_rtp_session_t *)mt->mt_opaque;
107 
108   memcpy(out + ol, buf, 9);
109 
110   ol  += 9;     /* skip common descriptors */
111   buf += 9 + l;
112   len -= 9 + l;
113 
114   /* no common descriptors */
115   out[7+3] &= 0xf0;
116   out[8+3] = 0;
117 
118   while (len >= 5) {
119     //pid = (buf[1] & 0x1f) << 8 | buf[2];
120     l   = (buf[3] & 0xf) << 8 | buf[4];
121 
122     if (l > len - 5)
123       return;
124 
125     if (sizeof(out) < ol + l + 5 + 4 /* crc */) {
126       tvherror(LS_PASS, "PMT entry too long (%i)", l);
127       return;
128     }
129 
130     memcpy(out + ol, buf, 5 + l);
131     ol += 5 + l;
132 
133     buf += 5 + l;
134     len -= 5 + l;
135   }
136 
137   /* update section length */
138   out[1] = (out[1] & 0xf0) | ((ol + 4 - 3) >> 8);
139   out[2] = (ol + 4 - 3) & 0xff;
140 
141   ol = dvb_table_append_crc32(out, ol, sizeof(out));
142 
143   if (ol > 0 && (l = dvb_table_remux(mt, out, ol, &ob)) > 0)
144     sbuf_append(&rtp->table_data, ob, l);
145 }
146 
147 static void
satip_rtp_header(satip_rtp_session_t * rtp,struct iovec * v,uint32_t off)148 satip_rtp_header(satip_rtp_session_t *rtp, struct iovec *v, uint32_t off)
149 {
150   uint8_t *data = v->iov_base;
151   uint32_t tstamp = mono2sec(mclk()) + rtp->seq;
152 
153   rtp->seq++;
154 
155   v->iov_len = off + 12;
156   data[off+0] = 0x80;
157   data[off+1] = 33;
158   data[off+2] = (rtp->seq >> 8) & 0xff;
159   data[off+3] = rtp->seq & 0xff;
160   data[off+4] = (tstamp >> 24) & 0xff;
161   data[off+5] = (tstamp >> 16) & 0xff;
162   data[off+6] = (tstamp >> 8) & 0xff;
163   data[off+7] = tstamp & 0xff;
164   memset(data + off + 8, 0xa5, 4);
165 }
166 
167 static int
satip_rtp_send(satip_rtp_session_t * rtp)168 satip_rtp_send(satip_rtp_session_t *rtp)
169 {
170   struct iovec *v = rtp->um_iovec, *v2;
171   int packets, copy, len, r;
172   if (v->iov_len == RTP_PAYLOAD) {
173     packets = rtp->um_packet;
174     v2 = v + packets;
175     copy = 1;
176     if (v2->iov_len == RTP_PAYLOAD) {
177       packets++;
178       copy = 0;
179     }
180     while (1) {
181       r = udp_multisend_send(&rtp->um, rtp->fd_rtp, packets);
182       if (r < 0) {
183         if (errno == EINTR)
184           continue;
185         if (errno == EAGAIN || errno == EWOULDBLOCK) {
186           tvh_usleep(100);
187           continue;
188         }
189         tvhtrace(LS_SATIPS, "rtp udp multisend failed (errno %d)", errno);
190         return r;
191       }
192       break;
193     }
194     if (r != packets) {
195       tvhtrace(LS_SATIPS, "rtp udp multisend failed (packets %d written %d)", packets, r);
196       return -1;
197     }
198     if (copy)
199       memcpy(v->iov_base, v2->iov_base, len = v2->iov_len);
200     else
201       len = 0;
202     rtp->um_packet = 0;
203     udp_multisend_clean(&rtp->um);
204     v->iov_len = len;
205   }
206   if (v->iov_len == 0)
207     satip_rtp_header(rtp, rtp->um_iovec + rtp->um_packet, 0);
208   return 0;
209 }
210 
211 static inline int
satip_rtp_append_data(satip_rtp_session_t * rtp,struct iovec ** _v,uint8_t * data)212 satip_rtp_append_data(satip_rtp_session_t *rtp, struct iovec **_v, uint8_t *data)
213 {
214   struct iovec *v = *_v;
215   int r;
216   assert(v->iov_len + 188 <= RTP_PAYLOAD);
217   memcpy(v->iov_base + v->iov_len, data, 188);
218   v->iov_len += 188;
219   if (v->iov_len == RTP_PAYLOAD) {
220     if ((rtp->um_packet + 1) == RTP_PACKETS) {
221       r = satip_rtp_send(rtp);
222       if (r < 0)
223         return r;
224     } else {
225       rtp->um_packet++;
226       satip_rtp_header(rtp, rtp->um_iovec + rtp->um_packet, 0);
227     }
228     *_v = rtp->um_iovec + rtp->um_packet;
229   } else {
230     assert(v->iov_len < RTP_PAYLOAD);
231   }
232   return 0;
233 }
234 
235 static int
satip_rtp_loop(satip_rtp_session_t * rtp,uint8_t * data,int len)236 satip_rtp_loop(satip_rtp_session_t *rtp, uint8_t *data, int len)
237 {
238   int i, j, pid, last_pid = -1, r;
239   mpegts_apid_t *pids = rtp->pids.pids;
240   struct iovec *v = rtp->um_iovec + rtp->um_packet;
241   satip_rtp_table_t *tbl;
242 
243   assert((len % 188) == 0);
244   for ( ; len >= 188 ; data += 188, len -= 188) {
245     pid = ((data[1] & 0x1f) << 8) | data[2];
246     if (pid != last_pid && !rtp->pids.all) {
247       for (i = 0; i < rtp->pids.count; i++) {
248         j = pids[i].pid;
249         if (pid < j) break;
250         if (j == pid) goto found;
251       }
252       continue;
253 found:
254       TAILQ_FOREACH(tbl, &rtp->pmt_tables, link)
255         if (tbl->pid == pid) {
256           dvb_table_parse(&tbl->tbl, "-", data, 188, 1, 0, satip_rtp_pmt_cb);
257           if (rtp->table_data.sb_ptr > 0) {
258             for (i = r = 0; i < rtp->table_data.sb_ptr; i += 188) {
259               r = satip_rtp_append_data(rtp, &v, rtp->table_data.sb_data + i);
260               if (r)
261                 break;
262             }
263             sbuf_reset(&rtp->table_data, 10*188);
264             if (r)
265               return r;
266           }
267           break;
268         }
269       if (tbl)
270         continue;
271       last_pid = pid;
272     }
273     r = satip_rtp_append_data(rtp, &v, data);
274     if (r < 0)
275       return r;
276   }
277   return 0;
278 }
279 
280 static int
satip_rtp_tcp_data(satip_rtp_session_t * rtp,uint8_t stream,uint8_t * data,size_t data_len,int may_discard)281 satip_rtp_tcp_data(satip_rtp_session_t *rtp, uint8_t stream,
282                    uint8_t *data, size_t data_len, int may_discard)
283 {
284   assert(data_len <= 0xffff);
285   data[0] = '$';
286   data[1] = stream;
287   data[2] = (data_len - 4) >> 8;
288   data[3] = (data_len - 4) & 0xff;
289   return http_extra_send_prealloc(rtp->hc, data, data_len, may_discard);
290 }
291 
292 static inline int
satip_rtp_flush_tcp_data(satip_rtp_session_t * rtp)293 satip_rtp_flush_tcp_data(satip_rtp_session_t *rtp)
294 {
295   struct iovec *v = &rtp->tcp_data;
296   int r = 0;
297 
298   if (v->iov_len)
299     r = satip_rtp_tcp_data(rtp, 0, v->iov_base, v->iov_len, 1);
300   else
301     free(v->iov_base);
302   v->iov_base = NULL;
303   v->iov_len = 0;
304   return r;
305 }
306 
307 static inline int
satip_rtp_append_tcp_data(satip_rtp_session_t * rtp,uint8_t * data,size_t len)308 satip_rtp_append_tcp_data(satip_rtp_session_t *rtp, uint8_t *data, size_t len)
309 {
310   struct iovec *v = &rtp->tcp_data;
311   int r = 0;
312 
313   if (v->iov_base == NULL) {
314     v->iov_base = malloc(rtp->tcp_payload);
315     satip_rtp_header(rtp, v, 4);
316   }
317   assert(v->iov_len + len <= rtp->tcp_payload);
318   memcpy(v->iov_base + v->iov_len, data, len);
319   v->iov_len += len;
320   if (v->iov_len == rtp->tcp_payload)
321     r = satip_rtp_flush_tcp_data(rtp);
322   return r;
323 }
324 
325 static int
satip_rtp_tcp_loop(satip_rtp_session_t * rtp,uint8_t * data,int len)326 satip_rtp_tcp_loop(satip_rtp_session_t *rtp, uint8_t *data, int len)
327 {
328   int i, j, pid, last_pid = -1, r;
329   mpegts_apid_t *pids = rtp->pids.pids;
330   satip_rtp_table_t *tbl;
331 
332   assert((len % 188) == 0);
333   for ( ; len >= 188 ; data += 188, len -= 188) {
334     pid = ((data[1] & 0x1f) << 8) | data[2];
335     if (pid != last_pid && !rtp->pids.all) {
336       for (i = 0; i < rtp->pids.count; i++) {
337         j = pids[i].pid;
338         if (pid < j) break;
339         if (j == pid) goto found;
340       }
341       continue;
342 found:
343       TAILQ_FOREACH(tbl, &rtp->pmt_tables, link)
344         if (tbl->pid == pid) {
345           dvb_table_parse(&tbl->tbl, "-", data, 188, 1, 0, satip_rtp_pmt_cb);
346           if (rtp->table_data.sb_ptr) {
347             r = satip_rtp_append_tcp_data(rtp, rtp->table_data.sb_data, rtp->table_data.sb_ptr);
348             sbuf_reset(&rtp->table_data, 10*188);
349             if (r)
350               return -1;
351           }
352           break;
353         }
354       if (tbl)
355         continue;
356       last_pid = pid;
357     }
358     r = satip_rtp_append_tcp_data(rtp, data, 188);
359     if (r)
360       return -1;
361   }
362   return 0;
363 }
364 
365 static void
satip_rtp_signal_status(satip_rtp_session_t * rtp,signal_status_t * sig)366 satip_rtp_signal_status(satip_rtp_session_t *rtp, signal_status_t *sig)
367 {
368   pthread_mutex_lock(&rtp->lock);
369   rtp->sig = *sig;
370   pthread_mutex_unlock(&rtp->lock);
371 }
372 
373 static void *
satip_rtp_thread(void * aux)374 satip_rtp_thread(void *aux)
375 {
376   satip_rtp_session_t *rtp = aux;
377   streaming_queue_t *sq = rtp->sq;
378   streaming_message_t *sm;
379   th_subscription_t *subs = rtp->subs;
380   pktbuf_t *pb;
381   char peername[50];
382   int alive = 1, fatal = 0, r;
383   int tcp = rtp->port == RTSP_TCP_DATA;
384 
385   tcp_get_str_from_ip(&rtp->peer, peername, sizeof(peername));
386   tvhdebug(LS_SATIPS, "RTP streaming to %s:%d open", peername,
387            tcp ? ntohs(IP_PORT(rtp->peer)) : rtp->port);
388 
389   pthread_mutex_lock(&sq->sq_mutex);
390   while (rtp->sq && !fatal) {
391     sm = TAILQ_FIRST(&sq->sq_queue);
392     if (sm == NULL) {
393       if (tcp) {
394         r = satip_rtp_flush_tcp_data(rtp);
395       } else {
396         r = satip_rtp_send(rtp);
397       }
398       if (r) {
399         fatal = 1;
400         continue;
401       }
402       tvh_cond_wait(&sq->sq_cond, &sq->sq_mutex);
403       continue;
404     }
405     streaming_queue_remove(sq, sm);
406     pthread_mutex_unlock(&sq->sq_mutex);
407 
408     switch (sm->sm_type) {
409     case SMT_MPEGTS:
410       pb = sm->sm_data;
411       r = pktbuf_len(pb);
412       subscription_add_bytes_out(subs, r);
413       if (r > 0)
414         atomic_set(&rtp->sig_lock, 1);
415       if (atomic_get(&rtp->allow_data)) {
416         pthread_mutex_lock(&rtp->lock);
417         if (tcp)
418           r = satip_rtp_tcp_loop(rtp, pktbuf_ptr(pb), r);
419         else
420           r = satip_rtp_loop(rtp, pktbuf_ptr(pb), r);
421         pthread_mutex_unlock(&rtp->lock);
422         if (r) fatal = 1;
423       }
424       break;
425     case SMT_SIGNAL_STATUS:
426       satip_rtp_signal_status(rtp, sm->sm_data);
427       break;
428     case SMT_NOSTART:
429     case SMT_EXIT:
430       if (rtp->no_data_cb)
431         rtp->no_data_cb(rtp->no_data_opaque);
432       alive = 0;
433       break;
434 
435     case SMT_START:
436     case SMT_STOP:
437     case SMT_NOSTART_WARN:
438     case SMT_PACKET:
439     case SMT_GRACE:
440     case SMT_SKIP:
441     case SMT_SPEED:
442     case SMT_SERVICE_STATUS:
443     case SMT_TIMESHIFT_STATUS:
444     case SMT_DESCRAMBLE_INFO:
445       break;
446     }
447 
448     streaming_msg_free(sm);
449     pthread_mutex_lock(&sq->sq_mutex);
450   }
451   pthread_mutex_unlock(&sq->sq_mutex);
452 
453   tvhdebug(LS_SATIPS, "RTP streaming to %s:%d closed (%s request)%s",
454            peername,
455            tcp ? ntohs(IP_PORT(rtp->peer)) : rtp->port,
456            alive ? "remote" : "streaming",
457            fatal ? " (fatal)" : "");
458 
459   return NULL;
460 }
461 
462 /*
463  *
464  */
/*
 * Create an RTP session and start its streaming thread.
 *
 * subs           subscription used for output byte accounting
 * sq             streaming queue the thread reads from
 * hc             HTTP connection (used for TCP streaming)
 * peer, port     client address and RTP port; port == RTSP_TCP_DATA
 *                selects interleaved TCP streaming
 * fd_rtp/fd_rtcp sockets for RTP and RTCP
 * frontend, source, dmc
 *                values reported in the status string
 * pids           initial PID filter (copied)
 * allow_data     initial state of the data gate
 * perm_lock      when set, a signal status is synthesized from
 *                satip_server_conf.satip_iptv_sig_level
 * no_data_cb, no_data_opaque
 *                callback invoked by the thread on SMT_NOSTART / SMT_EXIT
 *
 * Returns an opaque session handle for the other satip_rtp_* calls, or
 * NULL when the session could not be allocated.
 */
void *satip_rtp_queue(th_subscription_t *subs,
                      streaming_queue_t *sq,
                      http_connection_t *hc,
                      struct sockaddr_storage *peer, int port,
                      int fd_rtp, int fd_rtcp,
                      int frontend, int source, dvb_mux_conf_t *dmc,
                      mpegts_apids_t *pids, int allow_data, int perm_lock,
                      void (*no_data_cb)(void *opaque),
                      void *no_data_opaque)
{
  satip_rtp_session_t *rtp = calloc(1, sizeof(*rtp));
  socklen_t socklen;
  int sndbuf = 0;   /* SO_SNDBUF is an int option */
  uint32_t avail;
  int dscp, payload;

  if (rtp == NULL)
    return NULL;

  rtp->peer = *peer;
  rtp->peer2 = *peer;
  if (port != RTSP_TCP_DATA)
    IP_PORT_SET(rtp->peer2, htons(port + 1));
  rtp->port = port;
  rtp->fd_rtp = fd_rtp;
  rtp->fd_rtcp = fd_rtcp;
  rtp->subs = subs;
  rtp->sq = sq;
  rtp->hc = hc;
  payload = satip_server_conf.satip_rtptcpsize * 188 + 12 + 4;
  rtp->tcp_payload = MINMAX(payload, RTP_TCP_MIN_PAYLOAD, RTP_TCP_MAX_PAYLOAD);
  rtp->tcp_buffer_size = 16*1024*1024;
  rtp->no_data_cb = no_data_cb;
  rtp->no_data_opaque = no_data_opaque;
  atomic_set(&rtp->allow_data, allow_data);
  mpegts_pid_init(&rtp->pids);
  mpegts_pid_copy(&rtp->pids, pids);
  TAILQ_INIT(&rtp->pmt_tables);
  if (port != RTSP_TCP_DATA) {
    udp_multisend_init(&rtp->um, RTP_PACKETS, RTP_PAYLOAD, &rtp->um_iovec);
    satip_rtp_header(rtp, rtp->um_iovec, 0);
  } else {
    /* try to enlarge the socket send buffer */
    socklen = sizeof(sndbuf);
    if (getsockopt(fd_rtp, SOL_SOCKET, SO_SNDBUF, &sndbuf, &socklen) == 0 &&
        sndbuf < RTP_TCP_BUFFER_SIZE) {
      sndbuf = RTP_TCP_BUFFER_SIZE;
      setsockopt(fd_rtp, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf));
    }
    /* read back what the system actually granted */
    socklen = sizeof(sndbuf);
    if (getsockopt(fd_rtp, SOL_SOCKET, SO_SNDBUF, &sndbuf, &socklen) == 0) {
      rtp->tcp_buffer_size = sndbuf;
      /* shrink the packet size when it would not fit the send buffer */
      if (sndbuf > RTP_TCP_BUFFER_ROOM &&
          (uint32_t)(sndbuf - RTP_TCP_BUFFER_ROOM) < rtp->tcp_payload) {
        avail = sndbuf - RTP_TCP_BUFFER_ROOM;
        if (avail < 188)
          avail = 188;          /* never go below one TS packet */
        rtp->tcp_payload = avail - avail % 188;
        rtp->tcp_payload += 12 + 4;
      }
    }
  }
  rtp->frontend = frontend;
  rtp->dmc = *dmc;
  rtp->source = source;
  pthread_mutex_init(&rtp->lock, NULL);

  dscp = config.dscp >= 0 ? config.dscp : IPTOS_DSCP_EF;
  socket_set_dscp(rtp->fd_rtp, dscp, NULL, 0);
  if (rtp->fd_rtcp >= 0)
    socket_set_dscp(rtp->fd_rtcp, dscp, NULL, 0);

  if (perm_lock) {
    int tmp = ((satip_server_conf.satip_iptv_sig_level * 0xffff) + 0x8000) / 245;
    rtp->sig.signal_scale = SIGNAL_STATUS_SCALE_RELATIVE;
    rtp->sig.signal = MINMAX(tmp, 0, 0xffff);
    rtp->sig.snr_scale = SIGNAL_STATUS_SCALE_RELATIVE;
    /*
     * NOTE(review): MAX() yields at least 0xffff whatever the signal
     * value is - confirm whether MIN() (a cap at 0xffff) was intended.
     */
    rtp->sig.snr = MAX(0xffff, (rtp->sig.signal * 3) / 2);
  }

  tvhtrace(LS_SATIPS, "rtp queue %p", rtp);

  pthread_mutex_lock(&satip_rtp_lock);
  TAILQ_INSERT_TAIL(&satip_rtp_sessions, rtp, link);
  tvhthread_create(&rtp->tid, NULL, satip_rtp_thread, rtp, "satip-rtp");
  pthread_mutex_unlock(&satip_rtp_lock);
  return rtp;
}
548 
satip_rtp_allow_data(void * _rtp)549 void satip_rtp_allow_data(void *_rtp)
550 {
551   satip_rtp_session_t *rtp = _rtp;
552 
553   if (rtp == NULL)
554     return;
555   pthread_mutex_lock(&satip_rtp_lock);
556   atomic_set(&rtp->allow_data, 1);
557   pthread_mutex_unlock(&satip_rtp_lock);
558 }
559 
/*
 * Replace the PID filter of a session with a copy of 'pids'.
 * A NULL handle is ignored.
 */
void satip_rtp_update_pids(void *_rtp, mpegts_apids_t *pids)
{
  satip_rtp_session_t *session = _rtp;

  if (session != NULL) {
    pthread_mutex_lock(&satip_rtp_lock);
    pthread_mutex_lock(&session->lock);
    mpegts_pid_copy(&session->pids, pids);
    pthread_mutex_unlock(&session->lock);
    pthread_mutex_unlock(&satip_rtp_lock);
  }
}
572 
/*
 * Synchronize the list of PMT PIDs that are rewritten for a session with
 * 'pmt_pids': tables for PIDs no longer listed are dropped, tables for
 * new PIDs are created. A NULL handle is ignored.
 *
 * Dropped tables are released through dvb_table_parse_done() before they
 * are freed, the same way satip_rtp_close() does it. A PID whose table
 * cannot be allocated is skipped.
 */
void satip_rtp_update_pmt_pids(void *_rtp, mpegts_apids_t *pmt_pids)
{
  satip_rtp_session_t *rtp = _rtp;
  satip_rtp_table_t *tbl, *tbl_next;
  int i, pid;

  if (rtp == NULL)
    return;
  pthread_mutex_lock(&satip_rtp_lock);
  pthread_mutex_lock(&rtp->lock);

  /* mark the tables which are not wanted anymore */
  TAILQ_FOREACH(tbl, &rtp->pmt_tables, link)
    if (!mpegts_pid_rexists(pmt_pids, tbl->pid))
      tbl->remove_mark = 1;

  /* create the missing tables */
  for (i = 0; i < pmt_pids->count; i++) {
    pid = pmt_pids->pids[i].pid;
    TAILQ_FOREACH(tbl, &rtp->pmt_tables, link)
      if (tbl->pid == pid)
        break;
    if (!tbl) {
      tbl = calloc(1, sizeof(*tbl));
      if (tbl == NULL)
        continue;
      dvb_table_parse_init(&tbl->tbl, "satip-pmt", LS_TBL_SATIP, pid,
                           DVB_PMT_BASE, DVB_PMT_MASK, rtp);
      tbl->pid = pid;
      TAILQ_INSERT_TAIL(&rtp->pmt_tables, tbl, link);
    }
  }

  /* drop the marked tables */
  for (tbl = TAILQ_FIRST(&rtp->pmt_tables); tbl; tbl = tbl_next) {
    tbl_next = TAILQ_NEXT(tbl, link);
    if (tbl->remove_mark) {
      TAILQ_REMOVE(&rtp->pmt_tables, tbl, link);
      dvb_table_parse_done(&tbl->tbl);
      free(tbl);
    }
  }
  pthread_mutex_unlock(&rtp->lock);
  pthread_mutex_unlock(&satip_rtp_lock);
}
609 
satip_rtp_close(void * _rtp)610 void satip_rtp_close(void *_rtp)
611 {
612   satip_rtp_session_t *rtp = _rtp;
613   satip_rtp_table_t *tbl;
614   streaming_queue_t *sq;
615 
616   if (rtp == NULL)
617     return;
618   pthread_mutex_lock(&satip_rtp_lock);
619   tvhtrace(LS_SATIPS, "rtp close %p", rtp);
620   TAILQ_REMOVE(&satip_rtp_sessions, rtp, link);
621   sq = rtp->sq;
622   pthread_mutex_lock(&sq->sq_mutex);
623   rtp->sq = NULL;
624   tvh_cond_signal(&sq->sq_cond, 0);
625   pthread_mutex_unlock(&sq->sq_mutex);
626   pthread_mutex_unlock(&satip_rtp_lock);
627   pthread_join(rtp->tid, NULL);
628   if (rtp->port == RTSP_TCP_DATA) {
629     http_extra_destroy(rtp->hc);
630     free(rtp->tcp_data.iov_base);
631   } else {
632     udp_multisend_free(&rtp->um);
633   }
634   mpegts_pid_done(&rtp->pids);
635   while ((tbl = TAILQ_FIRST(&rtp->pmt_tables)) != NULL) {
636     dvb_table_parse_done(&tbl->tbl);
637     TAILQ_REMOVE(&rtp->pmt_tables, tbl, link);
638     free(tbl);
639   }
640   pthread_mutex_destroy(&rtp->lock);
641   free(rtp);
642 }
643 
644 /*
645  *
646  */
647 static const char *
satip_rtcp_pol(int pol)648 satip_rtcp_pol(int pol)
649 {
650   switch (pol) {
651   case DVB_POLARISATION_HORIZONTAL:
652     return "h";
653   case DVB_POLARISATION_VERTICAL:
654     return "v";
655   case DVB_POLARISATION_CIRCULAR_LEFT:
656     return "l";
657   case DVB_POLARISATION_CIRCULAR_RIGHT:
658     return "r";
659   case DVB_POLARISATION_OFF:
660     return "off";
661   default:
662     return "";
663   }
664 }
665 
666 /*
667  *
668  */
669 static const char *
satip_rtcp_fec(int fec)670 satip_rtcp_fec(int fec)
671 {
672   static char buf[16];
673   char *p = buf;
674   const char *s;
675 
676   if (fec == DVB_FEC_AUTO || fec == DVB_FEC_NONE)
677     return "";
678   s = dvb_fec2str(fec);
679   if (s == NULL)
680     return "";
681   strlcpy(buf, s, sizeof(buf));
682   p = strchr(buf, '/');
683   while (p && *p) {
684     *p = *(p+1);
685     p++;
686   }
687   return buf;
688 }
689 
690 /*
691  *
692  */
693 static int
satip_status_build(satip_rtp_session_t * rtp,char * buf,int len)694 satip_status_build(satip_rtp_session_t *rtp, char *buf, int len)
695 {
696   char pids[1400];
697   const char *delsys, *msys, *pilot, *rolloff;
698   const char *bw, *tmode, *gi, *plp, *t2id, *sm, *c2tft, *ds, *specinv;
699   int r, level = 0, lock = 0, quality = 0;
700 
701   lock = atomic_get(&rtp->sig_lock);
702   if (satip_server_conf.satip_force_sig_level > 0) {
703     level = MINMAX(satip_server_conf.satip_force_sig_level, 1, 240);
704     quality = MAX((level + 15) / 15, 15);
705   } else {
706     switch (rtp->sig.signal_scale) {
707     case SIGNAL_STATUS_SCALE_RELATIVE:
708       level = MINMAX((rtp->sig.signal * 245) / 0xffff, 0, 240);
709       break;
710     case SIGNAL_STATUS_SCALE_DECIBEL:
711       level = MINMAX((rtp->sig.signal + 90000) / 375, 0, 240);
712       break;
713     default:
714       level = lock ? 120 : 0;
715       break;
716     }
717     switch (rtp->sig.snr_scale) {
718     case SIGNAL_STATUS_SCALE_RELATIVE:
719       quality = MINMAX((rtp->sig.snr * 16) / 0xffff, 0, 15);
720       break;
721     case SIGNAL_STATUS_SCALE_DECIBEL:
722       quality = MINMAX(rtp->sig.snr / 2000, 0, 15);
723       break;
724     default:
725       quality = lock ? 10 : 0;
726       break;
727     }
728   }
729 
730   mpegts_pid_dump(&rtp->pids, pids, sizeof(pids), 0, 0);
731 
732   switch (rtp->dmc.dmc_fe_delsys) {
733   case DVB_SYS_DVBS:
734   case DVB_SYS_DVBS2:
735     delsys = rtp->dmc.dmc_fe_delsys == DVB_SYS_DVBS ? "dvbs" : "dvbs2";
736     switch (rtp->dmc.dmc_fe_modulation) {
737     case DVB_MOD_QPSK:  msys = "qpsk"; break;
738     case DVB_MOD_PSK_8: msys = "8psk"; break;
739     default:            msys = ""; break;
740     }
741     switch (rtp->dmc.dmc_fe_pilot) {
742     case DVB_PILOT_ON:  pilot = "on"; break;
743     case DVB_PILOT_OFF: pilot = "off"; break;
744     default:            pilot = ""; break;
745     }
746     switch (rtp->dmc.dmc_fe_rolloff) {
747     case DVB_ROLLOFF_20: rolloff = "20"; break;
748     case DVB_ROLLOFF_25: rolloff = "25"; break;
749     case DVB_ROLLOFF_35: rolloff = "35"; break;
750     default:             rolloff = ""; break;
751     }
752     /* ver=<major>.<minor>;src=<srcID>;tuner=<feID>,<level>,<lock>,<quality>,<frequency>,<polarisation>,\
753      * <system>,<type>,<pilots>,<roll_off>,<symbol_rate>,<fec_inner>;pids=<pid0>,...,<pidn>
754      */
755     r = snprintf(buf, len,
756       "ver=1.0;src=%d;tuner=%d,%d,%d,%d,%.f,%s,%s,%s,%s,%s,%.f,%s;pids=%s",
757       rtp->source, rtp->frontend, level, lock, quality,
758       (float)rtp->dmc.dmc_fe_freq / 1000.0,
759       satip_rtcp_pol(rtp->dmc.u.dmc_fe_qpsk.polarisation),
760       delsys, msys, pilot, rolloff,
761       (float)rtp->dmc.u.dmc_fe_qpsk.symbol_rate / 1000.0,
762       satip_rtcp_fec(rtp->dmc.u.dmc_fe_qpsk.fec_inner),
763       pids);
764     break;
765   case DVB_SYS_DVBT:
766   case DVB_SYS_DVBT2:
767     delsys = rtp->dmc.dmc_fe_delsys == DVB_SYS_DVBT ? "dvbt" : "dvbt2";
768     switch (rtp->dmc.u.dmc_fe_ofdm.bandwidth) {
769     case DVB_BANDWIDTH_1_712_MHZ:  bw = "1.712"; break;
770     case DVB_BANDWIDTH_5_MHZ:      bw = "5"; break;
771     case DVB_BANDWIDTH_6_MHZ:      bw = "6"; break;
772     case DVB_BANDWIDTH_7_MHZ:      bw = "7"; break;
773     case DVB_BANDWIDTH_8_MHZ:      bw = "8"; break;
774     case DVB_BANDWIDTH_10_MHZ:     bw = "10"; break;
775     default:                       bw = ""; break;
776     }
777     switch (rtp->dmc.u.dmc_fe_ofdm.transmission_mode) {
778     case DVB_TRANSMISSION_MODE_1K:  tmode = "1k"; break;
779     case DVB_TRANSMISSION_MODE_2K:  tmode = "2k"; break;
780     case DVB_TRANSMISSION_MODE_4K:  tmode = "4k"; break;
781     case DVB_TRANSMISSION_MODE_8K:  tmode = "8k"; break;
782     case DVB_TRANSMISSION_MODE_16K: tmode = "16k"; break;
783     case DVB_TRANSMISSION_MODE_32K: tmode = "32k"; break;
784     default:                        tmode = ""; break;
785     }
786     switch (rtp->dmc.dmc_fe_modulation) {
787     case DVB_MOD_QAM_16:  msys = "qam16"; break;
788     case DVB_MOD_QAM_32:  msys = "qam32"; break;
789     case DVB_MOD_QAM_64:  msys = "qam64"; break;
790     case DVB_MOD_QAM_128: msys = "qam128"; break;
791     default:              msys = ""; break;
792     }
793     switch (rtp->dmc.u.dmc_fe_ofdm.guard_interval) {
794     case DVB_GUARD_INTERVAL_1_4:    gi = "14"; break;
795     case DVB_GUARD_INTERVAL_1_8:    gi = "18"; break;
796     case DVB_GUARD_INTERVAL_1_16:   gi = "116"; break;
797     case DVB_GUARD_INTERVAL_1_32:   gi = "132"; break;
798     case DVB_GUARD_INTERVAL_1_128:  gi = "1128"; break;
799     case DVB_GUARD_INTERVAL_19_128: gi = "19128"; break;
800     case DVB_GUARD_INTERVAL_19_256: gi = "19256"; break;
801     default:                        gi = ""; break;
802     }
803     plp = "";
804     t2id = "";
805     sm = "";
806     /* ver=1.1;tuner=<feID>,<level>,<lock>,<quality>,<freq>,<bw>,<msys>,<tmode>,<mtype>,<gi>,\
807      * <fec>,<plp>,<t2id>,<sm>;pids=<pid0>,...,<pidn>
808      */
809     r = snprintf(buf, len,
810       "ver=1.1;tuner=%d,%d,%d,%d,%.f,%s,%s,%s,%s,%s,%s,%s,%s,%s;pids=%s",
811       rtp->frontend, level, lock, quality,
812       (float)rtp->dmc.dmc_fe_freq / 1000000.0,
813       bw, delsys, tmode, msys, gi,
814       satip_rtcp_fec(rtp->dmc.u.dmc_fe_ofdm.code_rate_HP),
815       plp, t2id, sm, pids);
816     break;
817   case DVB_SYS_DVBC_ANNEX_A:
818   case DVB_SYS_DVBC_ANNEX_C:
819     delsys = rtp->dmc.dmc_fe_delsys == DVB_SYS_DVBC_ANNEX_A ? "dvbc" : "dvbc2";
820     bw = "";
821     switch (rtp->dmc.dmc_fe_modulation) {
822     case DVB_MOD_QAM_16:  msys = "qam16"; break;
823     case DVB_MOD_QAM_32:  msys = "qam32"; break;
824     case DVB_MOD_QAM_64:  msys = "qam64"; break;
825     case DVB_MOD_QAM_128: msys = "qam128"; break;
826     default:              msys = ""; break;
827     }
828     c2tft = "";
829     ds = "";
830     plp = "";
831     specinv = "";
832     /* ver=1.2;tuner=<feID>,<level>,<lock>,<quality>,<freq>,<bw>,<msys>,<mtype>,<sr>,<c2tft>,<ds>,<plp>,
833      * <specinv>;pids=<pid0>,...,<pidn>
834      */
835     r = snprintf(buf, len,
836       "ver=1.1;tuner=%d,%d,%d,%d,%.f,%s,%s,%s,%.f,%s,%s,%s,%s;pids=%s",
837       rtp->frontend, level, lock, quality,
838       (float)rtp->dmc.dmc_fe_freq / 1000000.0,
839       bw, delsys, msys,
840       (float)rtp->dmc.u.dmc_fe_qam.symbol_rate / 1000.0,
841       c2tft, ds, plp, specinv, pids);
842     break;
843   default:
844     r = snprintf(buf, len, "ver=1.0;src=%d;tuner=%d,%d,%d,%d,,,,,,,,;pids=%s",
845                  rtp->source, rtp->frontend, level, lock, quality, pids);
846     break;
847   }
848 
849   return r >= len ? len - 1 : r;
850 }
851 
852 /*
853  *
854  */
satip_rtp_status(void * _rtp,char * buf,int len)855 int satip_rtp_status(void *_rtp, char *buf, int len)
856 {
857   satip_rtp_session_t *rtp = _rtp;
858   int r = 0;
859 
860   buf[0] = '\0';
861   if (rtp == NULL)
862     return 0;
863   pthread_mutex_lock(&satip_rtp_lock);
864   pthread_mutex_lock(&rtp->lock);
865   r = satip_status_build(rtp, buf, len);
866   pthread_mutex_unlock(&rtp->lock);
867   pthread_mutex_unlock(&satip_rtp_lock);
868   return r;
869 }
870 
871 /*
872  *
873  */
874 static int
satip_rtcp_build(satip_rtp_session_t * rtp,uint8_t * msg)875 satip_rtcp_build(satip_rtp_session_t *rtp, uint8_t *msg)
876 {
877   char buf[1500];
878   int len, len2;
879 
880   pthread_mutex_lock(&rtp->lock);
881   len = satip_status_build(rtp, buf, sizeof(buf));
882   pthread_mutex_unlock(&rtp->lock);
883 
884   len = len2 = MIN(len, RTCP_PAYLOAD - 16);
885   if (len == 0)
886     len++;
887   while ((len % 4) != 0)
888     buf[len++] = 0;
889   memcpy(msg + 16, buf, len);
890 
891   len += 16;
892   msg[0] = 0x80;
893   msg[1] = 204;
894   msg[2] = (((len - 1) / 4) >> 8) & 0xff;
895   msg[3] = ((len - 1) / 4) & 0xff;
896   msg[4] = 0;
897   msg[5] = 0;
898   msg[6] = 0;
899   msg[7] = 0;
900   msg[8] = 'S';
901   msg[9] = 'E';
902   msg[10] = 'S';
903   msg[11] = '1';
904   msg[12] = 0;
905   msg[13] = 0;
906   msg[14] = (len2 >> 8) & 0xff;
907   msg[15] = len2 & 0xff;
908 
909   return len;
910 }
911 
912 /*
913  *
914  */
915 static void *
satip_rtcp_thread(void * aux)916 satip_rtcp_thread(void *aux)
917 {
918   satip_rtp_session_t *rtp;
919   int64_t us;
920   uint8_t msg[RTCP_PAYLOAD+1], *msg1;
921   char addrbuf[50];
922   int r, len, err;
923 
924   tvhtrace(LS_SATIPS, "starting rtcp thread");
925   while (atomic_get(&satip_rtcp_run)) {
926     us = 150000;
927     do {
928       us = tvh_usleep(us);
929       if (us < 0)
930         goto end;
931       if (!atomic_get(&satip_rtcp_run))
932         goto end;
933     } while (us > 0);
934     pthread_mutex_lock(&satip_rtp_lock);
935     TAILQ_FOREACH(rtp, &satip_rtp_sessions, link) {
936       if (rtp->sq == NULL || rtp->disable_rtcp) continue;
937       len = satip_rtcp_build(rtp, msg);
938       if (len <= 0) continue;
939       if (tvhtrace_enabled()) {
940         msg[len] = '\0';
941         tcp_get_str_from_ip(&rtp->peer2, addrbuf, sizeof(addrbuf));
942         tvhtrace(LS_SATIPS, "RTCP send to %s:%d : %s", addrbuf, ntohs(IP_PORT(rtp->peer2)), msg + 16);
943       }
944       if (rtp->port == RTSP_TCP_DATA) {
945         msg1 = malloc(len);
946         if (msg1) {
947           memcpy(msg1, msg, len);
948           err = satip_rtp_tcp_data(rtp, 1, msg1, len, 0);
949           r = err ? -1 : 0;
950         } else {
951           r = -1;
952           err = ENOMEM;
953         }
954       } else {
955         r = sendto(rtp->fd_rtcp, msg, len, 0,
956                    (struct sockaddr*)&rtp->peer2,
957                    rtp->peer2.ss_family == AF_INET6 ?
958                      sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
959         err = errno;
960       }
961       if (r < 0) {
962         if (err != ECONNREFUSED) {
963           tcp_get_str_from_ip(&rtp->peer2, addrbuf, sizeof(addrbuf));
964           tvhwarn(LS_SATIPS, "RTCP send to error %s:%d : %s",
965                   addrbuf, ntohs(IP_PORT(rtp->peer2)), strerror(err));
966         } else {
967           rtp->disable_rtcp = 1;
968         }
969       }
970     }
971     pthread_mutex_unlock(&satip_rtp_lock);
972   }
973 end:
974   return NULL;
975 }
976 
977 /*
978  *
979  */
satip_rtp_init(int boot)980 void satip_rtp_init(int boot)
981 {
982   TAILQ_INIT(&satip_rtp_sessions);
983   pthread_mutex_init(&satip_rtp_lock, NULL);
984 
985   if (boot)
986     atomic_set(&satip_rtcp_run, 0);
987 
988   if (!boot && !atomic_get(&satip_rtcp_run)) {
989     atomic_set(&satip_rtcp_run, 1);
990     tvhthread_create(&satip_rtcp_tid, NULL, satip_rtcp_thread, NULL, "satip-rtcp");
991   }
992 }
993 
994 /*
995  *
996  */
satip_rtp_done(void)997 void satip_rtp_done(void)
998 {
999   assert(TAILQ_EMPTY(&satip_rtp_sessions));
1000   if (atomic_get(&satip_rtcp_run)) {
1001     atomic_set(&satip_rtcp_run, 0);
1002     pthread_kill(satip_rtcp_tid, SIGTERM);
1003     pthread_join(satip_rtcp_tid, NULL);
1004   }
1005 }
1006