1 /******************************************************************************
2     Copyright (C) 2015 by Hugh Bailey <obs.jim@gmail.com>
3 
4     This program is free software: you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation, either version 2 of the License, or
7     (at your option) any later version.
8 
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13 
14     You should have received a copy of the GNU General Public License
15     along with this program.  If not, see <http://www.gnu.org/licenses/>.
16 ******************************************************************************/
17 #include "ffmpeg-mux/ffmpeg-mux.h"
18 #include "obs-ffmpeg-mux.h"
19 
20 #ifdef _WIN32
21 #include "util/windows/win-version.h"
22 #endif
23 
24 #include <libavformat/avformat.h>
25 
/* Logs through blog() with the owning output's name as a prefix. Expects a
 * variable named `stream` (with an `output` member) in the calling scope. */
#define do_log(level, fmt, ...)                  \
	blog(level, "[ffmpeg muxer: '%s'] " fmt, \
	     obs_output_get_name(stream->output), ##__VA_ARGS__)

/* Severity-specific shorthands for do_log() */
#define warn(fmt, ...) do_log(LOG_WARNING, fmt, ##__VA_ARGS__)
#define info(fmt, ...) do_log(LOG_INFO, fmt, ##__VA_ARGS__)
32 
/* get_name callback of the ffmpeg_muxer output: returns the module text
 * stored under the "FFmpegMuxer" key. The type argument is ignored. */
static const char *ffmpeg_mux_getname(void *type)
{
	const char *display_name = obs_module_text("FFmpegMuxer");

	UNUSED_PARAMETER(type);
	return display_name;
}
38 
/* get_name callback of the ffmpeg_mpegts_muxer output: returns the module
 * text stored under the "FFmpegMpegtsMuxer" key. The type argument is
 * ignored. */
static const char *ffmpeg_mpegts_mux_getname(void *type)
{
	const char *display_name = obs_module_text("FFmpegMpegtsMuxer");

	UNUSED_PARAMETER(type);
	return display_name;
}
44 
replay_buffer_clear(struct ffmpeg_muxer * stream)45 static inline void replay_buffer_clear(struct ffmpeg_muxer *stream)
46 {
47 	while (stream->packets.size > 0) {
48 		struct encoder_packet pkt;
49 		circlebuf_pop_front(&stream->packets, &pkt, sizeof(pkt));
50 		obs_encoder_packet_release(&pkt);
51 	}
52 
53 	circlebuf_free(&stream->packets);
54 	stream->cur_size = 0;
55 	stream->cur_time = 0;
56 	stream->max_size = 0;
57 	stream->max_time = 0;
58 	stream->save_ts = 0;
59 	stream->keyframes = 0;
60 }
61 
ffmpeg_mux_destroy(void * data)62 static void ffmpeg_mux_destroy(void *data)
63 {
64 	struct ffmpeg_muxer *stream = data;
65 
66 	replay_buffer_clear(stream);
67 	if (stream->mux_thread_joinable)
68 		pthread_join(stream->mux_thread, NULL);
69 	da_free(stream->mux_packets);
70 	circlebuf_free(&stream->packets);
71 
72 	os_process_pipe_destroy(stream->pipe);
73 	dstr_free(&stream->path);
74 	dstr_free(&stream->printable_path);
75 	dstr_free(&stream->stream_key);
76 	dstr_free(&stream->muxer_settings);
77 	bfree(stream);
78 }
79 
/* create callback: allocates a muxer object bound to `output` and marks it
 * as a network output when the output carries OBS_OUTPUT_SERVICE.
 * The settings argument is not consulted here. */
static void *ffmpeg_mux_create(obs_data_t *settings, obs_output_t *output)
{
	struct ffmpeg_muxer *muxer = bzalloc(sizeof(*muxer));
	const uint32_t service_flag =
		obs_output_get_flags(output) & OBS_OUTPUT_SERVICE;

	UNUSED_PARAMETER(settings);

	muxer->output = output;
	if (service_flag) {
		muxer->is_network = true;
	}

	return muxer;
}
91 
/* File name of the helper executable launched by start_pipe() */
#ifndef _WIN32
#define FFMPEG_MUX "obs-ffmpeg-mux"
#else
#define FFMPEG_MUX "obs-ffmpeg-mux.exe"
#endif
97 
capturing(struct ffmpeg_muxer * stream)98 static inline bool capturing(struct ffmpeg_muxer *stream)
99 {
100 	return os_atomic_load_bool(&stream->capturing);
101 }
102 
stopping(struct ffmpeg_muxer * stream)103 bool stopping(struct ffmpeg_muxer *stream)
104 {
105 	return os_atomic_load_bool(&stream->stopping);
106 }
107 
active(struct ffmpeg_muxer * stream)108 bool active(struct ffmpeg_muxer *stream)
109 {
110 	return os_atomic_load_bool(&stream->active);
111 }
112 
113 /* TODO: allow codecs other than h264 whenever we start using them */
114 
/* Appends the video arguments to the helper command line: codec name,
 * bitrate, output width/height, colour primaries / transfer / space / range
 * (as FFmpeg enum values) and the frame rate numerator and denominator. */
static void add_video_encoder_params(struct ffmpeg_muxer *stream,
				     struct dstr *cmd, obs_encoder_t *vencoder)
{
	const struct video_output_info *voi =
		video_output_get_info(obs_get_video());
	obs_data_t *enc_settings = obs_encoder_get_settings(vencoder);
	const int bitrate = (int)obs_data_get_int(enc_settings, "bitrate");

	obs_data_release(enc_settings);

	/* colorspaces not listed below stay "unspecified" */
	enum AVColorPrimaries primaries = AVCOL_PRI_UNSPECIFIED;
	enum AVColorTransferCharacteristic transfer = AVCOL_TRC_UNSPECIFIED;
	enum AVColorSpace space = AVCOL_SPC_UNSPECIFIED;

	if (voi->colorspace == VIDEO_CS_601) {
		primaries = AVCOL_PRI_SMPTE170M;
		transfer = AVCOL_TRC_SMPTE170M;
		space = AVCOL_SPC_SMPTE170M;
	} else if (voi->colorspace == VIDEO_CS_DEFAULT ||
		   voi->colorspace == VIDEO_CS_709) {
		primaries = AVCOL_PRI_BT709;
		transfer = AVCOL_TRC_BT709;
		space = AVCOL_SPC_BT709;
	} else if (voi->colorspace == VIDEO_CS_SRGB) {
		primaries = AVCOL_PRI_BT709;
		transfer = AVCOL_TRC_IEC61966_2_1;
		space = AVCOL_SPC_BT709;
	}

	enum AVColorRange range = AVCOL_RANGE_MPEG;
	if (voi->range == VIDEO_RANGE_FULL)
		range = AVCOL_RANGE_JPEG;

	dstr_catf(cmd, "%s %d %d %d %d %d %d %d %d %d ",
		  obs_encoder_get_codec(vencoder), bitrate,
		  obs_output_get_width(stream->output),
		  obs_output_get_height(stream->output), (int)primaries,
		  (int)transfer, (int)space, (int)range, (int)voi->fps_num,
		  (int)voi->fps_den);
}
157 
/* Appends one audio track's arguments to the helper command line: the quoted
 * encoder name, bitrate, sample rate and channel count. */
static void add_audio_encoder_params(struct dstr *cmd, obs_encoder_t *aencoder)
{
	struct dstr quoted_name = {0};
	obs_data_t *enc_settings = obs_encoder_get_settings(aencoder);
	const int bitrate = (int)obs_data_get_int(enc_settings, "bitrate");

	obs_data_release(enc_settings);

	/* the name is emitted inside "...", so double any embedded quotes */
	dstr_copy(&quoted_name, obs_encoder_get_name(aencoder));
	dstr_replace(&quoted_name, "\"", "\"\"");

	const int sample_rate = (int)obs_encoder_get_sample_rate(aencoder);
	const int channels = (int)audio_output_get_channels(obs_get_audio());

	dstr_catf(cmd, "\"%s\" %d %d %d ", quoted_name.array, bitrate,
		  sample_rate, channels);

	dstr_free(&quoted_name);
}
176 
log_muxer_params(struct ffmpeg_muxer * stream,const char * settings)177 static void log_muxer_params(struct ffmpeg_muxer *stream, const char *settings)
178 {
179 	int ret;
180 
181 	AVDictionary *dict = NULL;
182 	if ((ret = av_dict_parse_string(&dict, settings, "=", " ", 0))) {
183 		warn("Failed to parse muxer settings: %s\n%s", av_err2str(ret),
184 		     settings);
185 
186 		av_dict_free(&dict);
187 		return;
188 	}
189 
190 	if (av_dict_count(dict) > 0) {
191 		struct dstr str = {0};
192 
193 		AVDictionaryEntry *entry = NULL;
194 		while ((entry = av_dict_get(dict, "", entry,
195 					    AV_DICT_IGNORE_SUFFIX)))
196 			dstr_catf(&str, "\n\t%s=%s", entry->key, entry->value);
197 
198 		info("Using muxer settings:%s", str.array);
199 		dstr_free(&str);
200 	}
201 
202 	av_dict_free(&dict);
203 }
204 
add_stream_key(struct dstr * cmd,struct ffmpeg_muxer * stream)205 static void add_stream_key(struct dstr *cmd, struct ffmpeg_muxer *stream)
206 {
207 	dstr_catf(cmd, "\"%s\" ",
208 		  dstr_is_empty(&stream->stream_key)
209 			  ? ""
210 			  : stream->stream_key.array);
211 }
212 
add_muxer_params(struct dstr * cmd,struct ffmpeg_muxer * stream)213 static void add_muxer_params(struct dstr *cmd, struct ffmpeg_muxer *stream)
214 {
215 	struct dstr mux = {0};
216 
217 	if (dstr_is_empty(&stream->muxer_settings)) {
218 		obs_data_t *settings = obs_output_get_settings(stream->output);
219 		dstr_copy(&mux,
220 			  obs_data_get_string(settings, "muxer_settings"));
221 		obs_data_release(settings);
222 	} else {
223 		dstr_copy(&mux, stream->muxer_settings.array);
224 	}
225 
226 	log_muxer_params(stream, mux.array);
227 
228 	dstr_replace(&mux, "\"", "\\\"");
229 
230 	dstr_catf(cmd, "\"%s\" ", mux.array ? mux.array : "");
231 
232 	dstr_free(&mux);
233 }
234 
build_command_line(struct ffmpeg_muxer * stream,struct dstr * cmd,const char * path)235 static void build_command_line(struct ffmpeg_muxer *stream, struct dstr *cmd,
236 			       const char *path)
237 {
238 	obs_encoder_t *vencoder = obs_output_get_video_encoder(stream->output);
239 	obs_encoder_t *aencoders[MAX_AUDIO_MIXES];
240 	int num_tracks = 0;
241 
242 	for (;;) {
243 		obs_encoder_t *aencoder = obs_output_get_audio_encoder(
244 			stream->output, num_tracks);
245 		if (!aencoder)
246 			break;
247 
248 		aencoders[num_tracks] = aencoder;
249 		num_tracks++;
250 	}
251 
252 	dstr_init_move_array(cmd, os_get_executable_path_ptr(FFMPEG_MUX));
253 	dstr_insert_ch(cmd, 0, '\"');
254 	dstr_cat(cmd, "\" \"");
255 
256 	dstr_copy(&stream->path, path);
257 	dstr_replace(&stream->path, "\"", "\"\"");
258 	dstr_cat_dstr(cmd, &stream->path);
259 
260 	dstr_catf(cmd, "\" %d %d ", vencoder ? 1 : 0, num_tracks);
261 
262 	if (vencoder)
263 		add_video_encoder_params(stream, cmd, vencoder);
264 
265 	if (num_tracks) {
266 		dstr_cat(cmd, "aac ");
267 
268 		for (int i = 0; i < num_tracks; i++) {
269 			add_audio_encoder_params(cmd, aencoders[i]);
270 		}
271 	}
272 
273 	add_stream_key(cmd, stream);
274 	add_muxer_params(cmd, stream);
275 }
276 
start_pipe(struct ffmpeg_muxer * stream,const char * path)277 void start_pipe(struct ffmpeg_muxer *stream, const char *path)
278 {
279 	struct dstr cmd;
280 	build_command_line(stream, &cmd, path);
281 	stream->pipe = os_process_pipe_create(cmd.array, "w");
282 	dstr_free(&cmd);
283 }
284 
/* Sets the output's last-error text to the "UnableToWritePath" message with
 * %1 replaced by `path`. Also releases `settings` on behalf of the caller. */
static void set_file_not_readable_error(struct ffmpeg_muxer *stream,
					obs_data_t *settings, const char *path)
{
	struct dstr msg;

	dstr_init_copy(&msg, obs_module_text("UnableToWritePath"));

#ifdef _WIN32
	/* major version 10 and up: append the Windows Defender hint */
	struct win_version_info win_ver;

	get_win_ver(&win_ver);
	if (win_ver.major >= 10) {
		dstr_cat(&msg, "\n\n");
		dstr_cat(&msg, obs_module_text("WarnWindowsDefender"));
	}
#endif

	dstr_replace(&msg, "%1", path);
	obs_output_set_last_error(stream->output, msg.array);

	dstr_free(&msg);
	obs_data_release(settings);
}
305 
ffmpeg_mux_start(void * data)306 static bool ffmpeg_mux_start(void *data)
307 {
308 	struct ffmpeg_muxer *stream = data;
309 	obs_data_t *settings;
310 	const char *path;
311 
312 	if (!obs_output_can_begin_data_capture(stream->output, 0))
313 		return false;
314 	if (!obs_output_initialize_encoders(stream->output, 0))
315 		return false;
316 
317 	settings = obs_output_get_settings(stream->output);
318 	if (stream->is_network) {
319 		obs_service_t *service;
320 		service = obs_output_get_service(stream->output);
321 		if (!service)
322 			return false;
323 		path = obs_service_get_url(service);
324 	} else {
325 		path = obs_data_get_string(settings, "path");
326 	}
327 
328 	if (!stream->is_network) {
329 		/* ensure output path is writable to avoid generic error
330 		 * message.
331 		 *
332 		 * TODO: remove once ffmpeg-mux is refactored to pass
333 		 * errors back */
334 		FILE *test_file = os_fopen(path, "wb");
335 		if (!test_file) {
336 			set_file_not_readable_error(stream, settings, path);
337 			return false;
338 		}
339 
340 		fclose(test_file);
341 		os_unlink(path);
342 	}
343 
344 	start_pipe(stream, path);
345 	obs_data_release(settings);
346 
347 	if (!stream->pipe) {
348 		obs_output_set_last_error(
349 			stream->output, obs_module_text("HelperProcessFailed"));
350 		warn("Failed to create process pipe");
351 		return false;
352 	}
353 
354 	/* write headers and start capture */
355 	os_atomic_set_bool(&stream->active, true);
356 	os_atomic_set_bool(&stream->capturing, true);
357 	stream->total_bytes = 0;
358 	obs_output_begin_data_capture(stream->output, 0);
359 
360 	info("Writing file '%s'...", stream->path.array);
361 	return true;
362 }
363 
deactivate(struct ffmpeg_muxer * stream,int code)364 int deactivate(struct ffmpeg_muxer *stream, int code)
365 {
366 	int ret = -1;
367 
368 	if (stream->is_hls) {
369 		if (stream->mux_thread_joinable) {
370 			os_event_signal(stream->stop_event);
371 			os_sem_post(stream->write_sem);
372 			pthread_join(stream->mux_thread, NULL);
373 			stream->mux_thread_joinable = false;
374 		}
375 	}
376 
377 	if (active(stream)) {
378 		ret = os_process_pipe_destroy(stream->pipe);
379 		stream->pipe = NULL;
380 
381 		os_atomic_set_bool(&stream->active, false);
382 		os_atomic_set_bool(&stream->sent_headers, false);
383 
384 		info("Output of file '%s' stopped",
385 		     dstr_is_empty(&stream->printable_path)
386 			     ? stream->path.array
387 			     : stream->printable_path.array);
388 	}
389 
390 	if (code) {
391 		obs_output_signal_stop(stream->output, code);
392 	} else if (stopping(stream)) {
393 		obs_output_end_data_capture(stream->output);
394 	}
395 
396 	if (stream->is_hls) {
397 		pthread_mutex_lock(&stream->write_mutex);
398 
399 		while (stream->packets.size) {
400 			struct encoder_packet packet;
401 			circlebuf_pop_front(&stream->packets, &packet,
402 					    sizeof(packet));
403 			obs_encoder_packet_release(&packet);
404 		}
405 
406 		pthread_mutex_unlock(&stream->write_mutex);
407 	}
408 
409 	os_atomic_set_bool(&stream->stopping, false);
410 	return ret;
411 }
412 
ffmpeg_mux_stop(void * data,uint64_t ts)413 void ffmpeg_mux_stop(void *data, uint64_t ts)
414 {
415 	struct ffmpeg_muxer *stream = data;
416 
417 	if (capturing(stream) || ts == 0) {
418 		stream->stop_ts = (int64_t)ts / 1000LL;
419 		os_atomic_set_bool(&stream->stopping, true);
420 		os_atomic_set_bool(&stream->capturing, false);
421 	}
422 }
423 
signal_failure(struct ffmpeg_muxer * stream)424 static void signal_failure(struct ffmpeg_muxer *stream)
425 {
426 	char error[1024];
427 	int ret;
428 	int code;
429 
430 	size_t len;
431 
432 	len = os_process_pipe_read_err(stream->pipe, (uint8_t *)error,
433 				       sizeof(error) - 1);
434 
435 	if (len > 0) {
436 		error[len] = 0;
437 		warn("ffmpeg-mux: %s", error);
438 		obs_output_set_last_error(stream->output, error);
439 	}
440 
441 	ret = deactivate(stream, 0);
442 
443 	switch (ret) {
444 	case FFM_UNSUPPORTED:
445 		code = OBS_OUTPUT_UNSUPPORTED;
446 		break;
447 	default:
448 		if (stream->is_network) {
449 			code = OBS_OUTPUT_DISCONNECTED;
450 		} else {
451 			code = OBS_OUTPUT_ENCODE_ERROR;
452 		}
453 	}
454 
455 	obs_output_signal_stop(stream->output, code);
456 	os_atomic_set_bool(&stream->capturing, false);
457 }
458 
write_packet(struct ffmpeg_muxer * stream,struct encoder_packet * packet)459 bool write_packet(struct ffmpeg_muxer *stream, struct encoder_packet *packet)
460 {
461 	bool is_video = packet->type == OBS_ENCODER_VIDEO;
462 	size_t ret;
463 
464 	struct ffm_packet_info info = {.pts = packet->pts,
465 				       .dts = packet->dts,
466 				       .size = (uint32_t)packet->size,
467 				       .index = (int)packet->track_idx,
468 				       .type = is_video ? FFM_PACKET_VIDEO
469 							: FFM_PACKET_AUDIO,
470 				       .keyframe = packet->keyframe};
471 
472 	ret = os_process_pipe_write(stream->pipe, (const uint8_t *)&info,
473 				    sizeof(info));
474 	if (ret != sizeof(info)) {
475 		warn("os_process_pipe_write for info structure failed");
476 		signal_failure(stream);
477 		return false;
478 	}
479 
480 	ret = os_process_pipe_write(stream->pipe, packet->data, packet->size);
481 	if (ret != packet->size) {
482 		warn("os_process_pipe_write for packet data failed");
483 		signal_failure(stream);
484 		return false;
485 	}
486 
487 	stream->total_bytes += packet->size;
488 	return true;
489 }
490 
/* Writes the extra data of audio encoder `aencoder` as a header packet for
 * track `idx`. Returns write_packet()'s result. */
static bool send_audio_headers(struct ffmpeg_muxer *stream,
			       obs_encoder_t *aencoder, size_t idx)
{
	struct encoder_packet header = {0};

	header.type = OBS_ENCODER_AUDIO;
	header.timebase_den = 1;
	header.track_idx = idx;

	obs_encoder_get_extra_data(aencoder, &header.data, &header.size);
	return write_packet(stream, &header);
}
500 
send_video_headers(struct ffmpeg_muxer * stream)501 static bool send_video_headers(struct ffmpeg_muxer *stream)
502 {
503 	obs_encoder_t *vencoder = obs_output_get_video_encoder(stream->output);
504 
505 	struct encoder_packet packet = {.type = OBS_ENCODER_VIDEO,
506 					.timebase_den = 1};
507 
508 	obs_encoder_get_extra_data(vencoder, &packet.data, &packet.size);
509 	return write_packet(stream, &packet);
510 }
511 
send_headers(struct ffmpeg_muxer * stream)512 bool send_headers(struct ffmpeg_muxer *stream)
513 {
514 	obs_encoder_t *aencoder;
515 	size_t idx = 0;
516 
517 	if (!send_video_headers(stream))
518 		return false;
519 
520 	do {
521 		aencoder = obs_output_get_audio_encoder(stream->output, idx);
522 		if (aencoder) {
523 			if (!send_audio_headers(stream, aencoder, idx)) {
524 				return false;
525 			}
526 			idx++;
527 		}
528 	} while (aencoder);
529 
530 	return true;
531 }
532 
ffmpeg_mux_data(void * data,struct encoder_packet * packet)533 static void ffmpeg_mux_data(void *data, struct encoder_packet *packet)
534 {
535 	struct ffmpeg_muxer *stream = data;
536 
537 	if (!active(stream))
538 		return;
539 
540 	/* encoder failure */
541 	if (!packet) {
542 		deactivate(stream, OBS_OUTPUT_ENCODE_ERROR);
543 		return;
544 	}
545 
546 	if (!stream->sent_headers) {
547 		if (!send_headers(stream))
548 			return;
549 
550 		stream->sent_headers = true;
551 	}
552 
553 	if (stopping(stream)) {
554 		if (packet->sys_dts_usec >= stream->stop_ts) {
555 			deactivate(stream, 0);
556 			return;
557 		}
558 	}
559 
560 	write_packet(stream, packet);
561 }
562 
ffmpeg_mux_properties(void * unused)563 static obs_properties_t *ffmpeg_mux_properties(void *unused)
564 {
565 	UNUSED_PARAMETER(unused);
566 
567 	obs_properties_t *props = obs_properties_create();
568 
569 	obs_properties_add_text(props, "path", obs_module_text("FilePath"),
570 				OBS_TEXT_DEFAULT);
571 	return props;
572 }
573 
ffmpeg_mux_total_bytes(void * data)574 uint64_t ffmpeg_mux_total_bytes(void *data)
575 {
576 	struct ffmpeg_muxer *stream = data;
577 	return stream->total_bytes;
578 }
579 
580 struct obs_output_info ffmpeg_muxer = {
581 	.id = "ffmpeg_muxer",
582 	.flags = OBS_OUTPUT_AV | OBS_OUTPUT_ENCODED | OBS_OUTPUT_MULTI_TRACK |
583 		 OBS_OUTPUT_CAN_PAUSE,
584 	.get_name = ffmpeg_mux_getname,
585 	.create = ffmpeg_mux_create,
586 	.destroy = ffmpeg_mux_destroy,
587 	.start = ffmpeg_mux_start,
588 	.stop = ffmpeg_mux_stop,
589 	.encoded_packet = ffmpeg_mux_data,
590 	.get_total_bytes = ffmpeg_mux_total_bytes,
591 	.get_properties = ffmpeg_mux_properties,
592 };
593 
/* Placeholder: connect time is not tracked yet, so this always yields 0. */
static int connect_time(struct ffmpeg_muxer *stream)
{
	/* TODO */
	const int result = 0;

	UNUSED_PARAMETER(stream);
	return result;
}
600 
/* get_connect_time_ms callback of the mpegts muxer; forwards to
 * connect_time(). */
static int ffmpeg_mpegts_mux_connect_time(void *data)
{
	/* TODO */
	return connect_time((struct ffmpeg_muxer *)data);
}
607 
608 struct obs_output_info ffmpeg_mpegts_muxer = {
609 	.id = "ffmpeg_mpegts_muxer",
610 	.flags = OBS_OUTPUT_AV | OBS_OUTPUT_ENCODED | OBS_OUTPUT_MULTI_TRACK |
611 		 OBS_OUTPUT_SERVICE,
612 	.encoded_video_codecs = "h264",
613 	.encoded_audio_codecs = "aac",
614 	.get_name = ffmpeg_mpegts_mux_getname,
615 	.create = ffmpeg_mux_create,
616 	.destroy = ffmpeg_mux_destroy,
617 	.start = ffmpeg_mux_start,
618 	.stop = ffmpeg_mux_stop,
619 	.encoded_packet = ffmpeg_mux_data,
620 	.get_total_bytes = ffmpeg_mux_total_bytes,
621 	.get_properties = ffmpeg_mux_properties,
622 	.get_connect_time_ms = ffmpeg_mpegts_mux_connect_time,
623 };
624 
625 /* ------------------------------------------------------------------------ */
626 
/* get_name callback of the replay buffer output: returns the module text
 * stored under the "ReplayBuffer" key. The type argument is ignored. */
static const char *replay_buffer_getname(void *type)
{
	const char *display_name = obs_module_text("ReplayBuffer");

	UNUSED_PARAMETER(type);
	return display_name;
}
632 
/* Hotkey callback: on key press, while the output is active and the video
 * encoder is not paused, records the current time (os_gettime_ns() / 1000)
 * in save_ts. replay_buffer_data() later acts on that timestamp. */
static void replay_buffer_hotkey(void *data, obs_hotkey_id id,
				 obs_hotkey_t *hotkey, bool pressed)
{
	struct ffmpeg_muxer *stream = data;

	UNUSED_PARAMETER(id);
	UNUSED_PARAMETER(hotkey);

	if (!pressed || !os_atomic_load_bool(&stream->active))
		return;

	if (obs_encoder_paused(obs_output_get_video_encoder(stream->output))) {
		info("Could not save buffer because encoders paused");
		return;
	}

	stream->save_ts = os_gettime_ns() / 1000LL;
}
656 
/* "save" proc handler: behaves like a press of the save hotkey. */
static void save_replay_proc(void *data, calldata_t *cd)
{
	UNUSED_PARAMETER(cd);

	replay_buffer_hotkey(data, 0, NULL, true);
}
662 
/* "get_last_replay" proc handler: stores stream->path in the "path" output
 * value, unless a save is currently being muxed. */
static void get_last_replay(void *data, calldata_t *cd)
{
	struct ffmpeg_muxer *stream = data;

	if (os_atomic_load_bool(&stream->muxing))
		return;

	calldata_set_string(cd, "path", stream->path.array);
}
669 
/* create callback of the replay buffer: allocates the muxer object and
 * registers the save hotkey, the "save" / "get_last_replay" procs and the
 * "saved" signal on the output. The settings argument is not consulted. */
static void *replay_buffer_create(obs_data_t *settings, obs_output_t *output)
{
	struct ffmpeg_muxer *replay = bzalloc(sizeof(*replay));
	proc_handler_t *procs;
	signal_handler_t *signals;

	UNUSED_PARAMETER(settings);

	replay->output = output;
	replay->hotkey =
		obs_hotkey_register_output(output, "ReplayBuffer.Save",
					   obs_module_text("ReplayBuffer.Save"),
					   replay_buffer_hotkey, replay);

	procs = obs_output_get_proc_handler(output);
	proc_handler_add(procs, "void save()", save_replay_proc, replay);
	proc_handler_add(procs, "void get_last_replay(out string path)",
			 get_last_replay, replay);

	signals = obs_output_get_signal_handler(output);
	signal_handler_add(signals, "void saved()");

	return replay;
}
691 
replay_buffer_destroy(void * data)692 static void replay_buffer_destroy(void *data)
693 {
694 	struct ffmpeg_muxer *stream = data;
695 	if (stream->hotkey)
696 		obs_hotkey_unregister(stream->hotkey);
697 	ffmpeg_mux_destroy(data);
698 }
699 
replay_buffer_start(void * data)700 static bool replay_buffer_start(void *data)
701 {
702 	struct ffmpeg_muxer *stream = data;
703 
704 	if (!obs_output_can_begin_data_capture(stream->output, 0))
705 		return false;
706 	if (!obs_output_initialize_encoders(stream->output, 0))
707 		return false;
708 
709 	obs_data_t *s = obs_output_get_settings(stream->output);
710 	stream->max_time = obs_data_get_int(s, "max_time_sec") * 1000000LL;
711 	stream->max_size = obs_data_get_int(s, "max_size_mb") * (1024 * 1024);
712 	obs_data_release(s);
713 
714 	os_atomic_set_bool(&stream->active, true);
715 	os_atomic_set_bool(&stream->capturing, true);
716 	stream->total_bytes = 0;
717 	obs_output_begin_data_capture(stream->output, 0);
718 
719 	return true;
720 }
721 
purge_front(struct ffmpeg_muxer * stream)722 static bool purge_front(struct ffmpeg_muxer *stream)
723 {
724 	struct encoder_packet pkt;
725 	bool keyframe;
726 
727 	if (!stream->packets.size)
728 		return false;
729 
730 	circlebuf_pop_front(&stream->packets, &pkt, sizeof(pkt));
731 
732 	keyframe = pkt.type == OBS_ENCODER_VIDEO && pkt.keyframe;
733 
734 	if (keyframe)
735 		stream->keyframes--;
736 
737 	if (!stream->packets.size) {
738 		stream->cur_size = 0;
739 		stream->cur_time = 0;
740 	} else {
741 		struct encoder_packet first;
742 		circlebuf_peek_front(&stream->packets, &first, sizeof(first));
743 		stream->cur_time = first.dts_usec;
744 		stream->cur_size -= (int64_t)pkt.size;
745 	}
746 
747 	obs_encoder_packet_release(&pkt);
748 	return keyframe;
749 }
750 
purge(struct ffmpeg_muxer * stream)751 static inline void purge(struct ffmpeg_muxer *stream)
752 {
753 	if (purge_front(stream)) {
754 		struct encoder_packet pkt;
755 
756 		for (;;) {
757 			if (!stream->packets.size)
758 				return;
759 			circlebuf_peek_front(&stream->packets, &pkt,
760 					     sizeof(pkt));
761 			if (pkt.type == OBS_ENCODER_VIDEO && pkt.keyframe)
762 				return;
763 
764 			purge_front(stream);
765 		}
766 	}
767 }
768 
replay_buffer_purge(struct ffmpeg_muxer * stream,struct encoder_packet * pkt)769 static inline void replay_buffer_purge(struct ffmpeg_muxer *stream,
770 				       struct encoder_packet *pkt)
771 {
772 	if (stream->max_size) {
773 		if (!stream->packets.size || stream->keyframes <= 2)
774 			return;
775 
776 		while ((stream->cur_size + (int64_t)pkt->size) >
777 		       stream->max_size)
778 			purge(stream);
779 	}
780 
781 	if (!stream->packets.size || stream->keyframes <= 2)
782 		return;
783 
784 	while ((pkt->dts_usec - stream->cur_time) > stream->max_time)
785 		purge(stream);
786 }
787 
insert_packet(struct darray * array,struct encoder_packet * packet,int64_t video_offset,int64_t * audio_offsets,int64_t video_dts_offset,int64_t * audio_dts_offsets)788 static void insert_packet(struct darray *array, struct encoder_packet *packet,
789 			  int64_t video_offset, int64_t *audio_offsets,
790 			  int64_t video_dts_offset, int64_t *audio_dts_offsets)
791 {
792 	struct encoder_packet pkt;
793 	DARRAY(struct encoder_packet) packets;
794 	packets.da = *array;
795 	size_t idx;
796 
797 	obs_encoder_packet_ref(&pkt, packet);
798 
799 	if (pkt.type == OBS_ENCODER_VIDEO) {
800 		pkt.dts_usec -= video_offset;
801 		pkt.dts -= video_dts_offset;
802 		pkt.pts -= video_dts_offset;
803 	} else {
804 		pkt.dts_usec -= audio_offsets[pkt.track_idx];
805 		pkt.dts -= audio_dts_offsets[pkt.track_idx];
806 		pkt.pts -= audio_dts_offsets[pkt.track_idx];
807 	}
808 
809 	for (idx = packets.num; idx > 0; idx--) {
810 		struct encoder_packet *p = packets.array + (idx - 1);
811 		if (p->dts_usec < pkt.dts_usec)
812 			break;
813 	}
814 
815 	da_insert(packets, idx, &pkt);
816 	*array = packets.da;
817 }
818 
replay_buffer_mux_thread(void * data)819 static void *replay_buffer_mux_thread(void *data)
820 {
821 	struct ffmpeg_muxer *stream = data;
822 	bool error = false;
823 
824 	start_pipe(stream, stream->path.array);
825 
826 	if (!stream->pipe) {
827 		warn("Failed to create process pipe");
828 		error = true;
829 		goto error;
830 	}
831 
832 	if (!send_headers(stream)) {
833 		warn("Could not write headers for file '%s'",
834 		     stream->path.array);
835 		error = true;
836 		goto error;
837 	}
838 
839 	for (size_t i = 0; i < stream->mux_packets.num; i++) {
840 		struct encoder_packet *pkt = &stream->mux_packets.array[i];
841 		write_packet(stream, pkt);
842 		obs_encoder_packet_release(pkt);
843 	}
844 
845 	info("Wrote replay buffer to '%s'", stream->path.array);
846 
847 error:
848 	os_process_pipe_destroy(stream->pipe);
849 	stream->pipe = NULL;
850 	da_free(stream->mux_packets);
851 	os_atomic_set_bool(&stream->muxing, false);
852 
853 	if (!error) {
854 		calldata_t cd = {0};
855 		signal_handler_t *sh =
856 			obs_output_get_signal_handler(stream->output);
857 		signal_handler_signal(sh, "saved", &cd);
858 	}
859 
860 	return NULL;
861 }
862 
replay_buffer_save(struct ffmpeg_muxer * stream)863 static void replay_buffer_save(struct ffmpeg_muxer *stream)
864 {
865 	const size_t size = sizeof(struct encoder_packet);
866 	size_t num_packets = stream->packets.size / size;
867 
868 	da_reserve(stream->mux_packets, num_packets);
869 
870 	/* ---------------------------- */
871 	/* reorder packets */
872 
873 	bool found_video = false;
874 	bool found_audio[MAX_AUDIO_MIXES] = {0};
875 	int64_t video_offset = 0;
876 	int64_t video_dts_offset = 0;
877 	int64_t audio_offsets[MAX_AUDIO_MIXES] = {0};
878 	int64_t audio_dts_offsets[MAX_AUDIO_MIXES] = {0};
879 
880 	for (size_t i = 0; i < num_packets; i++) {
881 		struct encoder_packet *pkt;
882 		pkt = circlebuf_data(&stream->packets, i * size);
883 
884 		if (pkt->type == OBS_ENCODER_VIDEO) {
885 			if (!found_video) {
886 				video_offset = pkt->dts_usec;
887 				video_dts_offset = pkt->dts;
888 				found_video = true;
889 			}
890 		} else {
891 			if (!found_audio[pkt->track_idx]) {
892 				found_audio[pkt->track_idx] = true;
893 				audio_offsets[pkt->track_idx] = pkt->dts_usec;
894 				audio_dts_offsets[pkt->track_idx] = pkt->dts;
895 			}
896 		}
897 
898 		insert_packet(&stream->mux_packets.da, pkt, video_offset,
899 			      audio_offsets, video_dts_offset,
900 			      audio_dts_offsets);
901 	}
902 
903 	/* ---------------------------- */
904 	/* generate filename */
905 
906 	obs_data_t *settings = obs_output_get_settings(stream->output);
907 	const char *dir = obs_data_get_string(settings, "directory");
908 	const char *fmt = obs_data_get_string(settings, "format");
909 	const char *ext = obs_data_get_string(settings, "extension");
910 	bool space = obs_data_get_bool(settings, "allow_spaces");
911 
912 	char *filename = os_generate_formatted_filename(ext, space, fmt);
913 
914 	dstr_copy(&stream->path, dir);
915 	dstr_replace(&stream->path, "\\", "/");
916 	if (dstr_end(&stream->path) != '/')
917 		dstr_cat_ch(&stream->path, '/');
918 	dstr_cat(&stream->path, filename);
919 
920 	char *slash = strrchr(stream->path.array, '/');
921 	if (slash) {
922 		*slash = 0;
923 		os_mkdirs(stream->path.array);
924 		*slash = '/';
925 	}
926 
927 	bfree(filename);
928 	obs_data_release(settings);
929 
930 	/* ---------------------------- */
931 
932 	os_atomic_set_bool(&stream->muxing, true);
933 	stream->mux_thread_joinable = pthread_create(&stream->mux_thread, NULL,
934 						     replay_buffer_mux_thread,
935 						     stream) == 0;
936 }
937 
deactivate_replay_buffer(struct ffmpeg_muxer * stream,int code)938 static void deactivate_replay_buffer(struct ffmpeg_muxer *stream, int code)
939 {
940 	if (code) {
941 		obs_output_signal_stop(stream->output, code);
942 	} else if (stopping(stream)) {
943 		obs_output_end_data_capture(stream->output);
944 	}
945 
946 	os_atomic_set_bool(&stream->active, false);
947 	os_atomic_set_bool(&stream->sent_headers, false);
948 	os_atomic_set_bool(&stream->stopping, false);
949 	replay_buffer_clear(stream);
950 }
951 
replay_buffer_data(void * data,struct encoder_packet * packet)952 static void replay_buffer_data(void *data, struct encoder_packet *packet)
953 {
954 	struct ffmpeg_muxer *stream = data;
955 	struct encoder_packet pkt;
956 
957 	if (!active(stream))
958 		return;
959 
960 	/* encoder failure */
961 	if (!packet) {
962 		deactivate_replay_buffer(stream, OBS_OUTPUT_ENCODE_ERROR);
963 		return;
964 	}
965 
966 	if (stopping(stream)) {
967 		if (packet->sys_dts_usec >= stream->stop_ts) {
968 			deactivate_replay_buffer(stream, 0);
969 			return;
970 		}
971 	}
972 
973 	obs_encoder_packet_ref(&pkt, packet);
974 	replay_buffer_purge(stream, &pkt);
975 
976 	if (!stream->packets.size)
977 		stream->cur_time = pkt.dts_usec;
978 	stream->cur_size += pkt.size;
979 
980 	circlebuf_push_back(&stream->packets, packet, sizeof(*packet));
981 
982 	if (packet->type == OBS_ENCODER_VIDEO && packet->keyframe)
983 		stream->keyframes++;
984 
985 	if (stream->save_ts && packet->sys_dts_usec >= stream->save_ts) {
986 		if (os_atomic_load_bool(&stream->muxing))
987 			return;
988 
989 		if (stream->mux_thread_joinable) {
990 			pthread_join(stream->mux_thread, NULL);
991 			stream->mux_thread_joinable = false;
992 		}
993 
994 		stream->save_ts = 0;
995 		replay_buffer_save(stream);
996 	}
997 }
998 
/* get_defaults callback: default limits (15 for "max_time_sec", 500 for
 * "max_size_mb") and default file naming for saved replays. */
static void replay_buffer_defaults(obs_data_t *s)
{
	/* buffer limits */
	obs_data_set_default_int(s, "max_size_mb", 500);
	obs_data_set_default_int(s, "max_time_sec", 15);

	/* file naming */
	obs_data_set_default_bool(s, "allow_spaces", true);
	obs_data_set_default_string(s, "extension", "mp4");
	obs_data_set_default_string(s, "format", "%CCYY-%MM-%DD %hh-%mm-%ss");
}
1007 
1008 struct obs_output_info replay_buffer = {
1009 	.id = "replay_buffer",
1010 	.flags = OBS_OUTPUT_AV | OBS_OUTPUT_ENCODED | OBS_OUTPUT_MULTI_TRACK |
1011 		 OBS_OUTPUT_CAN_PAUSE,
1012 	.get_name = replay_buffer_getname,
1013 	.create = replay_buffer_create,
1014 	.destroy = replay_buffer_destroy,
1015 	.start = replay_buffer_start,
1016 	.stop = ffmpeg_mux_stop,
1017 	.encoded_packet = replay_buffer_data,
1018 	.get_total_bytes = ffmpeg_mux_total_bytes,
1019 	.get_defaults = replay_buffer_defaults,
1020 };
1021