1 #include "stream.h"
2 
3 #include <libavformat/avformat.h>
4 #include <libavutil/time.h>
5 #include <SDL2/SDL_assert.h>
6 #include <SDL2/SDL_events.h>
7 #include <SDL2/SDL_mutex.h>
8 #include <SDL2/SDL_thread.h>
9 #include <unistd.h>
10 
11 #include "compat.h"
12 #include "config.h"
13 #include "buffer_util.h"
14 #include "decoder.h"
15 #include "events.h"
16 #include "lock_util.h"
17 #include "log.h"
18 #include "recorder.h"
19 
// size, in bytes, of the buffer handed to the custom AVIO context (see
// run_stream())
#define BUFSIZE 0x10000

// size, in bytes, of the "meta" header sent before each raw packet when
// recording: an 8-byte big-endian PTS followed by a 4-byte big-endian size
#define HEADER_SIZE 12
// PTS value in a "meta" header meaning "no PTS": nothing is queued for it
#define NO_PTS UINT64_C(-1)
24 
25 static struct frame_meta *
frame_meta_new(uint64_t pts)26 frame_meta_new(uint64_t pts) {
27     struct frame_meta *meta = SDL_malloc(sizeof(*meta));
28     if (!meta) {
29         return meta;
30     }
31     meta->pts = pts;
32     meta->next = NULL;
33     return meta;
34 }
35 
// Release a node obtained from frame_meta_new(). Its `next` link is not
// followed: only this node is freed.
static void
frame_meta_delete(struct frame_meta *frame_meta) {
    struct frame_meta *node = frame_meta;
    SDL_free(node);
}
40 
41 static bool
receiver_state_push_meta(struct receiver_state * state,uint64_t pts)42 receiver_state_push_meta(struct receiver_state *state, uint64_t pts) {
43     struct frame_meta *frame_meta = frame_meta_new(pts);
44     if (!frame_meta) {
45         return false;
46     }
47 
48     // append to the list
49     // (iterate to find the last item, in practice the list should be tiny)
50     struct frame_meta **p = &state->frame_meta_queue;
51     while (*p) {
52         p = &(*p)->next;
53     }
54     *p = frame_meta;
55     return true;
56 }
57 
58 static uint64_t
receiver_state_take_meta(struct receiver_state * state)59 receiver_state_take_meta(struct receiver_state *state) {
60     struct frame_meta *frame_meta = state->frame_meta_queue; // first item
61     SDL_assert(frame_meta); // must not be empty
62     uint64_t pts = frame_meta->pts;
63     state->frame_meta_queue = frame_meta->next; // remove the item
64     frame_meta_delete(frame_meta);
65     return pts;
66 }
67 
68 static int
read_packet_with_meta(void * opaque,uint8_t * buf,int buf_size)69 read_packet_with_meta(void *opaque, uint8_t *buf, int buf_size) {
70     struct stream *stream = opaque;
71     struct receiver_state *state = &stream->receiver_state;
72 
73     // The video stream contains raw packets, without time information. When we
74     // record, we retrieve the timestamps separately, from a "meta" header
75     // added by the server before each raw packet.
76     //
77     // The "meta" header length is 12 bytes:
78     // [. . . . . . . .|. . . .]. . . . . . . . . . . . . . . ...
79     //  <-------------> <-----> <-----------------------------...
80     //        PTS        packet        raw packet
81     //                    size
82     //
83     // It is followed by <packet_size> bytes containing the packet/frame.
84 
85     if (!state->remaining) {
86 #define HEADER_SIZE 12
87         uint8_t header[HEADER_SIZE];
88         ssize_t r = net_recv_all(stream->socket, header, HEADER_SIZE);
89         if (r == -1) {
90             return AVERROR(errno);
91         }
92         if (r == 0) {
93             return AVERROR_EOF;
94         }
95         // no partial read (net_recv_all())
96         SDL_assert_release(r == HEADER_SIZE);
97 
98         uint64_t pts = buffer_read64be(header);
99         state->remaining = buffer_read32be(&header[8]);
100 
101         if (pts != NO_PTS && !receiver_state_push_meta(state, pts)) {
102             LOGE("Could not store PTS for recording");
103             // we cannot save the PTS, the recording would be broken
104             return AVERROR(ENOMEM);
105         }
106     }
107 
108     SDL_assert(state->remaining);
109 
110     if (buf_size > state->remaining) {
111         buf_size = state->remaining;
112     }
113 
114     ssize_t r = net_recv(stream->socket, buf, buf_size);
115     if (r == -1) {
116         return errno ? AVERROR(errno) : AVERROR_EOF;
117     }
118     if (r == 0) {
119         return AVERROR_EOF;
120     }
121 
122     SDL_assert(state->remaining >= r);
123     state->remaining -= r;
124 
125     return r;
126 }
127 
128 static int
read_raw_packet(void * opaque,uint8_t * buf,int buf_size)129 read_raw_packet(void *opaque, uint8_t *buf, int buf_size) {
130     struct stream *stream = opaque;
131     ssize_t r = net_recv(stream->socket, buf, buf_size);
132     if (r == -1) {
133         return errno ? AVERROR(errno) : AVERROR_EOF;
134     }
135     if (r == 0) {
136         return AVERROR_EOF;
137     }
138     return r;
139 }
140 
141 static void
notify_stopped(void)142 notify_stopped(void) {
143     SDL_Event stop_event;
144     stop_event.type = EVENT_STREAM_STOPPED;
145     SDL_PushEvent(&stop_event);
146 }
147 
148 static int
run_stream(void * data)149 run_stream(void *data) {
150     struct stream *stream = data;
151 
152     AVFormatContext *format_ctx = avformat_alloc_context();
153     if (!format_ctx) {
154         LOGC("Could not allocate format context");
155         goto end;
156     }
157 
158     unsigned char *buffer = av_malloc(BUFSIZE);
159     if (!buffer) {
160         LOGC("Could not allocate buffer");
161         goto finally_free_format_ctx;
162     }
163 
164     // initialize the receiver state
165     stream->receiver_state.frame_meta_queue = NULL;
166     stream->receiver_state.remaining = 0;
167 
168     // if recording is enabled, a "header" is sent between raw packets
169     int (*read_packet)(void *, uint8_t *, int) =
170             stream->recorder ? read_packet_with_meta : read_raw_packet;
171     AVIOContext *avio_ctx = avio_alloc_context(buffer, BUFSIZE, 0, stream,
172                                                read_packet, NULL, NULL);
173     if (!avio_ctx) {
174         LOGC("Could not allocate avio context");
175         // avformat_open_input takes ownership of 'buffer'
176         // so only free the buffer before avformat_open_input()
177         av_free(buffer);
178         goto finally_free_format_ctx;
179     }
180 
181     format_ctx->pb = avio_ctx;
182 
183     if (avformat_open_input(&format_ctx, NULL, NULL, NULL) < 0) {
184         LOGE("Could not open video stream");
185         goto finally_free_avio_ctx;
186     }
187 
188     AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_H264);
189     if (!codec) {
190         LOGE("H.264 decoder not found");
191         goto end;
192     }
193 
194     if (stream->decoder && !decoder_open(stream->decoder, codec)) {
195         LOGE("Could not open decoder");
196         goto finally_close_input;
197     }
198 
199     if (stream->recorder && !recorder_open(stream->recorder, codec)) {
200         LOGE("Could not open recorder");
201         goto finally_close_input;
202     }
203 
204     AVPacket packet;
205     av_init_packet(&packet);
206     packet.data = NULL;
207     packet.size = 0;
208 
209     while (!av_read_frame(format_ctx, &packet)) {
210         if (SDL_AtomicGet(&stream->stopped)) {
211             // if the stream is stopped, the socket had been shutdown, so the
212             // last packet is probably corrupted (but not detected as such by
213             // FFmpeg) and will not be decoded correctly
214             av_packet_unref(&packet);
215             goto quit;
216         }
217         if (stream->decoder && !decoder_push(stream->decoder, &packet)) {
218             av_packet_unref(&packet);
219             goto quit;
220         }
221 
222         if (stream->recorder) {
223             // we retrieve the PTS in order they were received, so they will
224             // be assigned to the correct frame
225             uint64_t pts = receiver_state_take_meta(&stream->receiver_state);
226             packet.pts = pts;
227             packet.dts = pts;
228 
229             // no need to rescale with av_packet_rescale_ts(), the timestamps
230             // are in microseconds both in input and output
231             if (!recorder_write(stream->recorder, &packet)) {
232                 LOGE("Could not write frame to output file");
233                 av_packet_unref(&packet);
234                 goto quit;
235             }
236         }
237 
238         av_packet_unref(&packet);
239 
240         if (avio_ctx->eof_reached) {
241             break;
242         }
243     }
244 
245     LOGD("End of frames");
246 
247 quit:
248     if (stream->recorder) {
249         recorder_close(stream->recorder);
250     }
251 finally_close_input:
252     avformat_close_input(&format_ctx);
253 finally_free_avio_ctx:
254     av_free(avio_ctx->buffer);
255     av_free(avio_ctx);
256 finally_free_format_ctx:
257     avformat_free_context(format_ctx);
258 end:
259     notify_stopped();
260     return 0;
261 }
262 
263 void
stream_init(struct stream * stream,socket_t socket,struct decoder * decoder,struct recorder * recorder)264 stream_init(struct stream *stream, socket_t socket,
265             struct decoder *decoder, struct recorder *recorder) {
266     stream->socket = socket;
267     stream->decoder = decoder,
268     stream->recorder = recorder;
269     SDL_AtomicSet(&stream->stopped, 0);
270 }
271 
272 bool
stream_start(struct stream * stream)273 stream_start(struct stream *stream) {
274     LOGD("Starting stream thread");
275 
276     stream->thread = SDL_CreateThread(run_stream, "stream", stream);
277     if (!stream->thread) {
278         LOGC("Could not start stream thread");
279         return false;
280     }
281     return true;
282 }
283 
284 void
stream_stop(struct stream * stream)285 stream_stop(struct stream *stream) {
286     SDL_AtomicSet(&stream->stopped, 1);
287     if (stream->decoder) {
288         decoder_interrupt(stream->decoder);
289     }
290 }
291 
292 void
stream_join(struct stream * stream)293 stream_join(struct stream *stream) {
294     SDL_WaitThread(stream->thread, NULL);
295 }
296