1 /**
2 * TV headend - Timeshift Reader
3 * Copyright (C) 2012 Adam Sutton
4 *
5 * This program is free software: you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation, either version 3 of the License, or
8 * (at your option) any later version.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License
16 * along with this program. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "tvheadend.h"
20 #include "streaming.h"
21 #include "timeshift.h"
22 #include "timeshift/private.h"
23 #include "atomic.h"
24 #include "tvhpoll.h"
25
26 #include <sys/types.h>
27 #include <sys/stat.h>
28 #include <unistd.h>
29 #include <fcntl.h>
30 #include <string.h>
31 #include <assert.h>
32
33 #if ENABLE_EPOLL
34 #include <sys/epoll.h>
35 #elif ENABLE_KQUEUE
36 #include <sys/event.h>
37 #include <sys/time.h>
38 #endif
39
40 /* **************************************************************************
41 * Buffered position handling
42 * *************************************************************************/
43
/*
 * Detach a seek position from whatever file it currently refers to.
 *
 * Both the file and the frame pointer are cleared first; the previously
 * attached file (which may be NULL) is then handed to timeshift_file_put().
 *
 * Returns the seek that was passed in.
 */
static timeshift_seek_t *_seek_reset ( timeshift_seek_t *seek )
{
  timeshift_file_t *prev = seek->file;

  seek->frame = NULL;
  seek->file  = NULL;
  timeshift_file_put(prev);

  return seek;
}
52
/*
 * Attach a (possibly NULL) file to the seek position and forget the
 * current frame.  When a file is given, its read offset is set to roff.
 *
 * Returns the seek that was passed in.
 */
static timeshift_seek_t *_seek_set_file
  ( timeshift_seek_t *seek, timeshift_file_t *tsf, off_t roff )
{
  if (tsf != NULL)
    tsf->roff = roff;

  seek->frame = NULL;
  seek->file  = tsf;

  return seek;
}
62
/*
 * Close the read descriptor of the file attached to the seek (if one is
 * open) and then reset the seek via _seek_reset().
 *
 * Returns the seek that was passed in.
 */
static timeshift_seek_t *_read_close ( timeshift_seek_t *seek )
{
  timeshift_file_t *cur = seek->file;

  if (cur != NULL && cur->rfd >= 0) {
    close(cur->rfd);
    cur->rfd = -1;
  }

  return _seek_reset(seek);
}
71
72 /* **************************************************************************
73 * File Reading
74 * *************************************************************************/
75
/*
 * Read exactly `size` bytes into `buf`.
 *
 * Source selection:
 *  - tsf with a RAM buffer: copy from tsf->ram at tsf->roff (under ram_lock)
 *  - tsf without RAM buffer: read() from tsf->rfd
 *  - tsf == NULL:            read() from `fd`
 *
 * Returns
 *   size  all requested bytes were read (tsf->roff is advanced when tsf is set)
 *   0     no (or not enough) data is available; for descriptor reads
 *         tsf->roff is NOT advanced in this case
 *   -1    error (read() failure, or a RAM request that runs past tsf->woff)
 */
static ssize_t _read_buf ( timeshift_file_t *tsf, int fd, void *buf, size_t size )
{
  /* Byte cursor - arithmetic on `void *` is a GNU extension, not ISO C */
  char *dst = buf;
  ssize_t r;
  size_t ret;

  if (tsf && tsf->ram) {
    /* Nothing new written yet */
    if (tsf->roff == tsf->woff) return 0;
    /* Request extends beyond what has been written */
    if (tsf->roff + size > tsf->woff) return -1;
    pthread_mutex_lock(&tsf->ram_lock);
    memcpy(dst, tsf->ram + tsf->roff, size);
    tsf->roff += size;
    pthread_mutex_unlock(&tsf->ram_lock);
    return size;
  } else {
    ret = 0;
    while (size > 0) {
      r = read(tsf ? tsf->rfd : fd, dst, size);
      if (r < 0) {
        /* Transient condition - retry */
        if (ERRNO_AGAIN(errno))
          continue;
        tvhtrace(LS_TIMESHIFT, "read errno %d", errno);
        return -1;
      }
      /* End of data before the request was satisfied */
      if (r == 0)
        return 0;
      size -= r;
      ret  += r;
      dst  += r;
    }
    if (ret > 0 && tsf)
      tsf->roff += ret;
    return ret;
  }
}
112
/*
 * Read one length-prefixed packet buffer.
 *
 * The record is a size_t length followed by that many bytes.  A length of
 * zero, or one above 1 MiB, yields *pktbuf == NULL and only the length
 * field is counted as consumed.
 *
 * Returns
 *   >0  number of bytes consumed (length field plus payload)
 *   0   the record is incomplete
 *   -1  error
 *
 * *pktbuf is always defined on return: either a freshly allocated buffer
 * (on full success with a non-empty payload) or NULL.
 */
static ssize_t _read_pktbuf ( timeshift_file_t *tsf, int fd, pktbuf_t **pktbuf )
{
  ssize_t r, cnt = 0;
  size_t sz;

  /* Never leave the caller's pointer undefined on an early return */
  *pktbuf = NULL;

  /* Size */
  r = _read_buf(tsf, fd, &sz, sizeof(sz));
  if (r < 0) return -1;
  if (r != sizeof(sz)) return 0;
  cnt += r;

  /* Empty And Sanity Check */
  if (!sz || sz > 1024 * 1024)
    return cnt;

  /* Data */
  *pktbuf = pktbuf_alloc(NULL, sz);
  /* NOTE(review): pktbuf_alloc's failure behaviour is not visible from
   * here; guard defensively rather than dereference a NULL buffer */
  if (*pktbuf == NULL)
    return -1;
  r = _read_buf(tsf, fd, pktbuf_ptr(*pktbuf), sz);
  if (r != (ssize_t)sz) {
    pktbuf_destroy(*pktbuf);
    *pktbuf = NULL;
    return r < 0 ? -1 : 0;
  }
  cnt += r;

  return cnt;
}
142
143
/*
 * Read one serialized streaming message.
 *
 * Record layout as consumed here: size_t total size, message type, int64_t
 * time, then (size - sizeof(type) - sizeof(time)) payload bytes.  An
 * SMT_PACKET payload is followed by two length-prefixed packet buffers
 * (meta, then payload) which are read with _read_pktbuf().
 *
 * Data is taken from `tsf` when it is non-NULL, otherwise from `fd`
 * (see _read_buf()).
 *
 * Returns
 *   >0  bytes consumed; *sm holds the message, or is NULL when the record
 *       was an end marker (size 0)
 *   0   the record is incomplete; *sm is NULL
 *   -1  error or unsupported/malformed record; *sm is NULL
 *
 * *sm is never left pointing at a freed message.
 */
static ssize_t _read_msg ( timeshift_file_t *tsf, int fd, streaming_message_t **sm )
{
  ssize_t r, cnt = 0;
  size_t sz;
  streaming_message_type_t type;
  int64_t time;
  void *data;
  int code;

  /* Clear */
  *sm = NULL;

  /* Size */
  r = _read_buf(tsf, fd, &sz, sizeof(sz));
  if (r < 0) return -1;
  if (r != sizeof(sz)) return 0;
  cnt += r;

  /* EOF */
  if (sz == 0) return cnt;

  /* Wrong data size (too large, or too small to even hold the header -
   * the latter would make the subtraction below wrap around) */
  if (sz > 1024 * 1024 || sz < sizeof(type) + sizeof(time)) {
    tvhtrace(LS_TIMESHIFT, "wrong msg size (%lld/0x%llx)", (long long)sz, (unsigned long long)sz);
    return -1;
  }

  /* Type */
  r = _read_buf(tsf, fd, &type, sizeof(type));
  if (r < 0) return -1;
  if (r != sizeof(type)) return 0;
  cnt += r;

  /* Time */
  r = _read_buf(tsf, fd, &time, sizeof(time));
  if (r < 0) return -1;
  if (r != sizeof(time)) return 0;
  cnt += r;

  /* Adjust size */
  sz  -= sizeof(type) + sizeof(time);
  cnt += sz;

  /* Standard messages */
  switch (type) {

    /* Unhandled */
    case SMT_START:
    case SMT_NOSTART:
    case SMT_NOSTART_WARN:
    case SMT_SERVICE_STATUS:
    case SMT_DESCRAMBLE_INFO:
      return -1;

    /* Code */
    case SMT_STOP:
    case SMT_EXIT:
    case SMT_SPEED:
      if (sz != sizeof(code)) return -1;
      r = _read_buf(tsf, fd, &code, sz);
      if (r != (ssize_t)sz) {
        if (r < 0) return -1;
        return 0;
      }
      *sm = streaming_msg_create_code(type, code);
      break;

    /* Data */
    case SMT_SKIP:
    case SMT_SIGNAL_STATUS:
    case SMT_MPEGTS:
    case SMT_PACKET:
      /* The packet header is written to below, so it must fit */
      if (type == SMT_PACKET && sz < sizeof(th_pkt_t))
        return -1;
      data = malloc(sz);
      if (data == NULL && sz != 0)
        return -1;
      r = _read_buf(tsf, fd, data, sz);
      if (r != (ssize_t)sz) {
        free(data);
        if (r < 0) return -1;
        return 0;
      }
      if (type == SMT_PACKET) {
        th_pkt_t *pkt = data;
        /* Pointers and refcount stored in the buffer are stale */
        pkt->pkt_payload  = pkt->pkt_meta = NULL;
        pkt->pkt_refcount = 0;
        *sm = streaming_msg_create_pkt(pkt);
        r = _read_pktbuf(tsf, fd, &pkt->pkt_meta);
        if (r < 0) {
          streaming_msg_free(*sm);
          *sm = NULL; /* callers free a non-NULL *sm during cleanup */
          return r;
        }
        cnt += r;
        r = _read_pktbuf(tsf, fd, &pkt->pkt_payload);
        if (r < 0) {
          streaming_msg_free(*sm);
          *sm = NULL; /* callers free a non-NULL *sm during cleanup */
          return r;
        }
        cnt += r;
      } else {
        *sm = streaming_msg_create_data(type, data);
      }
      (*sm)->sm_time = time;
      break;

    default:
      return -1;
  }

  /* OK */
  return cnt;
}
254
255 /* **************************************************************************
256 * Utilities
257 * *************************************************************************/
258
/*
 * Look up the time of the first indexed I-frame in the buffer.
 *
 * Files are walked from the oldest one forward until one with a non-empty
 * iframe list is found.  When such a frame exists, *active is set to 1 and
 * its time is returned; otherwise *active is left untouched and 0 is
 * returned.  The file the walk stopped on (possibly NULL) is passed to
 * timeshift_file_put() before returning.
 */
static int64_t _timeshift_first_time
  ( timeshift_t *ts, int *active )
{
  timeshift_index_iframe_t *first = NULL;
  timeshift_file_t *file;
  int64_t when = 0;
  int eob;

  for (file = timeshift_filemgr_oldest(ts);
       file != NULL;
       file = timeshift_filemgr_next(file, &eob, 0)) {
    first = TAILQ_FIRST(&file->iframes);
    if (first != NULL)
      break;
  }

  if (first != NULL) {
    *active = 1;
    when    = first->time;
  }

  timeshift_file_put(file);
  return when;
}
277
/*
 * Locate the indexed I-frame to use for a jump to req_time.
 *
 * The direction is backwards when req_time < cur_time, forwards otherwise.
 * Starting from seek->file / seek->frame:
 *   1. coarse: when no frame is known yet, walk files in the direction of
 *      travel until one in the right period has an indexed frame
 *   2. fine: step frame by frame (crossing files as needed) until the
 *      frame time passes req_time
 *   3. if either search ran out of files/frames, fall back to the first
 *      indexed frame of the buffer (backwards) or the last one (forwards)
 *
 * The result is stored in nseek->file / nseek->frame (either may be NULL).
 *
 * Returns 0 when a frame was found inside the buffer, -1 when the start of
 * the buffer was hit and 1 when the end of the buffer was hit.
 */
static int _timeshift_skip
  ( timeshift_t *ts, int64_t req_time, int64_t cur_time,
    timeshift_seek_t *seek, timeshift_seek_t *nseek )
{
  timeshift_index_iframe_t *frame = seek->frame;
  timeshift_file_t *file = seek->file;
  timeshift_file_t *last_seen;
  const int64_t period = mono2sec(req_time) / TIMESHIFT_FILE_PERIOD;
  const int backwards = req_time < cur_time;
  int end = 0;

  /* Coarse search */
  if (frame == NULL) {
    while (file != NULL && !end) {
      if (backwards) {
        if (file->time <= period) {
          frame = TAILQ_LAST(&file->iframes, timeshift_index_iframe_list);
          if (frame != NULL)
            break;
        }
        file = timeshift_filemgr_prev(file, &end, 1);
      } else {
        if (file->time >= period) {
          frame = TAILQ_FIRST(&file->iframes);
          if (frame != NULL)
            break;
        }
        file = timeshift_filemgr_next(file, &end, 0);
      }
      frame = NULL;
    }
  }

  /* Fine search */
  if (backwards) {
    while (!end && file != NULL && frame != NULL && frame->time > req_time) {
      frame = TAILQ_PREV(frame, timeshift_index_iframe_list, link);
      /* Ran off the front of this file - continue in the previous one(s) */
      while (!end && file != NULL && frame == NULL) {
        file = timeshift_filemgr_prev(file, &end, 1);
        if (file != NULL)
          frame = TAILQ_LAST(&file->iframes, timeshift_index_iframe_list);
      }
    }
  } else {
    while (!end && file != NULL && frame != NULL && frame->time < req_time) {
      frame = TAILQ_NEXT(frame, link);
      /* Ran off the back of this file - continue in the next one(s) */
      while (!end && file != NULL && frame == NULL) {
        file = timeshift_filemgr_next(file, &end, 0);
        if (file != NULL)
          frame = TAILQ_FIRST(&file->iframes);
      }
    }
  }

  /* End */
  if (file == NULL || frame == NULL)
    end = 1;

  /* Find start/end of buffer */
  if (end) {
    timeshift_file_put(file);
    if (backwards) {
      file  = last_seen = timeshift_filemgr_oldest(ts);
      frame = NULL;
      while (file != NULL && frame == NULL) {
        last_seen = file;
        frame = TAILQ_FIRST(&file->iframes);
        if (frame == NULL)
          file = timeshift_filemgr_next(file, &end, 0);
      }
      if (file == NULL)
        file = last_seen;
      end = -1;
    } else {
      file  = last_seen = timeshift_filemgr_newest(ts);
      frame = NULL;
      while (file != NULL && frame == NULL) {
        last_seen = file;
        frame = TAILQ_LAST(&file->iframes, timeshift_index_iframe_list);
        if (frame == NULL)
          file = timeshift_filemgr_prev(file, &end, 0);
      }
      if (file == NULL)
        file = last_seen;
      end = 1;
    }
  }

  /* Done */
  nseek->file  = file;
  nseek->frame = frame;
  return end;
}
362
363 /*
364 *
365 */
_timeshift_do_skip(timeshift_t * ts,int64_t req_time,int64_t last_time,timeshift_seek_t * seek)366 static int _timeshift_do_skip
367 ( timeshift_t *ts, int64_t req_time, int64_t last_time,
368 timeshift_seek_t *seek )
369 {
370 timeshift_seek_t nseek;
371 int end;
372
373 tvhdebug(LS_TIMESHIFT, "ts %d skip to %"PRId64" from %"PRId64,
374 ts->id, req_time, last_time);
375
376 timeshift_file_get(seek->file);
377
378 /* Find */
379 end = _timeshift_skip(ts, req_time, last_time, seek, &nseek);
380 if (nseek.frame)
381 tvhdebug(LS_TIMESHIFT, "ts %d skip found pkt @ %"PRId64,
382 ts->id, nseek.frame->time);
383
384 /* File changed (close) */
385 if (nseek.file != seek->file)
386 _read_close(seek);
387
388 timeshift_file_put(seek->file);
389
390 /* Position */
391 *seek = nseek;
392 if (nseek.file != NULL) {
393 if (nseek.frame)
394 nseek.file->roff = nseek.frame->pos;
395 else
396 nseek.file->roff = req_time > last_time ? nseek.file->size : 0;
397 tvhtrace(LS_TIMESHIFT, "do skip seek->file %p roff %"PRId64,
398 nseek.file, (int64_t)nseek.file->roff);
399 }
400
401 return end;
402 }
403
404
405 /*
406 * Output packet
407 */
_timeshift_read(timeshift_t * ts,timeshift_seek_t * seek,streaming_message_t ** sm,int * wait)408 static int _timeshift_read
409 ( timeshift_t *ts, timeshift_seek_t *seek,
410 streaming_message_t **sm, int *wait )
411 {
412 timeshift_file_t *tsf = seek->file;
413 ssize_t r;
414 off_t off = 0;
415
416 *sm = NULL;
417
418 if (tsf) {
419
420 /* Open file */
421 if (tsf->rfd < 0 && !tsf->ram) {
422 tsf->rfd = tvh_open(tsf->path, O_RDONLY, 0);
423 tvhtrace(LS_TIMESHIFT, "ts %d open file %s (fd %i)", ts->id, tsf->path, tsf->rfd);
424 if (tsf->rfd < 0)
425 return -1;
426 }
427 if (tsf->rfd >= 0)
428 if ((off = lseek(tsf->rfd, tsf->roff, SEEK_SET)) != tsf->roff)
429 tvherror(LS_TIMESHIFT, "ts %d seek to %s failed (off %"PRId64" != %"PRId64"): %s",
430 ts->id, tsf->path, (int64_t)tsf->roff, (int64_t)off, strerror(errno));
431
432 /* Read msg */
433 r = _read_msg(tsf, -1, sm);
434 if (r < 0) {
435 streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR);
436 streaming_target_deliver2(ts->output, e);
437 tvhtrace(LS_TIMESHIFT, "ts %d seek to %jd (woff %jd) (fd %i)", ts->id, (intmax_t)off, (intmax_t)tsf->woff, tsf->rfd);
438 tvherror(LS_TIMESHIFT, "ts %d could not read buffer", ts->id);
439 return -1;
440 }
441 tvhtrace(LS_TIMESHIFT, "ts %d seek to %jd (fd %i) read msg %p/%"PRId64" (%"PRId64")",
442 ts->id, (intmax_t)off, tsf->rfd, *sm, *sm ? (*sm)->sm_time : -1, (int64_t)r);
443
444 /* Special case - EOF */
445 if (r <= sizeof(size_t) || tsf->roff > tsf->size || *sm == NULL) {
446 timeshift_file_get(seek->file); /* _read_close decreases file reference */
447 _read_close(seek);
448 _seek_set_file(seek, timeshift_filemgr_next(tsf, NULL, 0), 0);
449 *wait = 0;
450 tvhtrace(LS_TIMESHIFT, "ts %d eof, seek->file %p (prev %p)", ts->id, seek->file, tsf);
451 timeshift_filemgr_dump(ts);
452 }
453 }
454 return 0;
455 }
456
457 /*
458 * Flush all data to live
459 */
_timeshift_flush_to_live(timeshift_t * ts,timeshift_seek_t * seek,int * wait)460 static int _timeshift_flush_to_live
461 ( timeshift_t *ts, timeshift_seek_t *seek, int *wait )
462 {
463 streaming_message_t *sm;
464
465 while (seek->file) {
466 if (_timeshift_read(ts, seek, &sm, wait) == -1)
467 return -1;
468 if (!sm) break;
469 timeshift_packet_log("ouf", ts, sm);
470 streaming_target_deliver2(ts->output, sm);
471 }
472 return 0;
473 }
474
475 /*
476 * Send the status message
477 */
timeshift_fill_status(timeshift_t * ts,timeshift_status_t * status,int64_t current_time)478 static void timeshift_fill_status
479 ( timeshift_t *ts, timeshift_status_t *status, int64_t current_time )
480 {
481 int active = 0;
482 int64_t start, end;
483
484 start = _timeshift_first_time(ts, &active);
485 end = ts->buf_time;
486 if (ts->state <= TS_LIVE) {
487 current_time = end;
488 } else {
489 if (current_time < 0)
490 current_time = 0;
491 if (current_time > end)
492 current_time = end;
493 }
494 status->full = ts->full;
495 tvhtrace(LS_TIMESHIFT, "ts %d status start %"PRId64" end %"PRId64
496 " current %"PRId64" state %d",
497 ts->id, start, end, current_time, ts->state);
498 status->shift = ts_rescale_inv(end - current_time, 1000000);
499 if (active) {
500 status->pts_start = ts_rescale_inv(start, 1000000);
501 status->pts_end = ts_rescale_inv(end, 1000000);
502 } else {
503 status->pts_start = PTS_UNSET;
504 status->pts_end = PTS_UNSET;
505 }
506 }
507
/*
 * Send the status message
 *
 * Allocates a zeroed timeshift_status_t, fills it in for current_time and
 * delivers it to ts->output wrapped in an SMT_TIMESHIFT_STATUS message.
 * If the allocation fails the status update is skipped (and logged)
 * instead of dereferencing a NULL pointer.
 */
static void timeshift_status
  ( timeshift_t *ts, int64_t current_time )
{
  streaming_message_t *tsm;
  timeshift_status_t *status;

  status = calloc(1, sizeof(*status));
  if (status == NULL) {
    tvherror(LS_TIMESHIFT, "ts %d unable to allocate status message", ts->id);
    return;
  }
  timeshift_fill_status(ts, status, current_time);
  tsm = streaming_msg_create_data(SMT_TIMESHIFT_STATUS, status);
  streaming_target_deliver2(ts->output, tsm);
}
519
520 /* **************************************************************************
521 * Thread
522 * *************************************************************************/
523
524
525 /*
526 * Timeshift thread
527 */
timeshift_reader(void * p)528 void *timeshift_reader ( void *p )
529 {
530 timeshift_t *ts = p;
531 int nfds, end, run = 1, wait = -1, state;
532 timeshift_seek_t *seek = &ts->seek;
533 timeshift_file_t *tmp_file;
534 int cur_speed = 100, keyframe_mode = 0;
535 int64_t mono_now, mono_play_time = 0, mono_last_status = 0;
536 int64_t deliver, deliver0, pause_time = 0, last_time = 0, skip_time = 0;
537 int64_t i64;
538 streaming_message_t *sm = NULL, *ctrl = NULL;
539 streaming_skip_t *skip = NULL;
540 tvhpoll_t *pd;
541 tvhpoll_event_t ev = { 0 };
542
543 pd = tvhpoll_create(1);
544 ev.fd = ts->rd_pipe.rd;
545 ev.events = TVHPOLL_IN;
546 tvhpoll_add(pd, &ev, 1);
547
548 /* Output */
549 while (run) {
550
551 // Note: Previously we allowed unlimited wait, but we now must wake periodically
552 // to output status message
553 if (wait < 0 || wait > 1000)
554 wait = 1000;
555
556 /* Wait for data */
557 if(wait)
558 nfds = tvhpoll_wait(pd, &ev, 1, wait);
559 else
560 nfds = 0;
561 wait = -1;
562 end = 0;
563 skip = NULL;
564 mono_now = getfastmonoclock();
565
566 /* Control */
567 pthread_mutex_lock(&ts->state_mutex);
568 if (nfds == 1) {
569 if (_read_msg(NULL, ts->rd_pipe.rd, &ctrl) > 0) {
570
571 /* Exit */
572 if (ctrl->sm_type == SMT_EXIT) {
573 tvhtrace(LS_TIMESHIFT, "ts %d read exit request", ts->id);
574 run = 0;
575 streaming_msg_free(ctrl);
576 ctrl = NULL;
577
578 /* Speed */
579 } else if (ctrl->sm_type == SMT_SPEED) {
580 int speed = ctrl->sm_code;
581 int keyframe;
582
583 /* Bound it */
584 if (speed > 3200) speed = 3200;
585 if (speed < -3200) speed = -3200;
586
587 /* Ignore negative */
588 if (!ts->dobuf && (speed < 0))
589 speed = seek->file ? speed : 0;
590
591 /* Process */
592 if (cur_speed != speed) {
593
594 /* Live playback */
595 state = ts->state;
596 if (state == TS_LIVE) {
597
598 /* Reject */
599 if (speed >= 100) {
600 tvhdebug(LS_TIMESHIFT, "ts %d reject 1x+ in live mode", ts->id);
601 speed = 100;
602
603 /* Set position */
604 } else {
605 tvhdebug(LS_TIMESHIFT, "ts %d enter timeshift mode", ts->id);
606 ts->dobuf = 1;
607 _seek_reset(seek);
608 tmp_file = timeshift_filemgr_newest(ts);
609 if (tmp_file != NULL) {
610 i64 = tmp_file->last;
611 timeshift_file_put(tmp_file);
612 } else {
613 i64 = ts->buf_time;
614 }
615 seek->file = timeshift_filemgr_get(ts, i64);
616 if (seek->file != NULL) {
617 seek->file->roff = seek->file->size;
618 pause_time = seek->file->last;
619 last_time = pause_time;
620 } else {
621 pause_time = i64;
622 last_time = pause_time;
623 }
624 }
625 }
626
627 /* Check keyframe mode */
628 keyframe = (speed < 0) || (speed > 400);
629 if (keyframe != keyframe_mode) {
630 tvhdebug(LS_TIMESHIFT, "using keyframe mode? %s", keyframe ? "yes" : "no");
631 keyframe_mode = keyframe;
632 if (keyframe)
633 seek->frame = NULL;
634 }
635
636 /* Update */
637 if (speed != 100 || state != TS_LIVE) {
638 ts->state = speed == 0 ? TS_PAUSE : TS_PLAY;
639 tvhtrace(LS_TIMESHIFT, "reader - set %s", speed == 0 ? "TS_PAUSE" : "TS_PLAY");
640 }
641 if ((ts->state == TS_PLAY && state != TS_PLAY) || (speed != cur_speed)) {
642 mono_play_time = mono_now;
643 tvhtrace(LS_TIMESHIFT, "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64,
644 mono_now, pause_time);
645 if (speed != cur_speed)
646 pause_time = last_time;
647 } else if (ts->state == TS_PAUSE && state != TS_PAUSE) {
648 pause_time = last_time;
649 }
650 cur_speed = speed;
651 tvhdebug(LS_TIMESHIFT, "ts %d change speed %d", ts->id, speed);
652 }
653
654 /* Send on the message */
655 ctrl->sm_code = speed;
656 streaming_target_deliver2(ts->output, ctrl);
657 ctrl = NULL;
658
659 /* Skip/Seek */
660 } else if (ctrl->sm_type == SMT_SKIP) {
661 skip = ctrl->sm_data;
662 switch (skip->type) {
663 case SMT_SKIP_LIVE:
664 if (ts->state != TS_LIVE) {
665
666 /* Reset */
667 if (ts->full) {
668 timeshift_filemgr_flush(ts, NULL);
669 _seek_reset(seek);
670 ts->full = 0;
671 }
672
673 /* Release */
674 if (sm)
675 streaming_msg_free(sm);
676
677 /* Find end */
678 skip_time = 0x7fffffffffffffffLL;
679 // TODO: change this sometime!
680 }
681 break;
682
683 case SMT_SKIP_ABS_TIME:
684 /* -fallthrough */
685 case SMT_SKIP_REL_TIME:
686
687 /* Convert */
688 skip_time = ts_rescale(skip->time, 1000000);
689 tvhdebug(LS_TIMESHIFT, "ts %d skip %"PRId64" requested %"PRId64, ts->id, skip_time, skip->time);
690
691 /* Live playback (stage1) */
692 if (ts->state == TS_LIVE) {
693 _seek_reset(seek);
694 tmp_file = timeshift_filemgr_newest(ts);
695 if (tmp_file) {
696 i64 = tmp_file->last;
697 timeshift_file_put(tmp_file);
698 }
699 if (tmp_file && (seek->file = timeshift_filemgr_get(ts, i64)) != NULL) {
700 seek->file->roff = seek->file->size;
701 last_time = seek->file->last;
702 } else {
703 last_time = ts->buf_time;
704 }
705 }
706
707 /* May have failed */
708 if (skip->type == SMT_SKIP_REL_TIME)
709 skip_time += last_time;
710 tvhdebug(LS_TIMESHIFT, "ts %d skip time %"PRId64, ts->id, skip_time);
711
712 /* Live (stage2) */
713 if (ts->state == TS_LIVE) {
714 if (skip_time >= ts->buf_time - TIMESHIFT_PLAY_BUF) {
715 tvhdebug(LS_TIMESHIFT, "ts %d skip ignored, already live", ts->id);
716 skip = NULL;
717 } else {
718 ts->state = TS_PLAY;
719 ts->dobuf = 1;
720 tvhtrace(LS_TIMESHIFT, "reader - set TS_PLAY");
721 }
722 }
723
724 /* OK */
725 if (skip) {
726 /* seek */
727 seek->frame = NULL;
728 end = _timeshift_do_skip(ts, skip_time, last_time, seek);
729 if (seek->frame) {
730 pause_time = seek->frame->time;
731 tvhtrace(LS_TIMESHIFT, "ts %d skip - play buffer from %"PRId64" last_time %"PRId64,
732 ts->id, pause_time, last_time);
733
734 /* Adjust time */
735 if (mono_play_time != mono_now)
736 tvhtrace(LS_TIMESHIFT, "ts %d update play time skip - %"PRId64, ts->id, mono_now);
737 mono_play_time = mono_now;
738
739 /* Clear existing packet */
740 if (sm) {
741 streaming_msg_free(sm);
742 sm = NULL;
743 }
744 } else {
745 skip = NULL;
746 }
747 }
748 break;
749 default:
750 tvherror(LS_TIMESHIFT, "ts %d invalid/unsupported skip type: %d", ts->id, skip->type);
751 skip = NULL;
752 break;
753 }
754
755 /* Error */
756 if (!skip) {
757 ((streaming_skip_t*)ctrl->sm_data)->type = SMT_SKIP_ERROR;
758 streaming_target_deliver2(ts->output, ctrl);
759 ctrl = NULL;
760 }
761
762 /* Ignore */
763 } else {
764 streaming_msg_free(ctrl);
765 ctrl = NULL;
766 }
767 }
768 }
769
770
771 /* Done */
772 if (!run || !seek->file || ((ts->state != TS_PLAY && !skip))) {
773 if (mono_now >= (mono_last_status + sec2mono(1))) {
774 timeshift_status(ts, last_time);
775 mono_last_status = mono_now;
776 }
777 pthread_mutex_unlock(&ts->state_mutex);
778 continue;
779 }
780
781 /* Calculate delivery time */
782 deliver0 = (mono_now - mono_play_time) + TIMESHIFT_PLAY_BUF;
783 deliver = (deliver0 * cur_speed) / 100;
784 deliver = (deliver + pause_time);
785 tvhtrace(LS_TIMESHIFT, "speed %d now %"PRId64" play_time %"PRId64" deliver %"PRId64" deliver0 %"PRId64,
786 cur_speed, mono_now, mono_play_time, deliver, deliver0);
787
788 /* Determine next packet */
789 if (!sm) {
790
791 /* Rewind or Fast forward (i-frame only) */
792 if (skip || keyframe_mode) {
793 int64_t req_time;
794
795 /* Time */
796 if (!skip)
797 req_time = last_time + ((cur_speed < 0) ? -1 : 1);
798 else
799 req_time = skip_time;
800
801 end = _timeshift_do_skip(ts, req_time, last_time, seek);
802 }
803
804 /* Clear old message */
805 if (sm) {
806 streaming_msg_free(sm);
807 sm = NULL;
808 }
809
810 /* Find packet */
811 if (_timeshift_read(ts, seek, &sm, &wait) == -1) {
812 pthread_mutex_unlock(&ts->state_mutex);
813 break;
814 }
815 }
816
817 /* Send skip response */
818 if (skip) {
819 if (sm) {
820 /* Status message */
821 skip->time = ts_rescale_inv(sm->sm_time, 1000000);
822 skip->type = SMT_SKIP_ABS_TIME;
823 tvhdebug(LS_TIMESHIFT, "ts %d skip to pts %"PRId64" ok", ts->id, sm->sm_time);
824 /* Update timeshift status */
825 timeshift_fill_status(ts, &skip->timeshift, sm->sm_time);
826 mono_last_status = mono_now;
827 } else {
828 /* Report error */
829 skip->type = SMT_SKIP_ERROR;
830 skip = NULL;
831 tvhdebug(LS_TIMESHIFT, "ts %d skip failed (%d)", ts->id, sm ? sm->sm_type : -1);
832 }
833 streaming_target_deliver2(ts->output, ctrl);
834 } else {
835 streaming_msg_free(ctrl);
836 }
837 ctrl = NULL;
838
839 /* Deliver */
840 if (sm && (skip ||
841 (((cur_speed < 0) && (sm->sm_time >= deliver)) ||
842 ((cur_speed > 0) && (sm->sm_time <= deliver))))) {
843
844 last_time = sm->sm_time;
845 if (!skip && keyframe_mode) /* always send status on keyframe mode */
846 timeshift_status(ts, last_time);
847 timeshift_packet_log("out", ts, sm);
848 streaming_target_deliver2(ts->output, sm);
849 sm = NULL;
850 wait = 0;
851
852 } else if (sm) {
853
854 if (cur_speed > 0)
855 wait = (sm->sm_time - deliver) / 1000;
856 else
857 wait = (deliver - sm->sm_time) / 1000;
858 if (wait == 0) wait = 1;
859 tvhtrace(LS_TIMESHIFT, "ts %d wait %d speed %d sm_time %"PRId64" deliver %"PRId64,
860 ts->id, wait, cur_speed, sm->sm_time, deliver);
861
862 }
863
864 /* Periodic timeshift status */
865 if (mono_now >= (mono_last_status + sec2mono(1))) {
866 timeshift_status(ts, last_time);
867 mono_last_status = mono_now;
868 }
869
870 /* Terminate */
871 if (!seek->file || end != 0) {
872
873 /* Back to live (unless buffer is full) */
874 if ((end == 1 && !ts->full) || !seek->file) {
875 tvhdebug(LS_TIMESHIFT, "ts %d eob revert to live mode", ts->id);
876 cur_speed = 100;
877 ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
878 streaming_target_deliver2(ts->output, ctrl);
879 ctrl = NULL;
880 tvhtrace(LS_TIMESHIFT, "reader - set TS_LIVE");
881
882 /* Flush timeshift buffer to live */
883 if (_timeshift_flush_to_live(ts, seek, &wait) == -1) {
884 pthread_mutex_unlock(&ts->state_mutex);
885 break;
886 }
887
888 ts->state = TS_LIVE;
889
890 /* Close file (if open) */
891 _read_close(seek);
892
893 /* Pause */
894 } else {
895 if (cur_speed <= 0) {
896 cur_speed = 0;
897 tvhtrace(LS_TIMESHIFT, "reader - set TS_PAUSE");
898 ts->state = TS_PAUSE;
899 } else {
900 cur_speed = 100;
901 tvhtrace(LS_TIMESHIFT, "reader - set TS_PLAY");
902 if (ts->state != TS_PLAY) {
903 ts->state = TS_PLAY;
904 ts->dobuf = 1;
905 if (mono_play_time != mono_now)
906 tvhtrace(LS_TIMESHIFT, "update play time (pause) - %"PRId64, mono_now);
907 mono_play_time = mono_now;
908 }
909 }
910 tvhdebug(LS_TIMESHIFT, "ts %d sob speed %d last time %"PRId64, ts->id, cur_speed, last_time);
911 pause_time = last_time;
912 ctrl = streaming_msg_create_code(SMT_SPEED, cur_speed);
913 streaming_target_deliver2(ts->output, ctrl);
914 ctrl = NULL;
915 }
916
917 }
918
919 pthread_mutex_unlock(&ts->state_mutex);
920 }
921
922 /* Cleanup */
923 tvhpoll_destroy(pd);
924 _read_close(seek);
925 if (sm) streaming_msg_free(sm);
926 if (ctrl) streaming_msg_free(ctrl);
927 tvhtrace(LS_TIMESHIFT, "ts %d exit reader thread", ts->id);
928
929 return NULL;
930 }
931