1 /*
2  *  Tvheadend - TS file input system
3  *
4  *  Copyright (C) 2013 Adam Sutton
5  *
6  *  This program is free software: you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation, either version 3 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #include "tvheadend.h"
21 #include "tsfile_private.h"
22 #include "input.h"
23 #include "input/mpegts/dvb.h"
24 #include "tvhpoll.h"
25 
26 #include <sys/types.h>
27 #include <sys/stat.h>
28 #include <fcntl.h>
29 #include <unistd.h>
30 #include <errno.h>
31 #include <assert.h>
32 #include <sched.h>
33 
34 extern const idclass_t mpegts_input_class;
35 
36 
37 static void *
tsfile_input_thread(void * aux)38 tsfile_input_thread ( void *aux )
39 {
40   int fd = -1, nfds;
41   size_t len, rem;
42   ssize_t c;
43   tvhpoll_t *efd;
44   tvhpoll_event_t ev;
45   struct stat st;
46   sbuf_t buf;
47   mpegts_pcr_t pcr;
48   int64_t pcr_last = PTS_UNSET;
49   int64_t pcr_last_mono = 0;
50   tsfile_input_t *mi = aux;
51   mpegts_mux_instance_t *mmi;
52   tsfile_mux_instance_t *tmi;
53 
54   /* Open file */
55   pthread_mutex_lock(&global_lock);
56 
57   if ((mmi = LIST_FIRST(&mi->mi_mux_active))) {
58     tmi = (tsfile_mux_instance_t*)mmi;
59     fd  = tvh_open(tmi->mmi_tsfile_path, O_RDONLY | O_NONBLOCK, 0);
60     if (fd == -1)
61       tvherror(LS_TSFILE, "open(%s) failed %d (%s)",
62                tmi->mmi_tsfile_path, errno, strerror(errno));
63     else
64       tvhtrace(LS_TSFILE, "adapter %d opened %s", mi->mi_instance, tmi->mmi_tsfile_path);
65   }
66   pthread_mutex_unlock(&global_lock);
67   if (fd == -1) return NULL;
68 
69   /* Polling */
70   memset(&ev, 0, sizeof(ev));
71   efd = tvhpoll_create(2);
72   ev.events          = TVHPOLL_IN;
73   ev.fd = ev.data.fd = mi->ti_thread_pipe.rd;
74   tvhpoll_add(efd, &ev, 1);
75 
76   /* Alloc memory */
77   sbuf_init_fixed(&buf, 18800);
78 
79   /* Get file length */
80   if (fstat(fd, &st)) {
81     tvherror(LS_TSFILE, "stat() failed %d (%s)",
82              errno, strerror(errno));
83     goto exit;
84   }
85 
86   /* Check for extra (incomplete) packet at end */
87   rem = st.st_size % 188;
88   len = 0;
89   tvhtrace(LS_TSFILE, "adapter %d file size %jd rem %zu",
90            mi->mi_instance, (intmax_t)st.st_size, rem);
91 
92   pcr_last_mono = getfastmonoclock();
93 
94   /* Process input */
95   while (1) {
96 
97     /* Find PCR PID */
98     if (tmi->mmi_tsfile_pcr_pid == MPEGTS_PID_NONE) {
99       mpegts_service_t *s;
100       pthread_mutex_lock(&tsfile_lock);
101       LIST_FOREACH(s, &tmi->mmi_mux->mm_services, s_dvb_mux_link) {
102         if (s->s_pcr_pid)
103           tmi->mmi_tsfile_pcr_pid = s->s_pcr_pid;
104       }
105       pthread_mutex_unlock(&tsfile_lock);
106     }
107 
108     /* Check for terminate */
109     nfds = tvhpoll_wait(efd, &ev, 1, 0);
110     if (nfds == 1) break;
111 
112     /* Read */
113     c = sbuf_read(&buf, fd);
114     if (c < 0) {
115       if (ERRNO_AGAIN(errno))
116         continue;
117       tvherror(LS_TSFILE, "read() error %d (%s)",
118                errno, strerror(errno));
119       break;
120     }
121     len += c;
122 
123     /* Reset */
124     if (len >= st.st_size) {
125       len = 0;
126       c -= rem;
127       tvhtrace(LS_TSFILE, "adapter %d reached eof, resetting", mi->mi_instance);
128       lseek(fd, 0, SEEK_SET);
129       pcr_last = PTS_UNSET;
130     }
131 
132     /* Process */
133     if (c > 0) {
134       pcr.pcr_first = PTS_UNSET;
135       pcr.pcr_last  = PTS_UNSET;
136       pcr.pcr_pid   = tmi->mmi_tsfile_pcr_pid;
137       mpegts_input_recv_packets((mpegts_input_t*)mi, mmi, &buf, 0, &pcr);
138       if (pcr.pcr_pid)
139         tmi->mmi_tsfile_pcr_pid = pcr.pcr_pid;
140 
141       /* Delay */
142       if (pcr.pcr_first != PTS_UNSET) {
143         if (pcr_last != PTS_UNSET) {
144           int64_t delta, r;
145 
146           delta = pcr.pcr_first - pcr_last;
147 
148           if (delta < 0)
149             delta = 0;
150           else if (delta > 90000)
151             delta = 90000;
152           delta *= 11;
153 
154           do {
155             r = tvh_usleep_abs(pcr_last_mono + delta);
156           } while (ERRNO_AGAIN(r) || r > 0);
157         }
158         pcr_last      = pcr.pcr_first;
159         pcr_last_mono = getfastmonoclock();
160       }
161     }
162     sched_yield();
163   }
164 
165 exit:
166   sbuf_free(&buf);
167   tvhpoll_destroy(efd);
168   close(fd);
169   return NULL;
170 }
171 
172 static void
tsfile_input_stop_mux(mpegts_input_t * mi,mpegts_mux_instance_t * mmi)173 tsfile_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
174 {
175   int err;
176   tsfile_input_t *ti = (tsfile_input_t*)mi;
177 
178   /* Stop thread */
179   if (ti->ti_thread_pipe.rd != -1) {
180     tvhtrace(LS_TSFILE, "adapter %d stopping thread", mi->mi_instance);
181     err = tvh_write(ti->ti_thread_pipe.wr, "", 1);
182     assert(err != -1);
183     pthread_join(ti->ti_thread_id, NULL);
184     tvh_pipe_close(&ti->ti_thread_pipe);
185     tvhtrace(LS_TSFILE, "adapter %d stopped thread", mi->mi_instance);
186   }
187 
188   mmi->mmi_mux->mm_active = NULL;
189   LIST_REMOVE(mmi, mmi_active_link);
190 }
191 
192 static int
tsfile_input_start_mux(mpegts_input_t * mi,mpegts_mux_instance_t * t,int weight)193 tsfile_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *t, int weight )
194 {
195   struct stat st;
196   mpegts_mux_t          *mm  = t->mmi_mux;
197   tsfile_mux_instance_t *mmi = (tsfile_mux_instance_t*)t;
198   tsfile_input_t        *ti  = (tsfile_input_t*)mi;
199   tvhtrace(LS_TSFILE, "adapter %d starting mmi %p", mi->mi_instance, mmi);
200 
201   /* Already tuned */
202   if (mmi->mmi_mux->mm_active == t) {
203     tvhtrace(LS_TSFILE, "mmi %p is already active", mmi);
204     return 0;
205   }
206   assert(mmi->mmi_mux->mm_active == NULL);
207   assert(LIST_FIRST(&mi->mi_mux_active) == NULL);
208 
209   /* Check file is accessible */
210   if (lstat(mmi->mmi_tsfile_path, &st)) {
211     tvherror(LS_TSFILE, "mmi %p could not stat '%s' (%i)",
212              mmi, mmi->mmi_tsfile_path, errno);
213     mmi->mmi_tune_failed = 1;
214     return SM_CODE_TUNING_FAILED;
215   }
216 
217   /* Start thread */
218   if (ti->ti_thread_pipe.rd == -1) {
219     if (tvh_pipe(O_NONBLOCK, &ti->ti_thread_pipe)) {
220       mmi->mmi_tune_failed = 1;
221       tvherror(LS_TSFILE, "failed to create thread pipe");
222       return SM_CODE_TUNING_FAILED;
223     }
224     tvhtrace(LS_TSFILE, "adapter %d starting thread", mi->mi_instance);
225     tvhthread_create(&ti->ti_thread_id, NULL, tsfile_input_thread, mi, "tsfile");
226   }
227 
228   /* Current */
229   mmi->mmi_mux->mm_active = t;
230 
231   /* Install table handlers */
232   psi_tables_install(mi, mm, DVB_SYS_UNKNOWN);
233 
234   return 0;
235 }
236 
237 tsfile_input_t *
tsfile_input_create(int idx)238 tsfile_input_create ( int idx )
239 {
240   tsfile_input_t *mi;
241 
242   /* Create object */
243   mi = (tsfile_input_t*)
244     mpegts_input_create0(calloc(1, sizeof(tsfile_input_t)),
245                          &mpegts_input_class,
246                          NULL, NULL);
247   mi->mi_instance       = idx;
248   mi->mi_enabled        = 1;
249   mi->mi_start_mux      = tsfile_input_start_mux;
250   mi->mi_stop_mux       = tsfile_input_stop_mux;
251   LIST_INSERT_HEAD(&tsfile_inputs, mi, tsi_link);
252   if (!mi->mi_name)
253     mi->mi_name = strdup("TSFile");
254 
255   mi->ti_thread_pipe.rd = mi->ti_thread_pipe.wr = -1;
256 
257   /* Start table thread */
258   return mi;
259 }
260 
261 /******************************************************************************
262  * Editor Configuration
263  *
264  * vim:sts=2:ts=2:sw=2:et
265  *****************************************************************************/
266