1 /*
2 * Tvheadend - TS file input system
3 *
4 * Copyright (C) 2013 Adam Sutton
5 *
6 * This program is free software: you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation, either version 3 of the License, or
9 * (at your option) any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15 *
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "tvheadend.h"
21 #include "tsfile_private.h"
22 #include "input.h"
23 #include "input/mpegts/dvb.h"
24 #include "tvhpoll.h"
25
26 #include <sys/types.h>
27 #include <sys/stat.h>
28 #include <fcntl.h>
29 #include <unistd.h>
30 #include <errno.h>
31 #include <assert.h>
32 #include <sched.h>
33
34 extern const idclass_t mpegts_input_class;
35
36
37 static void *
tsfile_input_thread(void * aux)38 tsfile_input_thread ( void *aux )
39 {
40 int fd = -1, nfds;
41 size_t len, rem;
42 ssize_t c;
43 tvhpoll_t *efd;
44 tvhpoll_event_t ev;
45 struct stat st;
46 sbuf_t buf;
47 mpegts_pcr_t pcr;
48 int64_t pcr_last = PTS_UNSET;
49 int64_t pcr_last_mono = 0;
50 tsfile_input_t *mi = aux;
51 mpegts_mux_instance_t *mmi;
52 tsfile_mux_instance_t *tmi;
53
54 /* Open file */
55 pthread_mutex_lock(&global_lock);
56
57 if ((mmi = LIST_FIRST(&mi->mi_mux_active))) {
58 tmi = (tsfile_mux_instance_t*)mmi;
59 fd = tvh_open(tmi->mmi_tsfile_path, O_RDONLY | O_NONBLOCK, 0);
60 if (fd == -1)
61 tvherror(LS_TSFILE, "open(%s) failed %d (%s)",
62 tmi->mmi_tsfile_path, errno, strerror(errno));
63 else
64 tvhtrace(LS_TSFILE, "adapter %d opened %s", mi->mi_instance, tmi->mmi_tsfile_path);
65 }
66 pthread_mutex_unlock(&global_lock);
67 if (fd == -1) return NULL;
68
69 /* Polling */
70 memset(&ev, 0, sizeof(ev));
71 efd = tvhpoll_create(2);
72 ev.events = TVHPOLL_IN;
73 ev.fd = ev.data.fd = mi->ti_thread_pipe.rd;
74 tvhpoll_add(efd, &ev, 1);
75
76 /* Alloc memory */
77 sbuf_init_fixed(&buf, 18800);
78
79 /* Get file length */
80 if (fstat(fd, &st)) {
81 tvherror(LS_TSFILE, "stat() failed %d (%s)",
82 errno, strerror(errno));
83 goto exit;
84 }
85
86 /* Check for extra (incomplete) packet at end */
87 rem = st.st_size % 188;
88 len = 0;
89 tvhtrace(LS_TSFILE, "adapter %d file size %jd rem %zu",
90 mi->mi_instance, (intmax_t)st.st_size, rem);
91
92 pcr_last_mono = getfastmonoclock();
93
94 /* Process input */
95 while (1) {
96
97 /* Find PCR PID */
98 if (tmi->mmi_tsfile_pcr_pid == MPEGTS_PID_NONE) {
99 mpegts_service_t *s;
100 pthread_mutex_lock(&tsfile_lock);
101 LIST_FOREACH(s, &tmi->mmi_mux->mm_services, s_dvb_mux_link) {
102 if (s->s_pcr_pid)
103 tmi->mmi_tsfile_pcr_pid = s->s_pcr_pid;
104 }
105 pthread_mutex_unlock(&tsfile_lock);
106 }
107
108 /* Check for terminate */
109 nfds = tvhpoll_wait(efd, &ev, 1, 0);
110 if (nfds == 1) break;
111
112 /* Read */
113 c = sbuf_read(&buf, fd);
114 if (c < 0) {
115 if (ERRNO_AGAIN(errno))
116 continue;
117 tvherror(LS_TSFILE, "read() error %d (%s)",
118 errno, strerror(errno));
119 break;
120 }
121 len += c;
122
123 /* Reset */
124 if (len >= st.st_size) {
125 len = 0;
126 c -= rem;
127 tvhtrace(LS_TSFILE, "adapter %d reached eof, resetting", mi->mi_instance);
128 lseek(fd, 0, SEEK_SET);
129 pcr_last = PTS_UNSET;
130 }
131
132 /* Process */
133 if (c > 0) {
134 pcr.pcr_first = PTS_UNSET;
135 pcr.pcr_last = PTS_UNSET;
136 pcr.pcr_pid = tmi->mmi_tsfile_pcr_pid;
137 mpegts_input_recv_packets((mpegts_input_t*)mi, mmi, &buf, 0, &pcr);
138 if (pcr.pcr_pid)
139 tmi->mmi_tsfile_pcr_pid = pcr.pcr_pid;
140
141 /* Delay */
142 if (pcr.pcr_first != PTS_UNSET) {
143 if (pcr_last != PTS_UNSET) {
144 int64_t delta, r;
145
146 delta = pcr.pcr_first - pcr_last;
147
148 if (delta < 0)
149 delta = 0;
150 else if (delta > 90000)
151 delta = 90000;
152 delta *= 11;
153
154 do {
155 r = tvh_usleep_abs(pcr_last_mono + delta);
156 } while (ERRNO_AGAIN(r) || r > 0);
157 }
158 pcr_last = pcr.pcr_first;
159 pcr_last_mono = getfastmonoclock();
160 }
161 }
162 sched_yield();
163 }
164
165 exit:
166 sbuf_free(&buf);
167 tvhpoll_destroy(efd);
168 close(fd);
169 return NULL;
170 }
171
172 static void
tsfile_input_stop_mux(mpegts_input_t * mi,mpegts_mux_instance_t * mmi)173 tsfile_input_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi )
174 {
175 int err;
176 tsfile_input_t *ti = (tsfile_input_t*)mi;
177
178 /* Stop thread */
179 if (ti->ti_thread_pipe.rd != -1) {
180 tvhtrace(LS_TSFILE, "adapter %d stopping thread", mi->mi_instance);
181 err = tvh_write(ti->ti_thread_pipe.wr, "", 1);
182 assert(err != -1);
183 pthread_join(ti->ti_thread_id, NULL);
184 tvh_pipe_close(&ti->ti_thread_pipe);
185 tvhtrace(LS_TSFILE, "adapter %d stopped thread", mi->mi_instance);
186 }
187
188 mmi->mmi_mux->mm_active = NULL;
189 LIST_REMOVE(mmi, mmi_active_link);
190 }
191
192 static int
tsfile_input_start_mux(mpegts_input_t * mi,mpegts_mux_instance_t * t,int weight)193 tsfile_input_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *t, int weight )
194 {
195 struct stat st;
196 mpegts_mux_t *mm = t->mmi_mux;
197 tsfile_mux_instance_t *mmi = (tsfile_mux_instance_t*)t;
198 tsfile_input_t *ti = (tsfile_input_t*)mi;
199 tvhtrace(LS_TSFILE, "adapter %d starting mmi %p", mi->mi_instance, mmi);
200
201 /* Already tuned */
202 if (mmi->mmi_mux->mm_active == t) {
203 tvhtrace(LS_TSFILE, "mmi %p is already active", mmi);
204 return 0;
205 }
206 assert(mmi->mmi_mux->mm_active == NULL);
207 assert(LIST_FIRST(&mi->mi_mux_active) == NULL);
208
209 /* Check file is accessible */
210 if (lstat(mmi->mmi_tsfile_path, &st)) {
211 tvherror(LS_TSFILE, "mmi %p could not stat '%s' (%i)",
212 mmi, mmi->mmi_tsfile_path, errno);
213 mmi->mmi_tune_failed = 1;
214 return SM_CODE_TUNING_FAILED;
215 }
216
217 /* Start thread */
218 if (ti->ti_thread_pipe.rd == -1) {
219 if (tvh_pipe(O_NONBLOCK, &ti->ti_thread_pipe)) {
220 mmi->mmi_tune_failed = 1;
221 tvherror(LS_TSFILE, "failed to create thread pipe");
222 return SM_CODE_TUNING_FAILED;
223 }
224 tvhtrace(LS_TSFILE, "adapter %d starting thread", mi->mi_instance);
225 tvhthread_create(&ti->ti_thread_id, NULL, tsfile_input_thread, mi, "tsfile");
226 }
227
228 /* Current */
229 mmi->mmi_mux->mm_active = t;
230
231 /* Install table handlers */
232 psi_tables_install(mi, mm, DVB_SYS_UNKNOWN);
233
234 return 0;
235 }
236
237 tsfile_input_t *
tsfile_input_create(int idx)238 tsfile_input_create ( int idx )
239 {
240 tsfile_input_t *mi;
241
242 /* Create object */
243 mi = (tsfile_input_t*)
244 mpegts_input_create0(calloc(1, sizeof(tsfile_input_t)),
245 &mpegts_input_class,
246 NULL, NULL);
247 mi->mi_instance = idx;
248 mi->mi_enabled = 1;
249 mi->mi_start_mux = tsfile_input_start_mux;
250 mi->mi_stop_mux = tsfile_input_stop_mux;
251 LIST_INSERT_HEAD(&tsfile_inputs, mi, tsi_link);
252 if (!mi->mi_name)
253 mi->mi_name = strdup("TSFile");
254
255 mi->ti_thread_pipe.rd = mi->ti_thread_pipe.wr = -1;
256
257 /* Start table thread */
258 return mi;
259 }
260
261 /******************************************************************************
262 * Editor Configuration
263 *
264 * vim:sts=2:ts=2:sw=2:et
265 *****************************************************************************/
266