1 #include "stream.h"
2
3 #include <libavformat/avformat.h>
4 #include <libavutil/time.h>
5 #include <SDL2/SDL_assert.h>
6 #include <SDL2/SDL_events.h>
7 #include <SDL2/SDL_mutex.h>
8 #include <SDL2/SDL_thread.h>
9 #include <unistd.h>
10
11 #include "compat.h"
12 #include "config.h"
13 #include "buffer_util.h"
14 #include "decoder.h"
15 #include "events.h"
16 #include "lock_util.h"
17 #include "log.h"
18 #include "recorder.h"
19
20 #define BUFSIZE 0x10000
21
22 #define HEADER_SIZE 12
23 #define NO_PTS UINT64_C(-1)
24
25 static struct frame_meta *
frame_meta_new(uint64_t pts)26 frame_meta_new(uint64_t pts) {
27 struct frame_meta *meta = SDL_malloc(sizeof(*meta));
28 if (!meta) {
29 return meta;
30 }
31 meta->pts = pts;
32 meta->next = NULL;
33 return meta;
34 }
35
// Release a node previously obtained from frame_meta_new().
// Only the node itself is freed; its 'next' link is not followed.
static void
frame_meta_delete(struct frame_meta *frame_meta) {
    SDL_free(frame_meta);
}
40
41 static bool
receiver_state_push_meta(struct receiver_state * state,uint64_t pts)42 receiver_state_push_meta(struct receiver_state *state, uint64_t pts) {
43 struct frame_meta *frame_meta = frame_meta_new(pts);
44 if (!frame_meta) {
45 return false;
46 }
47
48 // append to the list
49 // (iterate to find the last item, in practice the list should be tiny)
50 struct frame_meta **p = &state->frame_meta_queue;
51 while (*p) {
52 p = &(*p)->next;
53 }
54 *p = frame_meta;
55 return true;
56 }
57
58 static uint64_t
receiver_state_take_meta(struct receiver_state * state)59 receiver_state_take_meta(struct receiver_state *state) {
60 struct frame_meta *frame_meta = state->frame_meta_queue; // first item
61 SDL_assert(frame_meta); // must not be empty
62 uint64_t pts = frame_meta->pts;
63 state->frame_meta_queue = frame_meta->next; // remove the item
64 frame_meta_delete(frame_meta);
65 return pts;
66 }
67
68 static int
read_packet_with_meta(void * opaque,uint8_t * buf,int buf_size)69 read_packet_with_meta(void *opaque, uint8_t *buf, int buf_size) {
70 struct stream *stream = opaque;
71 struct receiver_state *state = &stream->receiver_state;
72
73 // The video stream contains raw packets, without time information. When we
74 // record, we retrieve the timestamps separately, from a "meta" header
75 // added by the server before each raw packet.
76 //
77 // The "meta" header length is 12 bytes:
78 // [. . . . . . . .|. . . .]. . . . . . . . . . . . . . . ...
79 // <-------------> <-----> <-----------------------------...
80 // PTS packet raw packet
81 // size
82 //
83 // It is followed by <packet_size> bytes containing the packet/frame.
84
85 if (!state->remaining) {
86 #define HEADER_SIZE 12
87 uint8_t header[HEADER_SIZE];
88 ssize_t r = net_recv_all(stream->socket, header, HEADER_SIZE);
89 if (r == -1) {
90 return AVERROR(errno);
91 }
92 if (r == 0) {
93 return AVERROR_EOF;
94 }
95 // no partial read (net_recv_all())
96 SDL_assert_release(r == HEADER_SIZE);
97
98 uint64_t pts = buffer_read64be(header);
99 state->remaining = buffer_read32be(&header[8]);
100
101 if (pts != NO_PTS && !receiver_state_push_meta(state, pts)) {
102 LOGE("Could not store PTS for recording");
103 // we cannot save the PTS, the recording would be broken
104 return AVERROR(ENOMEM);
105 }
106 }
107
108 SDL_assert(state->remaining);
109
110 if (buf_size > state->remaining) {
111 buf_size = state->remaining;
112 }
113
114 ssize_t r = net_recv(stream->socket, buf, buf_size);
115 if (r == -1) {
116 return errno ? AVERROR(errno) : AVERROR_EOF;
117 }
118 if (r == 0) {
119 return AVERROR_EOF;
120 }
121
122 SDL_assert(state->remaining >= r);
123 state->remaining -= r;
124
125 return r;
126 }
127
128 static int
read_raw_packet(void * opaque,uint8_t * buf,int buf_size)129 read_raw_packet(void *opaque, uint8_t *buf, int buf_size) {
130 struct stream *stream = opaque;
131 ssize_t r = net_recv(stream->socket, buf, buf_size);
132 if (r == -1) {
133 return errno ? AVERROR(errno) : AVERROR_EOF;
134 }
135 if (r == 0) {
136 return AVERROR_EOF;
137 }
138 return r;
139 }
140
141 static void
notify_stopped(void)142 notify_stopped(void) {
143 SDL_Event stop_event;
144 stop_event.type = EVENT_STREAM_STOPPED;
145 SDL_PushEvent(&stop_event);
146 }
147
148 static int
run_stream(void * data)149 run_stream(void *data) {
150 struct stream *stream = data;
151
152 AVFormatContext *format_ctx = avformat_alloc_context();
153 if (!format_ctx) {
154 LOGC("Could not allocate format context");
155 goto end;
156 }
157
158 unsigned char *buffer = av_malloc(BUFSIZE);
159 if (!buffer) {
160 LOGC("Could not allocate buffer");
161 goto finally_free_format_ctx;
162 }
163
164 // initialize the receiver state
165 stream->receiver_state.frame_meta_queue = NULL;
166 stream->receiver_state.remaining = 0;
167
168 // if recording is enabled, a "header" is sent between raw packets
169 int (*read_packet)(void *, uint8_t *, int) =
170 stream->recorder ? read_packet_with_meta : read_raw_packet;
171 AVIOContext *avio_ctx = avio_alloc_context(buffer, BUFSIZE, 0, stream,
172 read_packet, NULL, NULL);
173 if (!avio_ctx) {
174 LOGC("Could not allocate avio context");
175 // avformat_open_input takes ownership of 'buffer'
176 // so only free the buffer before avformat_open_input()
177 av_free(buffer);
178 goto finally_free_format_ctx;
179 }
180
181 format_ctx->pb = avio_ctx;
182
183 if (avformat_open_input(&format_ctx, NULL, NULL, NULL) < 0) {
184 LOGE("Could not open video stream");
185 goto finally_free_avio_ctx;
186 }
187
188 AVCodec *codec = avcodec_find_decoder(AV_CODEC_ID_H264);
189 if (!codec) {
190 LOGE("H.264 decoder not found");
191 goto end;
192 }
193
194 if (stream->decoder && !decoder_open(stream->decoder, codec)) {
195 LOGE("Could not open decoder");
196 goto finally_close_input;
197 }
198
199 if (stream->recorder && !recorder_open(stream->recorder, codec)) {
200 LOGE("Could not open recorder");
201 goto finally_close_input;
202 }
203
204 AVPacket packet;
205 av_init_packet(&packet);
206 packet.data = NULL;
207 packet.size = 0;
208
209 while (!av_read_frame(format_ctx, &packet)) {
210 if (SDL_AtomicGet(&stream->stopped)) {
211 // if the stream is stopped, the socket had been shutdown, so the
212 // last packet is probably corrupted (but not detected as such by
213 // FFmpeg) and will not be decoded correctly
214 av_packet_unref(&packet);
215 goto quit;
216 }
217 if (stream->decoder && !decoder_push(stream->decoder, &packet)) {
218 av_packet_unref(&packet);
219 goto quit;
220 }
221
222 if (stream->recorder) {
223 // we retrieve the PTS in order they were received, so they will
224 // be assigned to the correct frame
225 uint64_t pts = receiver_state_take_meta(&stream->receiver_state);
226 packet.pts = pts;
227 packet.dts = pts;
228
229 // no need to rescale with av_packet_rescale_ts(), the timestamps
230 // are in microseconds both in input and output
231 if (!recorder_write(stream->recorder, &packet)) {
232 LOGE("Could not write frame to output file");
233 av_packet_unref(&packet);
234 goto quit;
235 }
236 }
237
238 av_packet_unref(&packet);
239
240 if (avio_ctx->eof_reached) {
241 break;
242 }
243 }
244
245 LOGD("End of frames");
246
247 quit:
248 if (stream->recorder) {
249 recorder_close(stream->recorder);
250 }
251 finally_close_input:
252 avformat_close_input(&format_ctx);
253 finally_free_avio_ctx:
254 av_free(avio_ctx->buffer);
255 av_free(avio_ctx);
256 finally_free_format_ctx:
257 avformat_free_context(format_ctx);
258 end:
259 notify_stopped();
260 return 0;
261 }
262
263 void
stream_init(struct stream * stream,socket_t socket,struct decoder * decoder,struct recorder * recorder)264 stream_init(struct stream *stream, socket_t socket,
265 struct decoder *decoder, struct recorder *recorder) {
266 stream->socket = socket;
267 stream->decoder = decoder,
268 stream->recorder = recorder;
269 SDL_AtomicSet(&stream->stopped, 0);
270 }
271
272 bool
stream_start(struct stream * stream)273 stream_start(struct stream *stream) {
274 LOGD("Starting stream thread");
275
276 stream->thread = SDL_CreateThread(run_stream, "stream", stream);
277 if (!stream->thread) {
278 LOGC("Could not start stream thread");
279 return false;
280 }
281 return true;
282 }
283
284 void
stream_stop(struct stream * stream)285 stream_stop(struct stream *stream) {
286 SDL_AtomicSet(&stream->stopped, 1);
287 if (stream->decoder) {
288 decoder_interrupt(stream->decoder);
289 }
290 }
291
292 void
stream_join(struct stream * stream)293 stream_join(struct stream *stream) {
294 SDL_WaitThread(stream->thread, NULL);
295 }
296