1 /**
2  *  TV headend - Timeshift Reader
3  *  Copyright (C) 2012 Adam Sutton
4  *
5  *  This program is free software: you can redistribute it and/or modify
6  *  it under the terms of the GNU General Public License as published by
7  *  the Free Software Foundation, either version 3 of the License, or
8  *  (at your option) any later version.
9  *
10  *  This program is distributed in the hope that it will be useful,
11  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
12  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  *  GNU General Public License for more details.
14  *
15  *  You should have received a copy of the GNU General Public License
16  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "tvheadend.h"
20 #include "streaming.h"
21 #include "timeshift.h"
22 #include "timeshift/private.h"
23 #include "atomic.h"
24 #include "tvhpoll.h"
25 
26 #include <sys/types.h>
27 #include <sys/stat.h>
28 #include <unistd.h>
29 #include <fcntl.h>
30 #include <string.h>
31 #include <assert.h>
32 
33 #if ENABLE_EPOLL
34 #include <sys/epoll.h>
35 #elif ENABLE_KQUEUE
36 #include <sys/event.h>
37 #include <sys/time.h>
38 #endif
39 
40 /* **************************************************************************
41  * Buffered position handling
42  * *************************************************************************/
43 
_seek_reset(timeshift_seek_t * seek)44 static timeshift_seek_t *_seek_reset ( timeshift_seek_t *seek )
45 {
46   timeshift_file_t *tsf = seek->file;
47   seek->file  = NULL;
48   seek->frame = NULL;
49   timeshift_file_put(tsf);
50   return seek;
51 }
52 
_seek_set_file(timeshift_seek_t * seek,timeshift_file_t * tsf,off_t roff)53 static timeshift_seek_t *_seek_set_file
54   ( timeshift_seek_t *seek, timeshift_file_t *tsf, off_t roff )
55 {
56   seek->file  = tsf;
57   seek->frame = NULL;
58   if (tsf)
59     tsf->roff = roff;
60   return seek;
61 }
62 
_read_close(timeshift_seek_t * seek)63 static timeshift_seek_t *_read_close ( timeshift_seek_t *seek )
64 {
65   if (seek->file && seek->file->rfd >= 0) {
66     close(seek->file->rfd);
67     seek->file->rfd = -1;
68   }
69   return _seek_reset(seek);
70 }
71 
72 /* **************************************************************************
73  * File Reading
74  * *************************************************************************/
75 
_read_buf(timeshift_file_t * tsf,int fd,void * buf,size_t size)76 static ssize_t _read_buf ( timeshift_file_t *tsf, int fd, void *buf, size_t size )
77 {
78   ssize_t r;
79   size_t ret;
80 
81   if (tsf && tsf->ram) {
82     if (tsf->roff == tsf->woff) return 0;
83     if (tsf->roff + size > tsf->woff) return -1;
84     pthread_mutex_lock(&tsf->ram_lock);
85     memcpy(buf, tsf->ram + tsf->roff, size);
86     tsf->roff += size;
87     pthread_mutex_unlock(&tsf->ram_lock);
88     return size;
89   } else {
90     ret = 0;
91     while (size > 0) {
92       r = read(tsf ? tsf->rfd : fd, buf, size);
93       if (r < 0) {
94         if (ERRNO_AGAIN(errno))
95           continue;
96         tvhtrace(LS_TIMESHIFT, "read errno %d", errno);
97         return -1;
98       }
99       if (r > 0) {
100         size -= r;
101         ret += r;
102         buf += r;
103       }
104       if (r == 0)
105         return 0;
106     }
107     if (ret > 0 && tsf)
108       tsf->roff += ret;
109     return ret;
110   }
111 }
112 
_read_pktbuf(timeshift_file_t * tsf,int fd,pktbuf_t ** pktbuf)113 static ssize_t _read_pktbuf ( timeshift_file_t *tsf, int fd, pktbuf_t **pktbuf )
114 {
115   ssize_t r, cnt = 0;
116   size_t sz;
117 
118   /* Size */
119   r = _read_buf(tsf, fd, &sz, sizeof(sz));
120   if (r < 0) return -1;
121   if (r != sizeof(sz)) return 0;
122   cnt += r;
123 
124   /* Empty And Sanity Check */
125   if (!sz || sz > 1024 * 1024) {
126     *pktbuf = NULL;
127     return cnt;
128   }
129 
130   /* Data */
131   *pktbuf = pktbuf_alloc(NULL, sz);
132   r = _read_buf(tsf, fd, pktbuf_ptr(*pktbuf), sz);
133   if (r != sz) {
134     pktbuf_destroy(*pktbuf);
135     *pktbuf = NULL;
136     return r < 0 ? -1 : 0;
137   }
138   cnt += r;
139 
140   return cnt;
141 }
142 
143 
_read_msg(timeshift_file_t * tsf,int fd,streaming_message_t ** sm)144 static ssize_t _read_msg ( timeshift_file_t *tsf, int fd, streaming_message_t **sm )
145 {
146   ssize_t r, cnt = 0;
147   size_t sz;
148   streaming_message_type_t type;
149   int64_t time;
150   void *data;
151   int code;
152 
153   /* Clear */
154   *sm = NULL;
155 
156   /* Size */
157   r = _read_buf(tsf, fd, &sz, sizeof(sz));
158   if (r < 0) return -1;
159   if (r != sizeof(sz)) return 0;
160   cnt += r;
161 
162   /* EOF */
163   if (sz == 0) return cnt;
164 
165   /* Wrong data size */
166   if (sz > 1024 * 1024) {
167     tvhtrace(LS_TIMESHIFT, "wrong msg size (%lld/0x%llx)", (long long)sz, (long long)sz);
168     return -1;
169   }
170 
171   /* Type */
172   r = _read_buf(tsf, fd, &type, sizeof(type));
173   if (r < 0) return -1;
174   if (r != sizeof(type)) return 0;
175   cnt += r;
176 
177   /* Time */
178   r = _read_buf(tsf, fd, &time, sizeof(time));
179   if (r < 0) return -1;
180   if (r != sizeof(time)) return 0;
181   cnt += r;
182 
183   /* Adjust size */
184   sz -= sizeof(type) + sizeof(time);
185   cnt += sz;
186 
187   /* Standard messages */
188   switch (type) {
189 
190     /* Unhandled */
191     case SMT_START:
192     case SMT_NOSTART:
193     case SMT_NOSTART_WARN:
194     case SMT_SERVICE_STATUS:
195     case SMT_DESCRAMBLE_INFO:
196       return -1;
197       break;
198 
199     /* Code */
200     case SMT_STOP:
201     case SMT_EXIT:
202     case SMT_SPEED:
203       if (sz != sizeof(code)) return -1;
204       r = _read_buf(tsf, fd, &code, sz);
205       if (r != sz) {
206         if (r < 0) return -1;
207         return 0;
208       }
209       *sm = streaming_msg_create_code(type, code);
210       break;
211 
212     /* Data */
213     case SMT_SKIP:
214     case SMT_SIGNAL_STATUS:
215     case SMT_MPEGTS:
216     case SMT_PACKET:
217       data = malloc(sz);
218       r = _read_buf(tsf, fd, data, sz);
219       if (r != sz) {
220         free(data);
221         if (r < 0) return -1;
222         return 0;
223       }
224       if (type == SMT_PACKET) {
225         th_pkt_t *pkt = data;
226         pkt->pkt_payload  = pkt->pkt_meta = NULL;
227         pkt->pkt_refcount = 0;
228         *sm = streaming_msg_create_pkt(pkt);
229         r   = _read_pktbuf(tsf, fd, &pkt->pkt_meta);
230         if (r < 0) {
231           streaming_msg_free(*sm);
232           return r;
233         }
234         cnt += r;
235         r   = _read_pktbuf(tsf, fd, &pkt->pkt_payload);
236         if (r < 0) {
237           streaming_msg_free(*sm);
238           return r;
239         }
240         cnt += r;
241       } else {
242         *sm = streaming_msg_create_data(type, data);
243       }
244       (*sm)->sm_time = time;
245       break;
246 
247     default:
248       return -1;
249   }
250 
251   /* OK */
252   return cnt;
253 }
254 
255 /* **************************************************************************
256  * Utilities
257  * *************************************************************************/
258 
_timeshift_first_time(timeshift_t * ts,int * active)259 static int64_t _timeshift_first_time
260   ( timeshift_t *ts, int *active )
261 {
262   int64_t ret = 0;
263   int end;
264   timeshift_index_iframe_t *tsi = NULL;
265   timeshift_file_t *tsf = timeshift_filemgr_oldest(ts);
266   while (tsf && !tsi) {
267     if (!(tsi = TAILQ_FIRST(&tsf->iframes)))
268       tsf = timeshift_filemgr_next(tsf, &end, 0);
269   }
270   if (tsi) {
271     *active = 1;
272     ret = tsi->time;
273   }
274   timeshift_file_put(tsf);
275   return ret;
276 }
277 
_timeshift_skip(timeshift_t * ts,int64_t req_time,int64_t cur_time,timeshift_seek_t * seek,timeshift_seek_t * nseek)278 static int _timeshift_skip
279   ( timeshift_t *ts, int64_t req_time, int64_t cur_time,
280     timeshift_seek_t *seek, timeshift_seek_t *nseek )
281 {
282   timeshift_index_iframe_t *tsi  = seek->frame;
283   timeshift_file_t         *tsf  = seek->file, *tsf_last;
284   int64_t                   sec  = mono2sec(req_time) / TIMESHIFT_FILE_PERIOD;
285   int                       back = (req_time < cur_time) ? 1 : 0;
286   int                       end  = 0;
287 
288   /* Coarse search */
289   if (!tsi) {
290     while (tsf && !end) {
291       if (back) {
292         if ((tsf->time <= sec) &&
293             (tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list)))
294           break;
295         tsf = timeshift_filemgr_prev(tsf, &end, 1);
296       } else {
297         if ((tsf->time >= sec) &&
298             (tsi = TAILQ_FIRST(&tsf->iframes)))
299           break;
300         tsf = timeshift_filemgr_next(tsf, &end, 0);
301       }
302       tsi = NULL;
303     }
304   }
305 
306   /* Fine search */
307   if (back) {
308     while (!end && tsf && tsi && (tsi->time > req_time)) {
309       tsi = TAILQ_PREV(tsi, timeshift_index_iframe_list, link);
310       while (!end && tsf && !tsi) {
311         if ((tsf = timeshift_filemgr_prev(tsf, &end, 1)))
312           tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list);
313       }
314     }
315   } else {
316     while (!end && tsf && tsi && (tsi->time < req_time)) {
317       tsi = TAILQ_NEXT(tsi, link);
318       while (!end && tsf && !tsi) {
319         if ((tsf = timeshift_filemgr_next(tsf, &end, 0)))
320           tsi = TAILQ_FIRST(&tsf->iframes);
321       }
322     }
323   }
324 
325   /* End */
326   if (!tsf || !tsi)
327     end = 1;
328 
329   /* Find start/end of buffer */
330   if (end) {
331     timeshift_file_put(tsf);
332     if (back) {
333       tsf = tsf_last = timeshift_filemgr_oldest(ts);
334       tsi = NULL;
335       while (tsf && !tsi) {
336         tsf_last = tsf;
337         if (!(tsi = TAILQ_FIRST(&tsf->iframes)))
338           tsf = timeshift_filemgr_next(tsf, &end, 0);
339       }
340       if (!tsf)
341         tsf = tsf_last;
342       end = -1;
343     } else {
344       tsf = tsf_last = timeshift_filemgr_newest(ts);
345       tsi = NULL;
346       while (tsf && !tsi) {
347         tsf_last = tsf;
348         if (!(tsi = TAILQ_LAST(&tsf->iframes, timeshift_index_iframe_list)))
349           tsf = timeshift_filemgr_prev(tsf, &end, 0);
350       }
351       if (!tsf)
352         tsf = tsf_last;
353       end = 1;
354     }
355   }
356 
357   /* Done */
358   nseek->file  = tsf;
359   nseek->frame = tsi;
360   return end;
361 }
362 
363 /*
364  *
365  */
_timeshift_do_skip(timeshift_t * ts,int64_t req_time,int64_t last_time,timeshift_seek_t * seek)366 static int _timeshift_do_skip
367   ( timeshift_t *ts, int64_t req_time, int64_t last_time,
368     timeshift_seek_t *seek )
369 {
370   timeshift_seek_t nseek;
371   int end;
372 
373   tvhdebug(LS_TIMESHIFT, "ts %d skip to %"PRId64" from %"PRId64,
374            ts->id, req_time, last_time);
375 
376   timeshift_file_get(seek->file);
377 
378   /* Find */
379   end = _timeshift_skip(ts, req_time, last_time, seek, &nseek);
380   if (nseek.frame)
381     tvhdebug(LS_TIMESHIFT, "ts %d skip found pkt @ %"PRId64,
382              ts->id, nseek.frame->time);
383 
384   /* File changed (close) */
385   if (nseek.file != seek->file)
386     _read_close(seek);
387 
388   timeshift_file_put(seek->file);
389 
390   /* Position */
391   *seek = nseek;
392   if (nseek.file != NULL) {
393     if (nseek.frame)
394       nseek.file->roff = nseek.frame->pos;
395     else
396       nseek.file->roff = req_time > last_time ? nseek.file->size : 0;
397     tvhtrace(LS_TIMESHIFT, "do skip seek->file %p roff %"PRId64,
398              nseek.file, (int64_t)nseek.file->roff);
399   }
400 
401   return end;
402 }
403 
404 
405 /*
406  * Output packet
407  */
_timeshift_read(timeshift_t * ts,timeshift_seek_t * seek,streaming_message_t ** sm,int * wait)408 static int _timeshift_read
409   ( timeshift_t *ts, timeshift_seek_t *seek,
410     streaming_message_t **sm, int *wait )
411 {
412   timeshift_file_t *tsf = seek->file;
413   ssize_t r;
414   off_t off = 0;
415 
416   *sm = NULL;
417 
418   if (tsf) {
419 
420     /* Open file */
421     if (tsf->rfd < 0 && !tsf->ram) {
422       tsf->rfd = tvh_open(tsf->path, O_RDONLY, 0);
423       tvhtrace(LS_TIMESHIFT, "ts %d open file %s (fd %i)", ts->id, tsf->path, tsf->rfd);
424       if (tsf->rfd < 0)
425         return -1;
426     }
427     if (tsf->rfd >= 0)
428       if ((off = lseek(tsf->rfd, tsf->roff, SEEK_SET)) != tsf->roff)
429         tvherror(LS_TIMESHIFT, "ts %d seek to %s failed (off %"PRId64" != %"PRId64"): %s",
430                  ts->id, tsf->path, (int64_t)tsf->roff, (int64_t)off, strerror(errno));
431 
432     /* Read msg */
433     r = _read_msg(tsf, -1, sm);
434     if (r < 0) {
435       streaming_message_t *e = streaming_msg_create_code(SMT_STOP, SM_CODE_UNDEFINED_ERROR);
436       streaming_target_deliver2(ts->output, e);
437       tvhtrace(LS_TIMESHIFT, "ts %d seek to %jd (woff %jd) (fd %i)", ts->id, (intmax_t)off, (intmax_t)tsf->woff, tsf->rfd);
438       tvherror(LS_TIMESHIFT, "ts %d could not read buffer", ts->id);
439       return -1;
440     }
441     tvhtrace(LS_TIMESHIFT, "ts %d seek to %jd (fd %i) read msg %p/%"PRId64" (%"PRId64")",
442              ts->id, (intmax_t)off, tsf->rfd, *sm, *sm ? (*sm)->sm_time : -1, (int64_t)r);
443 
444     /* Special case - EOF */
445     if (r <= sizeof(size_t) || tsf->roff > tsf->size || *sm == NULL) {
446       timeshift_file_get(seek->file); /* _read_close decreases file reference */
447       _read_close(seek);
448       _seek_set_file(seek, timeshift_filemgr_next(tsf, NULL, 0), 0);
449       *wait     = 0;
450       tvhtrace(LS_TIMESHIFT, "ts %d eof, seek->file %p (prev %p)", ts->id, seek->file, tsf);
451       timeshift_filemgr_dump(ts);
452     }
453   }
454   return 0;
455 }
456 
457 /*
458  * Flush all data to live
459  */
_timeshift_flush_to_live(timeshift_t * ts,timeshift_seek_t * seek,int * wait)460 static int _timeshift_flush_to_live
461   ( timeshift_t *ts, timeshift_seek_t *seek, int *wait )
462 {
463   streaming_message_t *sm;
464 
465   while (seek->file) {
466     if (_timeshift_read(ts, seek, &sm, wait) == -1)
467       return -1;
468     if (!sm) break;
469     timeshift_packet_log("ouf", ts, sm);
470     streaming_target_deliver2(ts->output, sm);
471   }
472   return 0;
473 }
474 
475 /*
476  * Send the status message
477  */
timeshift_fill_status(timeshift_t * ts,timeshift_status_t * status,int64_t current_time)478 static void timeshift_fill_status
479   ( timeshift_t *ts, timeshift_status_t *status, int64_t current_time )
480 {
481   int active = 0;
482   int64_t start, end;
483 
484   start = _timeshift_first_time(ts, &active);
485   end   = ts->buf_time;
486   if (ts->state <= TS_LIVE) {
487     current_time = end;
488   } else {
489     if (current_time < 0)
490       current_time = 0;
491     if (current_time > end)
492       current_time = end;
493   }
494   status->full = ts->full;
495   tvhtrace(LS_TIMESHIFT, "ts %d status start %"PRId64" end %"PRId64
496                         " current %"PRId64" state %d",
497            ts->id, start, end, current_time, ts->state);
498   status->shift = ts_rescale_inv(end - current_time, 1000000);
499   if (active) {
500     status->pts_start = ts_rescale_inv(start, 1000000);
501     status->pts_end   = ts_rescale_inv(end,   1000000);
502   } else {
503     status->pts_start = PTS_UNSET;
504     status->pts_end   = PTS_UNSET;
505   }
506 }
507 
timeshift_status(timeshift_t * ts,int64_t current_time)508 static void timeshift_status
509   ( timeshift_t *ts, int64_t current_time )
510 {
511   streaming_message_t *tsm;
512   timeshift_status_t *status;
513 
514   status = calloc(1, sizeof(timeshift_status_t));
515   timeshift_fill_status(ts, status, current_time);
516   tsm = streaming_msg_create_data(SMT_TIMESHIFT_STATUS, status);
517   streaming_target_deliver2(ts->output, tsm);
518 }
519 
520 /* **************************************************************************
521  * Thread
522  * *************************************************************************/
523 
524 
525 /*
526  * Timeshift thread
527  */
timeshift_reader(void * p)528 void *timeshift_reader ( void *p )
529 {
530   timeshift_t *ts = p;
531   int nfds, end, run = 1, wait = -1, state;
532   timeshift_seek_t *seek = &ts->seek;
533   timeshift_file_t *tmp_file;
534   int cur_speed = 100, keyframe_mode = 0;
535   int64_t mono_now, mono_play_time = 0, mono_last_status = 0;
536   int64_t deliver, deliver0, pause_time = 0, last_time = 0, skip_time = 0;
537   int64_t i64;
538   streaming_message_t *sm = NULL, *ctrl = NULL;
539   streaming_skip_t *skip = NULL;
540   tvhpoll_t *pd;
541   tvhpoll_event_t ev = { 0 };
542 
543   pd = tvhpoll_create(1);
544   ev.fd     = ts->rd_pipe.rd;
545   ev.events = TVHPOLL_IN;
546   tvhpoll_add(pd, &ev, 1);
547 
548   /* Output */
549   while (run) {
550 
551     // Note: Previously we allowed unlimited wait, but we now must wake periodically
552     //       to output status message
553     if (wait < 0 || wait > 1000)
554       wait = 1000;
555 
556     /* Wait for data */
557     if(wait)
558       nfds = tvhpoll_wait(pd, &ev, 1, wait);
559     else
560       nfds = 0;
561     wait      = -1;
562     end       = 0;
563     skip      = NULL;
564     mono_now  = getfastmonoclock();
565 
566     /* Control */
567     pthread_mutex_lock(&ts->state_mutex);
568     if (nfds == 1) {
569       if (_read_msg(NULL, ts->rd_pipe.rd, &ctrl) > 0) {
570 
571         /* Exit */
572         if (ctrl->sm_type == SMT_EXIT) {
573           tvhtrace(LS_TIMESHIFT, "ts %d read exit request", ts->id);
574           run = 0;
575           streaming_msg_free(ctrl);
576           ctrl = NULL;
577 
578         /* Speed */
579         } else if (ctrl->sm_type == SMT_SPEED) {
580           int speed = ctrl->sm_code;
581           int keyframe;
582 
583           /* Bound it */
584           if (speed > 3200)  speed = 3200;
585           if (speed < -3200) speed = -3200;
586 
587           /* Ignore negative */
588           if (!ts->dobuf && (speed < 0))
589             speed = seek->file ? speed : 0;
590 
591           /* Process */
592           if (cur_speed != speed) {
593 
594             /* Live playback */
595             state = ts->state;
596             if (state == TS_LIVE) {
597 
598               /* Reject */
599               if (speed >= 100) {
600                 tvhdebug(LS_TIMESHIFT, "ts %d reject 1x+ in live mode", ts->id);
601                 speed = 100;
602 
603               /* Set position */
604               } else {
605                 tvhdebug(LS_TIMESHIFT, "ts %d enter timeshift mode", ts->id);
606                 ts->dobuf = 1;
607                 _seek_reset(seek);
608                 tmp_file = timeshift_filemgr_newest(ts);
609                 if (tmp_file != NULL) {
610                   i64 = tmp_file->last;
611                   timeshift_file_put(tmp_file);
612                 } else {
613                   i64 = ts->buf_time;
614                 }
615                 seek->file = timeshift_filemgr_get(ts, i64);
616                 if (seek->file != NULL) {
617                   seek->file->roff = seek->file->size;
618                   pause_time       = seek->file->last;
619                   last_time        = pause_time;
620                 } else {
621                   pause_time       = i64;
622                   last_time        = pause_time;
623                 }
624               }
625             }
626 
627             /* Check keyframe mode */
628             keyframe      = (speed < 0) || (speed > 400);
629             if (keyframe != keyframe_mode) {
630               tvhdebug(LS_TIMESHIFT, "using keyframe mode? %s", keyframe ? "yes" : "no");
631               keyframe_mode = keyframe;
632               if (keyframe)
633                 seek->frame = NULL;
634             }
635 
636             /* Update */
637             if (speed != 100 || state != TS_LIVE) {
638               ts->state = speed == 0 ? TS_PAUSE : TS_PLAY;
639               tvhtrace(LS_TIMESHIFT, "reader - set %s", speed == 0 ? "TS_PAUSE" : "TS_PLAY");
640             }
641             if ((ts->state == TS_PLAY && state != TS_PLAY) || (speed != cur_speed)) {
642               mono_play_time = mono_now;
643               tvhtrace(LS_TIMESHIFT, "update play time TS_LIVE - %"PRId64" play buffer from %"PRId64,
644                        mono_now, pause_time);
645               if (speed != cur_speed)
646                 pause_time = last_time;
647             } else if (ts->state == TS_PAUSE && state != TS_PAUSE) {
648               pause_time = last_time;
649             }
650             cur_speed = speed;
651             tvhdebug(LS_TIMESHIFT, "ts %d change speed %d", ts->id, speed);
652           }
653 
654           /* Send on the message */
655           ctrl->sm_code = speed;
656           streaming_target_deliver2(ts->output, ctrl);
657           ctrl = NULL;
658 
659         /* Skip/Seek */
660         } else if (ctrl->sm_type == SMT_SKIP) {
661           skip = ctrl->sm_data;
662           switch (skip->type) {
663             case SMT_SKIP_LIVE:
664               if (ts->state != TS_LIVE) {
665 
666                 /* Reset */
667                 if (ts->full) {
668                   timeshift_filemgr_flush(ts, NULL);
669                   _seek_reset(seek);
670                   ts->full = 0;
671                 }
672 
673                 /* Release */
674                 if (sm)
675                   streaming_msg_free(sm);
676 
677                 /* Find end */
678                 skip_time = 0x7fffffffffffffffLL;
679                 // TODO: change this sometime!
680               }
681               break;
682 
683             case SMT_SKIP_ABS_TIME:
684               /* -fallthrough */
685             case SMT_SKIP_REL_TIME:
686 
687               /* Convert */
688               skip_time = ts_rescale(skip->time, 1000000);
689               tvhdebug(LS_TIMESHIFT, "ts %d skip %"PRId64" requested %"PRId64, ts->id, skip_time, skip->time);
690 
691               /* Live playback (stage1) */
692               if (ts->state == TS_LIVE) {
693                 _seek_reset(seek);
694                 tmp_file = timeshift_filemgr_newest(ts);
695                 if (tmp_file) {
696                   i64 = tmp_file->last;
697                   timeshift_file_put(tmp_file);
698                 }
699                 if (tmp_file && (seek->file = timeshift_filemgr_get(ts, i64)) != NULL) {
700                   seek->file->roff = seek->file->size;
701                   last_time        = seek->file->last;
702                 } else {
703                   last_time        = ts->buf_time;
704                 }
705               }
706 
707               /* May have failed */
708               if (skip->type == SMT_SKIP_REL_TIME)
709                 skip_time += last_time;
710               tvhdebug(LS_TIMESHIFT, "ts %d skip time %"PRId64, ts->id, skip_time);
711 
712               /* Live (stage2) */
713               if (ts->state == TS_LIVE) {
714                 if (skip_time >= ts->buf_time - TIMESHIFT_PLAY_BUF) {
715                   tvhdebug(LS_TIMESHIFT, "ts %d skip ignored, already live", ts->id);
716                   skip = NULL;
717                 } else {
718                   ts->state = TS_PLAY;
719                   ts->dobuf = 1;
720                   tvhtrace(LS_TIMESHIFT, "reader - set TS_PLAY");
721                 }
722               }
723 
724               /* OK */
725               if (skip) {
726                 /* seek */
727                 seek->frame = NULL;
728                 end = _timeshift_do_skip(ts, skip_time, last_time, seek);
729                 if (seek->frame) {
730                   pause_time = seek->frame->time;
731                   tvhtrace(LS_TIMESHIFT, "ts %d skip - play buffer from %"PRId64" last_time %"PRId64,
732                            ts->id, pause_time, last_time);
733 
734                   /* Adjust time */
735                   if (mono_play_time != mono_now)
736                     tvhtrace(LS_TIMESHIFT, "ts %d update play time skip - %"PRId64, ts->id, mono_now);
737                   mono_play_time = mono_now;
738 
739                   /* Clear existing packet */
740                   if (sm) {
741                     streaming_msg_free(sm);
742                     sm = NULL;
743                   }
744                 } else {
745                   skip = NULL;
746                 }
747               }
748               break;
749             default:
750               tvherror(LS_TIMESHIFT, "ts %d invalid/unsupported skip type: %d", ts->id, skip->type);
751               skip = NULL;
752               break;
753           }
754 
755           /* Error */
756           if (!skip) {
757             ((streaming_skip_t*)ctrl->sm_data)->type = SMT_SKIP_ERROR;
758             streaming_target_deliver2(ts->output, ctrl);
759             ctrl = NULL;
760           }
761 
762         /* Ignore */
763         } else {
764           streaming_msg_free(ctrl);
765           ctrl = NULL;
766         }
767       }
768     }
769 
770 
771     /* Done */
772     if (!run || !seek->file || ((ts->state != TS_PLAY && !skip))) {
773       if (mono_now >= (mono_last_status + sec2mono(1))) {
774         timeshift_status(ts, last_time);
775         mono_last_status = mono_now;
776       }
777       pthread_mutex_unlock(&ts->state_mutex);
778       continue;
779     }
780 
781     /* Calculate delivery time */
782     deliver0 = (mono_now - mono_play_time) + TIMESHIFT_PLAY_BUF;
783     deliver = (deliver0 * cur_speed) / 100;
784     deliver = (deliver + pause_time);
785     tvhtrace(LS_TIMESHIFT, "speed %d now %"PRId64" play_time %"PRId64" deliver %"PRId64" deliver0 %"PRId64,
786              cur_speed, mono_now, mono_play_time, deliver, deliver0);
787 
788     /* Determine next packet */
789     if (!sm) {
790 
791       /* Rewind or Fast forward (i-frame only) */
792       if (skip || keyframe_mode) {
793         int64_t req_time;
794 
795         /* Time */
796         if (!skip)
797           req_time = last_time + ((cur_speed < 0) ? -1 : 1);
798         else
799           req_time = skip_time;
800 
801         end = _timeshift_do_skip(ts, req_time, last_time, seek);
802       }
803 
804       /* Clear old message */
805       if (sm) {
806         streaming_msg_free(sm);
807         sm = NULL;
808       }
809 
810       /* Find packet */
811       if (_timeshift_read(ts, seek, &sm, &wait) == -1) {
812         pthread_mutex_unlock(&ts->state_mutex);
813         break;
814       }
815     }
816 
817     /* Send skip response */
818     if (skip) {
819       if (sm) {
820         /* Status message */
821         skip->time = ts_rescale_inv(sm->sm_time, 1000000);
822         skip->type = SMT_SKIP_ABS_TIME;
823         tvhdebug(LS_TIMESHIFT, "ts %d skip to pts %"PRId64" ok", ts->id, sm->sm_time);
824         /* Update timeshift status */
825         timeshift_fill_status(ts, &skip->timeshift, sm->sm_time);
826         mono_last_status = mono_now;
827       } else {
828         /* Report error */
829         skip->type = SMT_SKIP_ERROR;
830         skip       = NULL;
831         tvhdebug(LS_TIMESHIFT, "ts %d skip failed (%d)", ts->id, sm ? sm->sm_type : -1);
832       }
833       streaming_target_deliver2(ts->output, ctrl);
834     } else {
835       streaming_msg_free(ctrl);
836     }
837     ctrl = NULL;
838 
839     /* Deliver */
840     if (sm && (skip ||
841                (((cur_speed < 0) && (sm->sm_time >= deliver)) ||
842                 ((cur_speed > 0) && (sm->sm_time <= deliver))))) {
843 
844       last_time = sm->sm_time;
845       if (!skip && keyframe_mode) /* always send status on keyframe mode */
846         timeshift_status(ts, last_time);
847       timeshift_packet_log("out", ts, sm);
848       streaming_target_deliver2(ts->output, sm);
849       sm        = NULL;
850       wait      = 0;
851 
852     } else if (sm) {
853 
854       if (cur_speed > 0)
855         wait = (sm->sm_time - deliver) / 1000;
856       else
857         wait = (deliver - sm->sm_time) / 1000;
858       if (wait == 0) wait = 1;
859       tvhtrace(LS_TIMESHIFT, "ts %d wait %d speed %d sm_time %"PRId64" deliver %"PRId64,
860                ts->id, wait, cur_speed, sm->sm_time, deliver);
861 
862     }
863 
864     /* Periodic timeshift status */
865     if (mono_now >= (mono_last_status + sec2mono(1))) {
866       timeshift_status(ts, last_time);
867       mono_last_status = mono_now;
868     }
869 
870     /* Terminate */
871     if (!seek->file || end != 0) {
872 
873       /* Back to live (unless buffer is full) */
874       if ((end == 1 && !ts->full) || !seek->file) {
875         tvhdebug(LS_TIMESHIFT, "ts %d eob revert to live mode", ts->id);
876         cur_speed = 100;
877         ctrl      = streaming_msg_create_code(SMT_SPEED, cur_speed);
878         streaming_target_deliver2(ts->output, ctrl);
879         ctrl      = NULL;
880         tvhtrace(LS_TIMESHIFT, "reader - set TS_LIVE");
881 
882         /* Flush timeshift buffer to live */
883         if (_timeshift_flush_to_live(ts, seek, &wait) == -1) {
884           pthread_mutex_unlock(&ts->state_mutex);
885           break;
886         }
887 
888         ts->state = TS_LIVE;
889 
890         /* Close file (if open) */
891         _read_close(seek);
892 
893       /* Pause */
894       } else {
895         if (cur_speed <= 0) {
896           cur_speed = 0;
897           tvhtrace(LS_TIMESHIFT, "reader - set TS_PAUSE");
898           ts->state = TS_PAUSE;
899         } else {
900           cur_speed = 100;
901           tvhtrace(LS_TIMESHIFT, "reader - set TS_PLAY");
902           if (ts->state != TS_PLAY) {
903             ts->state = TS_PLAY;
904             ts->dobuf = 1;
905             if (mono_play_time != mono_now)
906               tvhtrace(LS_TIMESHIFT, "update play time (pause) - %"PRId64, mono_now);
907             mono_play_time = mono_now;
908           }
909         }
910         tvhdebug(LS_TIMESHIFT, "ts %d sob speed %d last time %"PRId64, ts->id, cur_speed, last_time);
911         pause_time = last_time;
912         ctrl       = streaming_msg_create_code(SMT_SPEED, cur_speed);
913         streaming_target_deliver2(ts->output, ctrl);
914         ctrl       = NULL;
915       }
916 
917     }
918 
919     pthread_mutex_unlock(&ts->state_mutex);
920   }
921 
922   /* Cleanup */
923   tvhpoll_destroy(pd);
924   _read_close(seek);
925   if (sm)       streaming_msg_free(sm);
926   if (ctrl)     streaming_msg_free(ctrl);
927   tvhtrace(LS_TIMESHIFT, "ts %d exit reader thread", ts->id);
928 
929   return NULL;
930 }
931