1 /*
2 * Tvheadend - SAT-IP server - RTP part
3 *
4 * Copyright (C) 2015 Jaroslav Kysela
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include <signal.h>
21 #include <ctype.h>
22 #include "tvheadend.h"
23 #include "config.h"
24 #include "input.h"
25 #include "streaming.h"
26 #include "satip/server.h"
27 #include <netinet/ip.h>
28 #if ENABLE_ANDROID
29 #include <sys/socket.h>
30 #endif
31 #define COMPAT_IPTOS
32 #include "compat.h"
33
34 #define RTP_PACKETS 128
35 #define RTP_PAYLOAD (7*188+12)
36 #define RTP_TCP_MIN_PAYLOAD (7*188+12+4) /* fit ethernet packet */
37 #define RTP_TCP_MAX_PAYLOAD (348*188+12+4) /* cca 64kB */
38 #define RTCP_PAYLOAD (1420)
39
40 #define RTP_TCP_BUFFER_SIZE (64*1024*1024)
41 #define RTP_TCP_BUFFER_ROOM (2048)
42
43 typedef struct satip_rtp_table {
44 TAILQ_ENTRY(satip_rtp_table) link;
45 mpegts_psi_table_t tbl;
46 int pid;
47 int remove_mark;
48 } satip_rtp_table_t;
49
50 typedef struct satip_rtp_session {
51 TAILQ_ENTRY(satip_rtp_session) link;
52 pthread_t tid;
53 struct sockaddr_storage peer;
54 struct sockaddr_storage peer2;
55 int port;
56 th_subscription_t *subs;
57 streaming_queue_t *sq;
58 int fd_rtp;
59 int fd_rtcp;
60 int frontend;
61 int source;
62 int allow_data;
63 int disable_rtcp;
64 dvb_mux_conf_t dmc;
65 mpegts_apids_t pids;
66 TAILQ_HEAD(, satip_rtp_table) pmt_tables;
67 udp_multisend_t um;
68 struct iovec *um_iovec;
69 struct iovec tcp_data;
70 uint32_t tcp_payload;
71 uint32_t tcp_buffer_size;
72 int um_packet;
73 uint16_t seq;
74 signal_status_t sig;
75 int sig_lock;
76 pthread_mutex_t lock;
77 http_connection_t *hc;
78 sbuf_t table_data;
79 void (*no_data_cb)(void *opaque);
80 void *no_data_opaque;
81 } satip_rtp_session_t;
82
83 static pthread_mutex_t satip_rtp_lock;
84 static pthread_t satip_rtcp_tid;
85 static int satip_rtcp_run;
86 static TAILQ_HEAD(, satip_rtp_session) satip_rtp_sessions;
87
88 static void
satip_rtp_pmt_cb(mpegts_psi_table_t * mt,const uint8_t * buf,int len)89 satip_rtp_pmt_cb(mpegts_psi_table_t *mt, const uint8_t *buf, int len)
90 {
91 satip_rtp_session_t *rtp;
92 uint8_t out[1024], *ob;
93 // uint16_t sid, pid;
94 int l, ol;
95
96 memcpy(out, buf, ol = 3);
97 buf += ol;
98 len -= ol;
99
100 // sid = (buf[0] << 8) | buf[1];
101 l = (buf[7] & 0x0f) << 8 | buf[8];
102
103 if (l > len - 9)
104 return;
105
106 rtp = (satip_rtp_session_t *)mt->mt_opaque;
107
108 memcpy(out + ol, buf, 9);
109
110 ol += 9; /* skip common descriptors */
111 buf += 9 + l;
112 len -= 9 + l;
113
114 /* no common descriptors */
115 out[7+3] &= 0xf0;
116 out[8+3] = 0;
117
118 while (len >= 5) {
119 //pid = (buf[1] & 0x1f) << 8 | buf[2];
120 l = (buf[3] & 0xf) << 8 | buf[4];
121
122 if (l > len - 5)
123 return;
124
125 if (sizeof(out) < ol + l + 5 + 4 /* crc */) {
126 tvherror(LS_PASS, "PMT entry too long (%i)", l);
127 return;
128 }
129
130 memcpy(out + ol, buf, 5 + l);
131 ol += 5 + l;
132
133 buf += 5 + l;
134 len -= 5 + l;
135 }
136
137 /* update section length */
138 out[1] = (out[1] & 0xf0) | ((ol + 4 - 3) >> 8);
139 out[2] = (ol + 4 - 3) & 0xff;
140
141 ol = dvb_table_append_crc32(out, ol, sizeof(out));
142
143 if (ol > 0 && (l = dvb_table_remux(mt, out, ol, &ob)) > 0)
144 sbuf_append(&rtp->table_data, ob, l);
145 }
146
147 static void
satip_rtp_header(satip_rtp_session_t * rtp,struct iovec * v,uint32_t off)148 satip_rtp_header(satip_rtp_session_t *rtp, struct iovec *v, uint32_t off)
149 {
150 uint8_t *data = v->iov_base;
151 uint32_t tstamp = mono2sec(mclk()) + rtp->seq;
152
153 rtp->seq++;
154
155 v->iov_len = off + 12;
156 data[off+0] = 0x80;
157 data[off+1] = 33;
158 data[off+2] = (rtp->seq >> 8) & 0xff;
159 data[off+3] = rtp->seq & 0xff;
160 data[off+4] = (tstamp >> 24) & 0xff;
161 data[off+5] = (tstamp >> 16) & 0xff;
162 data[off+6] = (tstamp >> 8) & 0xff;
163 data[off+7] = tstamp & 0xff;
164 memset(data + off + 8, 0xa5, 4);
165 }
166
167 static int
satip_rtp_send(satip_rtp_session_t * rtp)168 satip_rtp_send(satip_rtp_session_t *rtp)
169 {
170 struct iovec *v = rtp->um_iovec, *v2;
171 int packets, copy, len, r;
172 if (v->iov_len == RTP_PAYLOAD) {
173 packets = rtp->um_packet;
174 v2 = v + packets;
175 copy = 1;
176 if (v2->iov_len == RTP_PAYLOAD) {
177 packets++;
178 copy = 0;
179 }
180 while (1) {
181 r = udp_multisend_send(&rtp->um, rtp->fd_rtp, packets);
182 if (r < 0) {
183 if (errno == EINTR)
184 continue;
185 if (errno == EAGAIN || errno == EWOULDBLOCK) {
186 tvh_usleep(100);
187 continue;
188 }
189 tvhtrace(LS_SATIPS, "rtp udp multisend failed (errno %d)", errno);
190 return r;
191 }
192 break;
193 }
194 if (r != packets) {
195 tvhtrace(LS_SATIPS, "rtp udp multisend failed (packets %d written %d)", packets, r);
196 return -1;
197 }
198 if (copy)
199 memcpy(v->iov_base, v2->iov_base, len = v2->iov_len);
200 else
201 len = 0;
202 rtp->um_packet = 0;
203 udp_multisend_clean(&rtp->um);
204 v->iov_len = len;
205 }
206 if (v->iov_len == 0)
207 satip_rtp_header(rtp, rtp->um_iovec + rtp->um_packet, 0);
208 return 0;
209 }
210
211 static inline int
satip_rtp_append_data(satip_rtp_session_t * rtp,struct iovec ** _v,uint8_t * data)212 satip_rtp_append_data(satip_rtp_session_t *rtp, struct iovec **_v, uint8_t *data)
213 {
214 struct iovec *v = *_v;
215 int r;
216 assert(v->iov_len + 188 <= RTP_PAYLOAD);
217 memcpy(v->iov_base + v->iov_len, data, 188);
218 v->iov_len += 188;
219 if (v->iov_len == RTP_PAYLOAD) {
220 if ((rtp->um_packet + 1) == RTP_PACKETS) {
221 r = satip_rtp_send(rtp);
222 if (r < 0)
223 return r;
224 } else {
225 rtp->um_packet++;
226 satip_rtp_header(rtp, rtp->um_iovec + rtp->um_packet, 0);
227 }
228 *_v = rtp->um_iovec + rtp->um_packet;
229 } else {
230 assert(v->iov_len < RTP_PAYLOAD);
231 }
232 return 0;
233 }
234
235 static int
satip_rtp_loop(satip_rtp_session_t * rtp,uint8_t * data,int len)236 satip_rtp_loop(satip_rtp_session_t *rtp, uint8_t *data, int len)
237 {
238 int i, j, pid, last_pid = -1, r;
239 mpegts_apid_t *pids = rtp->pids.pids;
240 struct iovec *v = rtp->um_iovec + rtp->um_packet;
241 satip_rtp_table_t *tbl;
242
243 assert((len % 188) == 0);
244 for ( ; len >= 188 ; data += 188, len -= 188) {
245 pid = ((data[1] & 0x1f) << 8) | data[2];
246 if (pid != last_pid && !rtp->pids.all) {
247 for (i = 0; i < rtp->pids.count; i++) {
248 j = pids[i].pid;
249 if (pid < j) break;
250 if (j == pid) goto found;
251 }
252 continue;
253 found:
254 TAILQ_FOREACH(tbl, &rtp->pmt_tables, link)
255 if (tbl->pid == pid) {
256 dvb_table_parse(&tbl->tbl, "-", data, 188, 1, 0, satip_rtp_pmt_cb);
257 if (rtp->table_data.sb_ptr > 0) {
258 for (i = r = 0; i < rtp->table_data.sb_ptr; i += 188) {
259 r = satip_rtp_append_data(rtp, &v, rtp->table_data.sb_data + i);
260 if (r)
261 break;
262 }
263 sbuf_reset(&rtp->table_data, 10*188);
264 if (r)
265 return r;
266 }
267 break;
268 }
269 if (tbl)
270 continue;
271 last_pid = pid;
272 }
273 r = satip_rtp_append_data(rtp, &v, data);
274 if (r < 0)
275 return r;
276 }
277 return 0;
278 }
279
280 static int
satip_rtp_tcp_data(satip_rtp_session_t * rtp,uint8_t stream,uint8_t * data,size_t data_len,int may_discard)281 satip_rtp_tcp_data(satip_rtp_session_t *rtp, uint8_t stream,
282 uint8_t *data, size_t data_len, int may_discard)
283 {
284 assert(data_len <= 0xffff);
285 data[0] = '$';
286 data[1] = stream;
287 data[2] = (data_len - 4) >> 8;
288 data[3] = (data_len - 4) & 0xff;
289 return http_extra_send_prealloc(rtp->hc, data, data_len, may_discard);
290 }
291
292 static inline int
satip_rtp_flush_tcp_data(satip_rtp_session_t * rtp)293 satip_rtp_flush_tcp_data(satip_rtp_session_t *rtp)
294 {
295 struct iovec *v = &rtp->tcp_data;
296 int r = 0;
297
298 if (v->iov_len)
299 r = satip_rtp_tcp_data(rtp, 0, v->iov_base, v->iov_len, 1);
300 else
301 free(v->iov_base);
302 v->iov_base = NULL;
303 v->iov_len = 0;
304 return r;
305 }
306
307 static inline int
satip_rtp_append_tcp_data(satip_rtp_session_t * rtp,uint8_t * data,size_t len)308 satip_rtp_append_tcp_data(satip_rtp_session_t *rtp, uint8_t *data, size_t len)
309 {
310 struct iovec *v = &rtp->tcp_data;
311 int r = 0;
312
313 if (v->iov_base == NULL) {
314 v->iov_base = malloc(rtp->tcp_payload);
315 satip_rtp_header(rtp, v, 4);
316 }
317 assert(v->iov_len + len <= rtp->tcp_payload);
318 memcpy(v->iov_base + v->iov_len, data, len);
319 v->iov_len += len;
320 if (v->iov_len == rtp->tcp_payload)
321 r = satip_rtp_flush_tcp_data(rtp);
322 return r;
323 }
324
325 static int
satip_rtp_tcp_loop(satip_rtp_session_t * rtp,uint8_t * data,int len)326 satip_rtp_tcp_loop(satip_rtp_session_t *rtp, uint8_t *data, int len)
327 {
328 int i, j, pid, last_pid = -1, r;
329 mpegts_apid_t *pids = rtp->pids.pids;
330 satip_rtp_table_t *tbl;
331
332 assert((len % 188) == 0);
333 for ( ; len >= 188 ; data += 188, len -= 188) {
334 pid = ((data[1] & 0x1f) << 8) | data[2];
335 if (pid != last_pid && !rtp->pids.all) {
336 for (i = 0; i < rtp->pids.count; i++) {
337 j = pids[i].pid;
338 if (pid < j) break;
339 if (j == pid) goto found;
340 }
341 continue;
342 found:
343 TAILQ_FOREACH(tbl, &rtp->pmt_tables, link)
344 if (tbl->pid == pid) {
345 dvb_table_parse(&tbl->tbl, "-", data, 188, 1, 0, satip_rtp_pmt_cb);
346 if (rtp->table_data.sb_ptr) {
347 r = satip_rtp_append_tcp_data(rtp, rtp->table_data.sb_data, rtp->table_data.sb_ptr);
348 sbuf_reset(&rtp->table_data, 10*188);
349 if (r)
350 return -1;
351 }
352 break;
353 }
354 if (tbl)
355 continue;
356 last_pid = pid;
357 }
358 r = satip_rtp_append_tcp_data(rtp, data, 188);
359 if (r)
360 return -1;
361 }
362 return 0;
363 }
364
365 static void
satip_rtp_signal_status(satip_rtp_session_t * rtp,signal_status_t * sig)366 satip_rtp_signal_status(satip_rtp_session_t *rtp, signal_status_t *sig)
367 {
368 pthread_mutex_lock(&rtp->lock);
369 rtp->sig = *sig;
370 pthread_mutex_unlock(&rtp->lock);
371 }
372
373 static void *
satip_rtp_thread(void * aux)374 satip_rtp_thread(void *aux)
375 {
376 satip_rtp_session_t *rtp = aux;
377 streaming_queue_t *sq = rtp->sq;
378 streaming_message_t *sm;
379 th_subscription_t *subs = rtp->subs;
380 pktbuf_t *pb;
381 char peername[50];
382 int alive = 1, fatal = 0, r;
383 int tcp = rtp->port == RTSP_TCP_DATA;
384
385 tcp_get_str_from_ip(&rtp->peer, peername, sizeof(peername));
386 tvhdebug(LS_SATIPS, "RTP streaming to %s:%d open", peername,
387 tcp ? ntohs(IP_PORT(rtp->peer)) : rtp->port);
388
389 pthread_mutex_lock(&sq->sq_mutex);
390 while (rtp->sq && !fatal) {
391 sm = TAILQ_FIRST(&sq->sq_queue);
392 if (sm == NULL) {
393 if (tcp) {
394 r = satip_rtp_flush_tcp_data(rtp);
395 } else {
396 r = satip_rtp_send(rtp);
397 }
398 if (r) {
399 fatal = 1;
400 continue;
401 }
402 tvh_cond_wait(&sq->sq_cond, &sq->sq_mutex);
403 continue;
404 }
405 streaming_queue_remove(sq, sm);
406 pthread_mutex_unlock(&sq->sq_mutex);
407
408 switch (sm->sm_type) {
409 case SMT_MPEGTS:
410 pb = sm->sm_data;
411 r = pktbuf_len(pb);
412 subscription_add_bytes_out(subs, r);
413 if (r > 0)
414 atomic_set(&rtp->sig_lock, 1);
415 if (atomic_get(&rtp->allow_data)) {
416 pthread_mutex_lock(&rtp->lock);
417 if (tcp)
418 r = satip_rtp_tcp_loop(rtp, pktbuf_ptr(pb), r);
419 else
420 r = satip_rtp_loop(rtp, pktbuf_ptr(pb), r);
421 pthread_mutex_unlock(&rtp->lock);
422 if (r) fatal = 1;
423 }
424 break;
425 case SMT_SIGNAL_STATUS:
426 satip_rtp_signal_status(rtp, sm->sm_data);
427 break;
428 case SMT_NOSTART:
429 case SMT_EXIT:
430 if (rtp->no_data_cb)
431 rtp->no_data_cb(rtp->no_data_opaque);
432 alive = 0;
433 break;
434
435 case SMT_START:
436 case SMT_STOP:
437 case SMT_NOSTART_WARN:
438 case SMT_PACKET:
439 case SMT_GRACE:
440 case SMT_SKIP:
441 case SMT_SPEED:
442 case SMT_SERVICE_STATUS:
443 case SMT_TIMESHIFT_STATUS:
444 case SMT_DESCRAMBLE_INFO:
445 break;
446 }
447
448 streaming_msg_free(sm);
449 pthread_mutex_lock(&sq->sq_mutex);
450 }
451 pthread_mutex_unlock(&sq->sq_mutex);
452
453 tvhdebug(LS_SATIPS, "RTP streaming to %s:%d closed (%s request)%s",
454 peername,
455 tcp ? ntohs(IP_PORT(rtp->peer)) : rtp->port,
456 alive ? "remote" : "streaming",
457 fatal ? " (fatal)" : "");
458
459 return NULL;
460 }
461
462 /*
463 *
464 */
satip_rtp_queue(th_subscription_t * subs,streaming_queue_t * sq,http_connection_t * hc,struct sockaddr_storage * peer,int port,int fd_rtp,int fd_rtcp,int frontend,int source,dvb_mux_conf_t * dmc,mpegts_apids_t * pids,int allow_data,int perm_lock,void (* no_data_cb)(void * opaque),void * no_data_opaque)465 void *satip_rtp_queue(th_subscription_t *subs,
466 streaming_queue_t *sq,
467 http_connection_t *hc,
468 struct sockaddr_storage *peer, int port,
469 int fd_rtp, int fd_rtcp,
470 int frontend, int source, dvb_mux_conf_t *dmc,
471 mpegts_apids_t *pids, int allow_data, int perm_lock,
472 void (*no_data_cb)(void *opaque),
473 void *no_data_opaque)
474 {
475 satip_rtp_session_t *rtp = calloc(1, sizeof(*rtp));
476 size_t len;
477 socklen_t socklen;
478 int dscp, payload;
479
480 if (rtp == NULL)
481 return NULL;
482
483 rtp->peer = *peer;
484 rtp->peer2 = *peer;
485 if (port != RTSP_TCP_DATA)
486 IP_PORT_SET(rtp->peer2, htons(port + 1));
487 rtp->port = port;
488 rtp->fd_rtp = fd_rtp;
489 rtp->fd_rtcp = fd_rtcp;
490 rtp->subs = subs;
491 rtp->sq = sq;
492 rtp->hc = hc;
493 payload = satip_server_conf.satip_rtptcpsize * 188 + 12 + 4;
494 rtp->tcp_payload = MINMAX(payload, RTP_TCP_MIN_PAYLOAD, RTP_TCP_MAX_PAYLOAD);
495 rtp->tcp_buffer_size = 16*1024*1024;
496 rtp->no_data_cb = no_data_cb;
497 rtp->no_data_opaque = no_data_opaque;
498 atomic_set(&rtp->allow_data, allow_data);
499 mpegts_pid_init(&rtp->pids);
500 mpegts_pid_copy(&rtp->pids, pids);
501 TAILQ_INIT(&rtp->pmt_tables);
502 if (port != RTSP_TCP_DATA) {
503 udp_multisend_init(&rtp->um, RTP_PACKETS, RTP_PAYLOAD, &rtp->um_iovec);
504 satip_rtp_header(rtp, rtp->um_iovec, 0);
505 } else {
506 socklen = sizeof(len);
507 if (getsockopt(fd_rtp, SOL_SOCKET, SO_SNDBUF, &len, &socklen) == 0 &&
508 len < RTP_TCP_BUFFER_SIZE) {
509 len = RTP_TCP_BUFFER_SIZE;
510 setsockopt(fd_rtp, SOL_SOCKET, SO_SNDBUF, &len, sizeof(len));
511 }
512 socklen = sizeof(len);
513 if (getsockopt(fd_rtp, SOL_SOCKET, SO_SNDBUF, &len, &socklen) == 0) {
514 rtp->tcp_buffer_size = len;
515 if (len - RTP_TCP_BUFFER_ROOM < rtp->tcp_payload) {
516 rtp->tcp_payload = len - RTP_TCP_BUFFER_ROOM;
517 rtp->tcp_payload -= rtp->tcp_payload % 188;
518 rtp->tcp_payload += 12 + 4;
519 }
520 }
521 }
522 rtp->frontend = frontend;
523 rtp->dmc = *dmc;
524 rtp->source = source;
525 pthread_mutex_init(&rtp->lock, NULL);
526
527 dscp = config.dscp >= 0 ? config.dscp : IPTOS_DSCP_EF;
528 socket_set_dscp(rtp->fd_rtp, dscp, NULL, 0);
529 if (rtp->fd_rtcp >= 0)
530 socket_set_dscp(rtp->fd_rtcp, dscp, NULL, 0);
531
532 if (perm_lock) {
533 int tmp = ((satip_server_conf.satip_iptv_sig_level * 0xffff) + 0x8000) / 245;
534 rtp->sig.signal_scale = SIGNAL_STATUS_SCALE_RELATIVE;
535 rtp->sig.signal = MINMAX(tmp, 0, 0xffff);
536 rtp->sig.snr_scale = SIGNAL_STATUS_SCALE_RELATIVE;
537 rtp->sig.snr = MAX(0xffff, (rtp->sig.signal * 3) / 2);
538 }
539
540 tvhtrace(LS_SATIPS, "rtp queue %p", rtp);
541
542 pthread_mutex_lock(&satip_rtp_lock);
543 TAILQ_INSERT_TAIL(&satip_rtp_sessions, rtp, link);
544 tvhthread_create(&rtp->tid, NULL, satip_rtp_thread, rtp, "satip-rtp");
545 pthread_mutex_unlock(&satip_rtp_lock);
546 return rtp;
547 }
548
satip_rtp_allow_data(void * _rtp)549 void satip_rtp_allow_data(void *_rtp)
550 {
551 satip_rtp_session_t *rtp = _rtp;
552
553 if (rtp == NULL)
554 return;
555 pthread_mutex_lock(&satip_rtp_lock);
556 atomic_set(&rtp->allow_data, 1);
557 pthread_mutex_unlock(&satip_rtp_lock);
558 }
559
satip_rtp_update_pids(void * _rtp,mpegts_apids_t * pids)560 void satip_rtp_update_pids(void *_rtp, mpegts_apids_t *pids)
561 {
562 satip_rtp_session_t *rtp = _rtp;
563
564 if (rtp == NULL)
565 return;
566 pthread_mutex_lock(&satip_rtp_lock);
567 pthread_mutex_lock(&rtp->lock);
568 mpegts_pid_copy(&rtp->pids, pids);
569 pthread_mutex_unlock(&rtp->lock);
570 pthread_mutex_unlock(&satip_rtp_lock);
571 }
572
satip_rtp_update_pmt_pids(void * _rtp,mpegts_apids_t * pmt_pids)573 void satip_rtp_update_pmt_pids(void *_rtp, mpegts_apids_t *pmt_pids)
574 {
575 satip_rtp_session_t *rtp = _rtp;
576 satip_rtp_table_t *tbl, *tbl_next;
577 int i, pid;
578
579 if (rtp == NULL)
580 return;
581 pthread_mutex_lock(&satip_rtp_lock);
582 pthread_mutex_lock(&rtp->lock);
583 TAILQ_FOREACH(tbl, &rtp->pmt_tables, link)
584 if (!mpegts_pid_rexists(pmt_pids, tbl->pid))
585 tbl->remove_mark = 1;
586 for (i = 0; i < pmt_pids->count; i++) {
587 pid = pmt_pids->pids[i].pid;
588 TAILQ_FOREACH(tbl, &rtp->pmt_tables, link)
589 if (tbl->pid == pid)
590 break;
591 if (!tbl) {
592 tbl = calloc(1, sizeof(*tbl));
593 dvb_table_parse_init(&tbl->tbl, "satip-pmt", LS_TBL_SATIP, pid,
594 DVB_PMT_BASE, DVB_PMT_MASK, rtp);
595 tbl->pid = pid;
596 TAILQ_INSERT_TAIL(&rtp->pmt_tables, tbl, link);
597 }
598 }
599 for (tbl = TAILQ_FIRST(&rtp->pmt_tables); tbl; tbl = tbl_next){
600 tbl_next = TAILQ_NEXT(tbl, link);
601 if (tbl->remove_mark) {
602 TAILQ_REMOVE(&rtp->pmt_tables, tbl, link);
603 free(tbl);
604 }
605 }
606 pthread_mutex_unlock(&rtp->lock);
607 pthread_mutex_unlock(&satip_rtp_lock);
608 }
609
satip_rtp_close(void * _rtp)610 void satip_rtp_close(void *_rtp)
611 {
612 satip_rtp_session_t *rtp = _rtp;
613 satip_rtp_table_t *tbl;
614 streaming_queue_t *sq;
615
616 if (rtp == NULL)
617 return;
618 pthread_mutex_lock(&satip_rtp_lock);
619 tvhtrace(LS_SATIPS, "rtp close %p", rtp);
620 TAILQ_REMOVE(&satip_rtp_sessions, rtp, link);
621 sq = rtp->sq;
622 pthread_mutex_lock(&sq->sq_mutex);
623 rtp->sq = NULL;
624 tvh_cond_signal(&sq->sq_cond, 0);
625 pthread_mutex_unlock(&sq->sq_mutex);
626 pthread_mutex_unlock(&satip_rtp_lock);
627 pthread_join(rtp->tid, NULL);
628 if (rtp->port == RTSP_TCP_DATA) {
629 http_extra_destroy(rtp->hc);
630 free(rtp->tcp_data.iov_base);
631 } else {
632 udp_multisend_free(&rtp->um);
633 }
634 mpegts_pid_done(&rtp->pids);
635 while ((tbl = TAILQ_FIRST(&rtp->pmt_tables)) != NULL) {
636 dvb_table_parse_done(&tbl->tbl);
637 TAILQ_REMOVE(&rtp->pmt_tables, tbl, link);
638 free(tbl);
639 }
640 pthread_mutex_destroy(&rtp->lock);
641 free(rtp);
642 }
643
644 /*
645 *
646 */
647 static const char *
satip_rtcp_pol(int pol)648 satip_rtcp_pol(int pol)
649 {
650 switch (pol) {
651 case DVB_POLARISATION_HORIZONTAL:
652 return "h";
653 case DVB_POLARISATION_VERTICAL:
654 return "v";
655 case DVB_POLARISATION_CIRCULAR_LEFT:
656 return "l";
657 case DVB_POLARISATION_CIRCULAR_RIGHT:
658 return "r";
659 case DVB_POLARISATION_OFF:
660 return "off";
661 default:
662 return "";
663 }
664 }
665
666 /*
667 *
668 */
669 static const char *
satip_rtcp_fec(int fec)670 satip_rtcp_fec(int fec)
671 {
672 static char buf[16];
673 char *p = buf;
674 const char *s;
675
676 if (fec == DVB_FEC_AUTO || fec == DVB_FEC_NONE)
677 return "";
678 s = dvb_fec2str(fec);
679 if (s == NULL)
680 return "";
681 strlcpy(buf, s, sizeof(buf));
682 p = strchr(buf, '/');
683 while (p && *p) {
684 *p = *(p+1);
685 p++;
686 }
687 return buf;
688 }
689
690 /*
691 *
692 */
693 static int
satip_status_build(satip_rtp_session_t * rtp,char * buf,int len)694 satip_status_build(satip_rtp_session_t *rtp, char *buf, int len)
695 {
696 char pids[1400];
697 const char *delsys, *msys, *pilot, *rolloff;
698 const char *bw, *tmode, *gi, *plp, *t2id, *sm, *c2tft, *ds, *specinv;
699 int r, level = 0, lock = 0, quality = 0;
700
701 lock = atomic_get(&rtp->sig_lock);
702 if (satip_server_conf.satip_force_sig_level > 0) {
703 level = MINMAX(satip_server_conf.satip_force_sig_level, 1, 240);
704 quality = MAX((level + 15) / 15, 15);
705 } else {
706 switch (rtp->sig.signal_scale) {
707 case SIGNAL_STATUS_SCALE_RELATIVE:
708 level = MINMAX((rtp->sig.signal * 245) / 0xffff, 0, 240);
709 break;
710 case SIGNAL_STATUS_SCALE_DECIBEL:
711 level = MINMAX((rtp->sig.signal + 90000) / 375, 0, 240);
712 break;
713 default:
714 level = lock ? 120 : 0;
715 break;
716 }
717 switch (rtp->sig.snr_scale) {
718 case SIGNAL_STATUS_SCALE_RELATIVE:
719 quality = MINMAX((rtp->sig.snr * 16) / 0xffff, 0, 15);
720 break;
721 case SIGNAL_STATUS_SCALE_DECIBEL:
722 quality = MINMAX(rtp->sig.snr / 2000, 0, 15);
723 break;
724 default:
725 quality = lock ? 10 : 0;
726 break;
727 }
728 }
729
730 mpegts_pid_dump(&rtp->pids, pids, sizeof(pids), 0, 0);
731
732 switch (rtp->dmc.dmc_fe_delsys) {
733 case DVB_SYS_DVBS:
734 case DVB_SYS_DVBS2:
735 delsys = rtp->dmc.dmc_fe_delsys == DVB_SYS_DVBS ? "dvbs" : "dvbs2";
736 switch (rtp->dmc.dmc_fe_modulation) {
737 case DVB_MOD_QPSK: msys = "qpsk"; break;
738 case DVB_MOD_PSK_8: msys = "8psk"; break;
739 default: msys = ""; break;
740 }
741 switch (rtp->dmc.dmc_fe_pilot) {
742 case DVB_PILOT_ON: pilot = "on"; break;
743 case DVB_PILOT_OFF: pilot = "off"; break;
744 default: pilot = ""; break;
745 }
746 switch (rtp->dmc.dmc_fe_rolloff) {
747 case DVB_ROLLOFF_20: rolloff = "20"; break;
748 case DVB_ROLLOFF_25: rolloff = "25"; break;
749 case DVB_ROLLOFF_35: rolloff = "35"; break;
750 default: rolloff = ""; break;
751 }
752 /* ver=<major>.<minor>;src=<srcID>;tuner=<feID>,<level>,<lock>,<quality>,<frequency>,<polarisation>,\
753 * <system>,<type>,<pilots>,<roll_off>,<symbol_rate>,<fec_inner>;pids=<pid0>,...,<pidn>
754 */
755 r = snprintf(buf, len,
756 "ver=1.0;src=%d;tuner=%d,%d,%d,%d,%.f,%s,%s,%s,%s,%s,%.f,%s;pids=%s",
757 rtp->source, rtp->frontend, level, lock, quality,
758 (float)rtp->dmc.dmc_fe_freq / 1000.0,
759 satip_rtcp_pol(rtp->dmc.u.dmc_fe_qpsk.polarisation),
760 delsys, msys, pilot, rolloff,
761 (float)rtp->dmc.u.dmc_fe_qpsk.symbol_rate / 1000.0,
762 satip_rtcp_fec(rtp->dmc.u.dmc_fe_qpsk.fec_inner),
763 pids);
764 break;
765 case DVB_SYS_DVBT:
766 case DVB_SYS_DVBT2:
767 delsys = rtp->dmc.dmc_fe_delsys == DVB_SYS_DVBT ? "dvbt" : "dvbt2";
768 switch (rtp->dmc.u.dmc_fe_ofdm.bandwidth) {
769 case DVB_BANDWIDTH_1_712_MHZ: bw = "1.712"; break;
770 case DVB_BANDWIDTH_5_MHZ: bw = "5"; break;
771 case DVB_BANDWIDTH_6_MHZ: bw = "6"; break;
772 case DVB_BANDWIDTH_7_MHZ: bw = "7"; break;
773 case DVB_BANDWIDTH_8_MHZ: bw = "8"; break;
774 case DVB_BANDWIDTH_10_MHZ: bw = "10"; break;
775 default: bw = ""; break;
776 }
777 switch (rtp->dmc.u.dmc_fe_ofdm.transmission_mode) {
778 case DVB_TRANSMISSION_MODE_1K: tmode = "1k"; break;
779 case DVB_TRANSMISSION_MODE_2K: tmode = "2k"; break;
780 case DVB_TRANSMISSION_MODE_4K: tmode = "4k"; break;
781 case DVB_TRANSMISSION_MODE_8K: tmode = "8k"; break;
782 case DVB_TRANSMISSION_MODE_16K: tmode = "16k"; break;
783 case DVB_TRANSMISSION_MODE_32K: tmode = "32k"; break;
784 default: tmode = ""; break;
785 }
786 switch (rtp->dmc.dmc_fe_modulation) {
787 case DVB_MOD_QAM_16: msys = "qam16"; break;
788 case DVB_MOD_QAM_32: msys = "qam32"; break;
789 case DVB_MOD_QAM_64: msys = "qam64"; break;
790 case DVB_MOD_QAM_128: msys = "qam128"; break;
791 default: msys = ""; break;
792 }
793 switch (rtp->dmc.u.dmc_fe_ofdm.guard_interval) {
794 case DVB_GUARD_INTERVAL_1_4: gi = "14"; break;
795 case DVB_GUARD_INTERVAL_1_8: gi = "18"; break;
796 case DVB_GUARD_INTERVAL_1_16: gi = "116"; break;
797 case DVB_GUARD_INTERVAL_1_32: gi = "132"; break;
798 case DVB_GUARD_INTERVAL_1_128: gi = "1128"; break;
799 case DVB_GUARD_INTERVAL_19_128: gi = "19128"; break;
800 case DVB_GUARD_INTERVAL_19_256: gi = "19256"; break;
801 default: gi = ""; break;
802 }
803 plp = "";
804 t2id = "";
805 sm = "";
806 /* ver=1.1;tuner=<feID>,<level>,<lock>,<quality>,<freq>,<bw>,<msys>,<tmode>,<mtype>,<gi>,\
807 * <fec>,<plp>,<t2id>,<sm>;pids=<pid0>,...,<pidn>
808 */
809 r = snprintf(buf, len,
810 "ver=1.1;tuner=%d,%d,%d,%d,%.f,%s,%s,%s,%s,%s,%s,%s,%s,%s;pids=%s",
811 rtp->frontend, level, lock, quality,
812 (float)rtp->dmc.dmc_fe_freq / 1000000.0,
813 bw, delsys, tmode, msys, gi,
814 satip_rtcp_fec(rtp->dmc.u.dmc_fe_ofdm.code_rate_HP),
815 plp, t2id, sm, pids);
816 break;
817 case DVB_SYS_DVBC_ANNEX_A:
818 case DVB_SYS_DVBC_ANNEX_C:
819 delsys = rtp->dmc.dmc_fe_delsys == DVB_SYS_DVBC_ANNEX_A ? "dvbc" : "dvbc2";
820 bw = "";
821 switch (rtp->dmc.dmc_fe_modulation) {
822 case DVB_MOD_QAM_16: msys = "qam16"; break;
823 case DVB_MOD_QAM_32: msys = "qam32"; break;
824 case DVB_MOD_QAM_64: msys = "qam64"; break;
825 case DVB_MOD_QAM_128: msys = "qam128"; break;
826 default: msys = ""; break;
827 }
828 c2tft = "";
829 ds = "";
830 plp = "";
831 specinv = "";
832 /* ver=1.2;tuner=<feID>,<level>,<lock>,<quality>,<freq>,<bw>,<msys>,<mtype>,<sr>,<c2tft>,<ds>,<plp>,
833 * <specinv>;pids=<pid0>,...,<pidn>
834 */
835 r = snprintf(buf, len,
836 "ver=1.1;tuner=%d,%d,%d,%d,%.f,%s,%s,%s,%.f,%s,%s,%s,%s;pids=%s",
837 rtp->frontend, level, lock, quality,
838 (float)rtp->dmc.dmc_fe_freq / 1000000.0,
839 bw, delsys, msys,
840 (float)rtp->dmc.u.dmc_fe_qam.symbol_rate / 1000.0,
841 c2tft, ds, plp, specinv, pids);
842 break;
843 default:
844 r = snprintf(buf, len, "ver=1.0;src=%d;tuner=%d,%d,%d,%d,,,,,,,,;pids=%s",
845 rtp->source, rtp->frontend, level, lock, quality, pids);
846 break;
847 }
848
849 return r >= len ? len - 1 : r;
850 }
851
852 /*
853 *
854 */
satip_rtp_status(void * _rtp,char * buf,int len)855 int satip_rtp_status(void *_rtp, char *buf, int len)
856 {
857 satip_rtp_session_t *rtp = _rtp;
858 int r = 0;
859
860 buf[0] = '\0';
861 if (rtp == NULL)
862 return 0;
863 pthread_mutex_lock(&satip_rtp_lock);
864 pthread_mutex_lock(&rtp->lock);
865 r = satip_status_build(rtp, buf, len);
866 pthread_mutex_unlock(&rtp->lock);
867 pthread_mutex_unlock(&satip_rtp_lock);
868 return r;
869 }
870
871 /*
872 *
873 */
874 static int
satip_rtcp_build(satip_rtp_session_t * rtp,uint8_t * msg)875 satip_rtcp_build(satip_rtp_session_t *rtp, uint8_t *msg)
876 {
877 char buf[1500];
878 int len, len2;
879
880 pthread_mutex_lock(&rtp->lock);
881 len = satip_status_build(rtp, buf, sizeof(buf));
882 pthread_mutex_unlock(&rtp->lock);
883
884 len = len2 = MIN(len, RTCP_PAYLOAD - 16);
885 if (len == 0)
886 len++;
887 while ((len % 4) != 0)
888 buf[len++] = 0;
889 memcpy(msg + 16, buf, len);
890
891 len += 16;
892 msg[0] = 0x80;
893 msg[1] = 204;
894 msg[2] = (((len - 1) / 4) >> 8) & 0xff;
895 msg[3] = ((len - 1) / 4) & 0xff;
896 msg[4] = 0;
897 msg[5] = 0;
898 msg[6] = 0;
899 msg[7] = 0;
900 msg[8] = 'S';
901 msg[9] = 'E';
902 msg[10] = 'S';
903 msg[11] = '1';
904 msg[12] = 0;
905 msg[13] = 0;
906 msg[14] = (len2 >> 8) & 0xff;
907 msg[15] = len2 & 0xff;
908
909 return len;
910 }
911
912 /*
913 *
914 */
915 static void *
satip_rtcp_thread(void * aux)916 satip_rtcp_thread(void *aux)
917 {
918 satip_rtp_session_t *rtp;
919 int64_t us;
920 uint8_t msg[RTCP_PAYLOAD+1], *msg1;
921 char addrbuf[50];
922 int r, len, err;
923
924 tvhtrace(LS_SATIPS, "starting rtcp thread");
925 while (atomic_get(&satip_rtcp_run)) {
926 us = 150000;
927 do {
928 us = tvh_usleep(us);
929 if (us < 0)
930 goto end;
931 if (!atomic_get(&satip_rtcp_run))
932 goto end;
933 } while (us > 0);
934 pthread_mutex_lock(&satip_rtp_lock);
935 TAILQ_FOREACH(rtp, &satip_rtp_sessions, link) {
936 if (rtp->sq == NULL || rtp->disable_rtcp) continue;
937 len = satip_rtcp_build(rtp, msg);
938 if (len <= 0) continue;
939 if (tvhtrace_enabled()) {
940 msg[len] = '\0';
941 tcp_get_str_from_ip(&rtp->peer2, addrbuf, sizeof(addrbuf));
942 tvhtrace(LS_SATIPS, "RTCP send to %s:%d : %s", addrbuf, ntohs(IP_PORT(rtp->peer2)), msg + 16);
943 }
944 if (rtp->port == RTSP_TCP_DATA) {
945 msg1 = malloc(len);
946 if (msg1) {
947 memcpy(msg1, msg, len);
948 err = satip_rtp_tcp_data(rtp, 1, msg1, len, 0);
949 r = err ? -1 : 0;
950 } else {
951 r = -1;
952 err = ENOMEM;
953 }
954 } else {
955 r = sendto(rtp->fd_rtcp, msg, len, 0,
956 (struct sockaddr*)&rtp->peer2,
957 rtp->peer2.ss_family == AF_INET6 ?
958 sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
959 err = errno;
960 }
961 if (r < 0) {
962 if (err != ECONNREFUSED) {
963 tcp_get_str_from_ip(&rtp->peer2, addrbuf, sizeof(addrbuf));
964 tvhwarn(LS_SATIPS, "RTCP send to error %s:%d : %s",
965 addrbuf, ntohs(IP_PORT(rtp->peer2)), strerror(err));
966 } else {
967 rtp->disable_rtcp = 1;
968 }
969 }
970 }
971 pthread_mutex_unlock(&satip_rtp_lock);
972 }
973 end:
974 return NULL;
975 }
976
977 /*
978 *
979 */
satip_rtp_init(int boot)980 void satip_rtp_init(int boot)
981 {
982 TAILQ_INIT(&satip_rtp_sessions);
983 pthread_mutex_init(&satip_rtp_lock, NULL);
984
985 if (boot)
986 atomic_set(&satip_rtcp_run, 0);
987
988 if (!boot && !atomic_get(&satip_rtcp_run)) {
989 atomic_set(&satip_rtcp_run, 1);
990 tvhthread_create(&satip_rtcp_tid, NULL, satip_rtcp_thread, NULL, "satip-rtcp");
991 }
992 }
993
994 /*
995 *
996 */
satip_rtp_done(void)997 void satip_rtp_done(void)
998 {
999 assert(TAILQ_EMPTY(&satip_rtp_sessions));
1000 if (atomic_get(&satip_rtcp_run)) {
1001 atomic_set(&satip_rtcp_run, 0);
1002 pthread_kill(satip_rtcp_tid, SIGTERM);
1003 pthread_join(satip_rtcp_tid, NULL);
1004 }
1005 }
1006