/*
 * Copyright © 2019 Manuel Stoeckl
 *
 * Permission is hereby granted, free of charge, to any person obtaining
 * a copy of this software and associated documentation files (the
 * "Software"), to deal in the Software without restriction, including
 * without limitation the rights to use, copy, modify, merge, publish,
 * distribute, sublicense, and/or sell copies of the Software, and to
 * permit persons to whom the Software is furnished to do so, subject to
 * the following conditions:
 *
 * The above copyright notice and this permission notice (including the
 * next paragraph) shall be included in all copies or substantial
 * portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include "shadow.h"

#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <poll.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>

#ifdef HAS_LZ4
#include <lz4.h>
#include <lz4hc.h>
#endif
#ifdef HAS_ZSTD
#include <zstd.h>
#endif

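/* Shadow structures are kept in a circular doubly-linked list rooted at
 * map->link; the lookup helpers below do a linear scan of that list. */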
struct shadow_fd *get_shadow_for_local_fd(
		struct fd_translation_map *map, int lfd)
{
	for (struct shadow_fd_link *lcur = map->link.l_next,
				   *lnxt = lcur->l_next;
			lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
		struct shadow_fd *cur = (struct shadow_fd *)lcur;
		if (cur->fd_local == lfd) {
			return cur;
		}
	}
	return NULL;
}
struct shadow_fd *get_shadow_for_rid(struct fd_translation_map *map, int rid)
{
	for (struct shadow_fd_link *lcur = map->link.l_next,
				   *lnxt = lcur->l_next;
			lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
		struct shadow_fd *cur = (struct shadow_fd *)lcur;
		if (cur->remote_id == rid) {
			return cur;
		}
	}
	return NULL;
}
static void destroy_unlinked_sfd(struct shadow_fd *sfd)
{
	wp_debug("Destroying %s RID=%d", fdcat_to_str(sfd->type),
			sfd->remote_id);
	/* video must be cleaned up before any buffers that it may rely on */
	destroy_video_data(sfd);

	/* free all accumulated damage records */
	reset_damage(&sfd->damage);
	free(sfd->damage_task_interval_store);

	if (sfd->type == FDC_FILE) {
		munmap(sfd->mem_local, sfd->buffer_size);
		zeroed_aligned_free(sfd->mem_mirror, &sfd->mem_mirror_handle);
	} else if (sfd->type == FDC_DMABUF || sfd->type == FDC_DMAVID_IR ||
			sfd->type == FDC_DMAVID_IW) {
		if (sfd->dmabuf_map_handle) {
			unmap_dmabuf(sfd->dmabuf_bo, sfd->dmabuf_map_handle);
		}
		destroy_dmabuf(sfd->dmabuf_bo);
		zeroed_aligned_free(sfd->mem_mirror, &sfd->mem_mirror_handle);
	} else if (sfd->type == FDC_PIPE) {
		if (sfd->pipe.fd != sfd->fd_local && sfd->pipe.fd != -1) {
			checked_close(sfd->pipe.fd);
		}
		free(sfd->pipe.recv.data);
		free(sfd->pipe.send.data);
	}
	if (sfd->fd_local != -1) {
		checked_close(sfd->fd_local);
	}
	free(sfd);
}
static void cleanup_thread_local(struct thread_data *data)
{
#ifdef HAS_ZSTD
	ZSTD_freeCCtx(data->comp_ctx.zstd_ccontext);
	ZSTD_freeDCtx(data->comp_ctx.zstd_dcontext);
#endif
#ifdef HAS_LZ4
	free(data->comp_ctx.lz4_extstate);
#endif
	free(data->tmp_buf);
}

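/* Allocate per-thread compression state: an external state buffer for LZ4,
 * or compression/decompression contexts for Zstd, depending on the selected
 * mode. The scratch buffer tmp_buf is grown lazily by individual tasks. */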
static void setup_thread_local(struct thread_data *data,
		enum compression_mode mode, int compression_level)
{
	struct comp_ctx *ctx = &data->comp_ctx;
	ctx->zstd_ccontext = NULL;
	ctx->zstd_dcontext = NULL;
	ctx->lz4_extstate = NULL;
#ifdef HAS_LZ4
	if (mode == COMP_LZ4) {
		/* Like LZ4Frame, integer codes indicate compression level.
		 * Negative numbers are acceleration, positive use the HC
		 * routines */
		if (compression_level <= 0) {
			ctx->lz4_extstate = malloc((size_t)LZ4_sizeofState());
		} else {
			ctx->lz4_extstate = malloc((size_t)LZ4_sizeofStateHC());
		}
	}
#endif
#ifdef HAS_ZSTD
	if (mode == COMP_ZSTD) {
		ctx->zstd_ccontext = ZSTD_createCCtx();
		ctx->zstd_dcontext = ZSTD_createDCtx();
	}
#endif
	(void)mode;
	(void)compression_level;

	data->tmp_buf = NULL;
	data->tmp_size = 0;
}
void cleanup_translation_map(struct fd_translation_map *map)
{
	for (struct shadow_fd_link *lcur = map->link.l_next,
				   *lnxt = lcur->l_next;
			lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
		struct shadow_fd *cur = (struct shadow_fd *)lcur;
		destroy_unlinked_sfd(cur);
	}
	map->link.l_next = &map->link;
	map->link.l_prev = &map->link;
}
bool destroy_shadow_if_unreferenced(struct shadow_fd *sfd)
{
	bool autodelete = sfd->has_owner;
	if (sfd->type == FDC_PIPE && !sfd->pipe.can_read &&
			!sfd->pipe.can_write && !sfd->pipe.remote_can_read &&
			!sfd->pipe.remote_can_write) {
		autodelete = true;
	}
	if (sfd->refcount.protocol == 0 && sfd->refcount.transfer == 0 &&
			sfd->refcount.compute == false && autodelete) {
		/* remove shadowfd from list */
		sfd->link.l_prev->l_next = sfd->link.l_next;
		sfd->link.l_next->l_prev = sfd->link.l_prev;
		sfd->link.l_next = NULL;
		sfd->link.l_prev = NULL;

		destroy_unlinked_sfd(sfd);
		return true;
	} else if (sfd->refcount.protocol < 0 || sfd->refcount.transfer < 0) {
		wp_error("Negative refcount for rid=%d: %d protocol references, %d transfer references",
				sfd->remote_id, sfd->refcount.protocol,
				sfd->refcount.transfer);
	}
	return false;
}

static void *worker_thread_main(void *arg);
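/* The display side allocates negative RIDs and the application side positive
 * ones (see local_sign below), so the two endpoints can introduce new shadow
 * structures concurrently without id collisions. */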
void setup_translation_map(struct fd_translation_map *map, bool display_side)
{
	map->local_sign = display_side ? -1 : 1;
	map->link.l_next = &map->link;
	map->link.l_prev = &map->link;
	map->max_local_id = 1;
}

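/* Wake all worker threads with a single shared TASK_STOP entry and join
 * them; thread #0 is the main thread and is not joined. */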
static void shutdown_threads(struct thread_pool *pool)
{
	pthread_mutex_lock(&pool->work_mutex);
	free(pool->stack);
	struct task_data task;
	memset(&task, 0, sizeof(task));
	task.type = TASK_STOP;
	pool->stack = &task;
	pool->stack_count = 1;
	pool->stack_size = 1;
	pool->do_work = true;
	pthread_cond_broadcast(&pool->work_cond);
	pthread_mutex_unlock(&pool->work_mutex);

	if (pool->threads) {
		for (int i = 1; i < pool->nthreads; i++) {
			if (pool->threads[i].thread) {
				pthread_join(pool->threads[i].thread, NULL);
			}
		}
	}
	pool->stack = NULL;
}

int setup_thread_pool(struct thread_pool *pool,
		enum compression_mode compression, int comp_level,
		int n_threads)
{
	memset(pool, 0, sizeof(struct thread_pool));

	pool->diff_func = get_diff_function(
			DIFF_FASTEST, &pool->diff_alignment_bits);

	pool->compression = compression;
	pool->compression_level = comp_level;
	if (n_threads <= 0) {
		// platform dependent
		int nt = get_hardware_thread_count();
		pool->nthreads = max(nt / 2, 1);
	} else {
		pool->nthreads = n_threads;
	}
	pool->stack_size = 0;
	pool->stack_count = 0;
	pool->stack = NULL;
	pool->tasks_in_progress = 0;
	pool->do_work = true;

	/* Thread #0 is the 'main' thread */
	pool->threads = calloc(
			(size_t)pool->nthreads, sizeof(struct thread_data));
	if (!pool->threads) {
		wp_error("Failed to allocate list of thread data");
		return -1;
	}

	int ret;
	ret = pthread_mutex_init(&pool->work_mutex, NULL);
	if (ret) {
		wp_error("Mutex creation failed: %s", strerror(ret));
		return -1;
	}
	ret = pthread_cond_init(&pool->work_cond, NULL);
	if (ret) {
		wp_error("Condition variable creation failed: %s",
				strerror(ret));
		return -1;
	}

	pool->threads[0].pool = pool;
	pool->threads[0].thread = pthread_self();
	for (int i = 1; i < pool->nthreads; i++) {
		pool->threads[i].pool = pool;
		ret = pthread_create(&pool->threads[i].thread, NULL,
				worker_thread_main, &pool->threads[i]);
		if (ret) {
			wp_error("Thread creation failed: %s", strerror(ret));
			// Stop making new threads, but keep what is there
			pool->nthreads = i;
			break;
		}
	}

	/* Set up thread-local data from the main thread, so that the worker
	 * threads need not allocate their own compression contexts and
	 * fixed scratch buffers */
	for (int i = 0; i < pool->nthreads; i++) {
		setup_thread_local(&pool->threads[i], compression, comp_level);
	}

	int fds[2];
	if (pipe(fds) == -1) {
		wp_error("Failed to create pipe: %s", strerror(errno));
	}
	pool->selfpipe_r = fds[0];
	pool->selfpipe_w = fds[1];
	if (set_nonblocking(pool->selfpipe_r) == -1) {
		wp_error("Failed to make read end of pipe nonblocking: %s",
				strerror(errno));
	}
	return 0;
}
void cleanup_thread_pool(struct thread_pool *pool)
{
	shutdown_threads(pool);
	if (pool->threads) {
		for (int i = 0; i < pool->nthreads; i++) {
			cleanup_thread_local(&pool->threads[i]);
		}
	}

	pthread_mutex_destroy(&pool->work_mutex);
	pthread_cond_destroy(&pool->work_cond);
	free(pool->threads);
	free(pool->stack);

	checked_close(pool->selfpipe_r);
	checked_close(pool->selfpipe_w);
}

const char *fdcat_to_str(enum fdcat cat)
{
	switch (cat) {
	case FDC_UNKNOWN:
		return "FDC_UNKNOWN";
	case FDC_FILE:
		return "FDC_FILE";
	case FDC_PIPE:
		return "FDC_PIPE";
	case FDC_DMABUF:
		return "FDC_DMABUF";
	case FDC_DMAVID_IR:
		return "FDC_DMAVID_IR";
	case FDC_DMAVID_IW:
		return "FDC_DMAVID_IW";
	}
	return "<invalid>";
}

const char *compression_mode_to_str(enum compression_mode mode)
{
	switch (mode) {
	case COMP_NONE:
		return "NONE";
	case COMP_LZ4:
		return "LZ4";
	case COMP_ZSTD:
		return "ZSTD";
	default:
		return "<invalid>";
	}
}

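/* Classify an open file descriptor by fstat; for regular files, also report
 * the file size through *size if it is not NULL. */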
enum fdcat get_fd_type(int fd, size_t *size)
{
	struct stat fsdata;
	memset(&fsdata, 0, sizeof(fsdata));
	int ret = fstat(fd, &fsdata);
	if (ret == -1) {
		wp_error("The fd %d is not file-like: %s", fd, strerror(errno));
		return FDC_UNKNOWN;
	} else if (S_ISREG(fsdata.st_mode)) {
		if (size) {
			*size = (size_t)fsdata.st_size;
		}
		return FDC_FILE;
	} else if (S_ISFIFO(fsdata.st_mode) || S_ISCHR(fsdata.st_mode) ||
			S_ISSOCK(fsdata.st_mode)) {
		if (S_ISCHR(fsdata.st_mode)) {
			wp_error("The fd %d, size %" PRId64
				 ", mode %x is a character device. Proceeding under the assumption that it is pipe-like.",
					fd, (int64_t)fsdata.st_size,
					fsdata.st_mode);
		}
		if (S_ISSOCK(fsdata.st_mode)) {
			wp_error("The fd %d, size %" PRId64
				 ", mode %x is a socket. Proceeding under the assumption that it is pipe-like.",
					fd, (int64_t)fsdata.st_size,
					fsdata.st_mode);
		}
		return FDC_PIPE;
	} else if (is_dmabuf(fd) == 1) {
		return FDC_DMABUF;
	} else {
		wp_error("The fd %d has an unusual mode %x (type=%x): blk=%d chr=%d dir=%d lnk=%d reg=%d fifo=%d sock=%d; expect an application crash!",
				fd, fsdata.st_mode, fsdata.st_mode & S_IFMT,
				S_ISBLK(fsdata.st_mode),
				S_ISCHR(fsdata.st_mode),
				S_ISDIR(fsdata.st_mode),
				S_ISLNK(fsdata.st_mode),
				S_ISREG(fsdata.st_mode),
				S_ISFIFO(fsdata.st_mode),
				S_ISSOCK(fsdata.st_mode));
		return FDC_UNKNOWN;
	}
}

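/* Return an upper bound for the compressed size of max_input bytes, or 0
 * when no scratch buffer is needed (COMP_NONE). */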
static size_t compress_bufsize(struct thread_pool *pool, size_t max_input)
{
	switch (pool->compression) {
	default:
	case COMP_NONE:
		(void)max_input;
		return 0;
#ifdef HAS_LZ4
	case COMP_LZ4:
		/* This bound applies for both LZ4 and LZ4HC compressors */
		return (size_t)LZ4_compressBound((int)max_input);
#endif
#ifdef HAS_ZSTD
	case COMP_ZSTD:
		return ZSTD_compressBound(max_input);
#endif
	}
	return 0;
}

/* With the selected compression method, compress the buffer
 * {isize,ibuf}, possibly writing to the scratch space {msize,mbuf}, and
 * set dst to point to the result */
static void compress_buffer(struct thread_pool *pool, struct comp_ctx *ctx,
		size_t isize, const char *ibuf, size_t msize, char *mbuf,
		struct bytebuf *dst)
{
	(void)ctx;
	/* Pass zero-size inputs through; the compressors below require
	 * nontrivial input */
	if (isize == 0) {
		dst->size = 0;
		dst->data = (char *)ibuf;
		return;
	}

	DTRACE_PROBE1(waypipe, compress_buffer_enter, isize);
	switch (pool->compression) {
	default:
	case COMP_NONE:
		(void)msize;
		(void)mbuf;
		dst->size = isize;
		dst->data = (char *)ibuf;
		break;
#ifdef HAS_LZ4
	case COMP_LZ4: {
		int ws;
		if (pool->compression_level <= 0) {
			ws = LZ4_compress_fast_extState(ctx->lz4_extstate, ibuf,
					mbuf, (int)isize, (int)msize,
					-pool->compression_level);
		} else {
			ws = LZ4_compress_HC_extStateHC(ctx->lz4_extstate, ibuf,
					mbuf, (int)isize, (int)msize,
					pool->compression_level);
		}
		if (ws == 0) {
			wp_error("LZ4 compression failed for %zu bytes in %zu of space",
					isize, msize);
		}
		dst->size = (size_t)ws;
		dst->data = (char *)mbuf;
		break;
	}
#endif
#ifdef HAS_ZSTD
	case COMP_ZSTD: {
		size_t ws = ZSTD_compressCCtx(ctx->zstd_ccontext, mbuf, msize,
				ibuf, isize, pool->compression_level);
		if (ZSTD_isError(ws)) {
			wp_error("Zstd compression failed for %d bytes in %d of space: %s",
					(int)isize, (int)msize,
					ZSTD_getErrorName(ws));
		}
		dst->size = (size_t)ws;
		dst->data = (char *)mbuf;
		break;
	}
#endif
	}
	DTRACE_PROBE1(waypipe, compress_buffer_exit, dst->size);
}
/* With the selected compression method, uncompress the buffer {isize,ibuf}
 * to precisely msize bytes, setting {wsize,wbuf} to indicate the result;
 * the scratch space {msize,mbuf} is used if the compression mode requires
 * it. */
static void uncompress_buffer(struct thread_pool *pool, struct comp_ctx *ctx,
		size_t isize, const char *ibuf, size_t msize, char *mbuf,
		size_t *wsize, const char **wbuf)
{
	(void)ctx;
	/* Pass zero-size inputs through; the decompressors below require
	 * nontrivial input */
	if (isize == 0) {
		*wsize = 0;
		*wbuf = ibuf;
		return;
	}

	DTRACE_PROBE1(waypipe, uncompress_buffer_enter, isize);
	switch (pool->compression) {
	default:
	case COMP_NONE:
		(void)mbuf;
		(void)msize;
		*wsize = isize;
		*wbuf = ibuf;
		break;
#ifdef HAS_LZ4
	case COMP_LZ4: {
		int ws = LZ4_decompress_safe(
				ibuf, mbuf, (int)isize, (int)msize);
		if (ws < 0 || (size_t)ws != msize) {
			wp_error("LZ4 decompression failed for %d bytes to %d of space, used %d",
					(int)isize, (int)msize, ws);
		}
		*wsize = (size_t)ws;
		*wbuf = mbuf;
		break;
	}
#endif
#ifdef HAS_ZSTD
	case COMP_ZSTD: {
		size_t ws = ZSTD_decompressDCtx(
				ctx->zstd_dcontext, mbuf, msize, ibuf, isize);
		if (ZSTD_isError(ws) || (size_t)ws != msize) {
			wp_error("Zstd decompression failed for %d bytes to %d of space: %s",
					(int)isize, (int)msize,
					ZSTD_getErrorName(ws));
			ws = 0;
		}
		*wsize = ws;
		*wbuf = mbuf;
		break;
	}
#endif
	}
	DTRACE_PROBE1(waypipe, uncompress_buffer_exit, *wsize);
}

struct shadow_fd *translate_fd(struct fd_translation_map *map,
		struct render_data *render, int fd, enum fdcat type,
		size_t file_sz, const struct dmabuf_slice_data *info,
		bool read_modifier, bool force_pipe_iw)
{
	struct shadow_fd *sfd = get_shadow_for_local_fd(map, fd);
	if (sfd) {
		return sfd;
	}
	if (type == FDC_DMAVID_IR || type == FDC_DMAVID_IW) {
		if (!info) {
			wp_error("No dmabuf info provided");
			return NULL;
		}
	}

	// Create a new shadow structure and link it into the map.
	sfd = calloc(1, sizeof(struct shadow_fd));
	if (!sfd) {
		wp_error("Failed to allocate shadow_fd structure");
		return NULL;
	}
	sfd->link.l_prev = &map->link;
	sfd->link.l_next = map->link.l_next;
	sfd->link.l_prev->l_next = &sfd->link;
	sfd->link.l_next->l_prev = &sfd->link;

	sfd->fd_local = fd;
	sfd->mem_local = NULL;
	sfd->mem_mirror = NULL;
	sfd->mem_mirror_handle = NULL;
	sfd->buffer_size = 0;
	sfd->remote_id = (map->max_local_id++) * map->local_sign;
	sfd->type = type;
	// File changes must be propagated
	sfd->is_dirty = true;
	/* files/dmabufs are damaged by default; shm_pools are explicitly
	 * undamaged in handlers.c */
	damage_everything(&sfd->damage);
	sfd->has_owner = false;
	/* Start the number of expected transfers to channel remaining
	 * at one, and number of protocol objects referencing this
	 * shadow_fd at zero. */
	sfd->refcount.transfer = 1;
	sfd->refcount.protocol = 0;
	sfd->refcount.compute = false;

	sfd->only_here = true;

	wp_debug("Creating new %s shadow RID=%d for local fd %d",
			fdcat_to_str(sfd->type), sfd->remote_id, fd);
	switch (sfd->type) {
	case FDC_FILE: {
		if (file_sz >= UINT32_MAX / 2) {
			wp_error("Failed to create shadow structure, file size %zu too large to transfer",
					file_sz);
			return sfd;
		}
		sfd->buffer_size = file_sz;
		sfd->file_readonly = false;
		// both r/w permissions, because the side which allocates the
		// memory does not always have to be the side that modifies it
		sfd->mem_local = mmap(NULL, sfd->buffer_size,
				PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
		if (sfd->mem_local == MAP_FAILED &&
				(errno == EPERM || errno == EACCES)) {
			wp_debug("Initial mmap for RID=%d failed, trying private+readonly",
					sfd->remote_id);
			// Some files are memfds that are sealed
			// to be read-only
			sfd->mem_local = mmap(NULL, sfd->buffer_size, PROT_READ,
					MAP_PRIVATE, fd, 0);
			if (sfd->mem_local != MAP_FAILED) {
				sfd->file_readonly = true;
			}
		}

		if (sfd->mem_local == MAP_FAILED) {
			wp_error("Mmap failed when creating shadow RID=%d: %s",
					sfd->remote_id, strerror(errno));
			return sfd;
		}
		// This will be created at the first transfer.
		// todo: why not create it now?
		sfd->mem_mirror = NULL;
	} break;
	case FDC_PIPE: {
		// Make this end of the pipe nonblocking, so that we can
		// include it in our main loop.
		if (set_nonblocking(sfd->fd_local) == -1) {
			wp_error("Failed to make fd nonblocking");
		}
		sfd->pipe.fd = sfd->fd_local;

		if (force_pipe_iw) {
			sfd->pipe.can_write = true;
		} else {
			/* this classification overestimates with
			 * socketpairs that have partially been shutdown.
			 * what about platform-specific RW pipes? */
			int flags = fcntl(fd, F_GETFL, 0);
			if (flags == -1) {
				wp_error("fcntl F_GETFL failed!");
			}
			if ((flags & O_ACCMODE) == O_RDONLY) {
				sfd->pipe.can_read = true;
			} else if ((flags & O_ACCMODE) == O_WRONLY) {
				sfd->pipe.can_write = true;
			} else {
				sfd->pipe.can_read = true;
				sfd->pipe.can_write = true;
			}
		}
	} break;
	case FDC_DMAVID_IR: {
		sfd->video_fmt = render->av_video_fmt;

		memcpy(&sfd->dmabuf_info, info,
				sizeof(struct dmabuf_slice_data));
		init_render_data(render);
		sfd->dmabuf_bo = import_dmabuf(render, sfd->fd_local,
				&sfd->buffer_size, &sfd->dmabuf_info,
				read_modifier);
		if (!sfd->dmabuf_bo) {
			return sfd;
		}
		if (setup_video_encode(sfd, render) == -1) {
			wp_error("Video encoding setup failed for RID=%d",
					sfd->remote_id);
		}
	} break;
	case FDC_DMAVID_IW: {
		sfd->video_fmt = render->av_video_fmt;

		memcpy(&sfd->dmabuf_info, info,
				sizeof(struct dmabuf_slice_data));
		// TODO: multifd-dmabuf video surface
		init_render_data(render);
		sfd->dmabuf_bo = import_dmabuf(render, sfd->fd_local,
				&sfd->buffer_size, &sfd->dmabuf_info,
				read_modifier);
		if (!sfd->dmabuf_bo) {
			return sfd;
		}
		if (setup_video_decode(sfd, render) == -1) {
			wp_error("Video decoding setup failed for RID=%d",
					sfd->remote_id);
		}
	} break;
	case FDC_DMABUF: {
		sfd->buffer_size = 0;

		init_render_data(render);
		if (info) {
			memcpy(&sfd->dmabuf_info, info,
					sizeof(struct dmabuf_slice_data));
		} else {
			// already zero initialized (no information).
		}
		sfd->dmabuf_bo = import_dmabuf(render, sfd->fd_local,
				&sfd->buffer_size, &sfd->dmabuf_info,
				read_modifier);
		if (!sfd->dmabuf_bo) {
			return sfd;
		}
		// to be created on first transfer
		sfd->mem_mirror = NULL;
	} break;
	case FDC_UNKNOWN:
		wp_error("Trying to create shadow_fd for unknown filedesc type");
		break;
	}
	return sfd;
}

static void *shrink_buffer(void *buf, size_t sz)
{
	void *nbuf = realloc(buf, sz);
	if (nbuf) {
		return nbuf;
	} else {
		wp_debug("Failed to shrink buffer with realloc, not a problem");
		return buf;
	}
}

/* Construct and optionally compress a diff between sfd->mem_mirror and
 * the actual memmap'd data */
static void worker_run_compress_diff(
		struct task_data *task, struct thread_data *local)
{
	struct shadow_fd *sfd = task->sfd;
	struct thread_pool *pool = local->pool;

	size_t damage_space = 0;
	for (int i = 0; i < task->damage_len; i++) {
		int range = task->damage_intervals[i].end -
			    task->damage_intervals[i].start;
		damage_space += (size_t)range + 8;
	}
	if (task->damaged_end) {
		damage_space += 1u << pool->diff_alignment_bits;
	}

	DTRACE_PROBE1(waypipe, worker_compdiff_enter, damage_space);

	char *diff_buffer = NULL;
	char *diff_target = NULL;
	if (pool->compression == COMP_NONE) {
		diff_buffer = malloc(
				damage_space + sizeof(struct wmsg_buffer_diff));
		if (!diff_buffer) {
			wp_error("Allocation failed, dropping diff transfer block");
			goto end;
		}
		diff_target = diff_buffer + sizeof(struct wmsg_buffer_diff);
	} else {
		if (buf_ensure_size((int)damage_space, 1, &local->tmp_size,
				    &local->tmp_buf) == -1) {
			wp_error("Allocation failed, dropping diff transfer block");
			goto end;
		}
		diff_target = local->tmp_buf;
	}

	DTRACE_PROBE1(waypipe, construct_diff_enter, task->damage_len);
	size_t diffsize = construct_diff_core(pool->diff_func,
			pool->diff_alignment_bits, task->damage_intervals,
			task->damage_len, sfd->mem_mirror, sfd->mem_local,
			diff_target);
	size_t ntrailing = 0;
	if (task->damaged_end) {
		ntrailing = construct_diff_trailing(sfd->buffer_size,
				pool->diff_alignment_bits, sfd->mem_mirror,
				sfd->mem_local, diff_target + diffsize);
	}
	DTRACE_PROBE1(waypipe, construct_diff_exit, diffsize);

	if (diffsize == 0 && ntrailing == 0) {
		free(diff_buffer);
		goto end;
	}

	uint8_t *msg;
	size_t sz;
	size_t net_diff_sz = diffsize + ntrailing;
	if (pool->compression == COMP_NONE) {
		sz = net_diff_sz + sizeof(struct wmsg_buffer_diff);
		msg = (uint8_t *)diff_buffer;
	} else {
		struct bytebuf dst;
		size_t comp_size = compress_bufsize(pool, net_diff_sz);
		char *comp_buf = malloc(alignz(comp_size, 4) +
					sizeof(struct wmsg_buffer_diff));
		if (!comp_buf) {
			wp_error("Allocation failed, dropping diff transfer block");
			goto end;
		}
		compress_buffer(pool, &local->comp_ctx, net_diff_sz,
				diff_target, comp_size,
				comp_buf + sizeof(struct wmsg_buffer_diff),
				&dst);
		sz = dst.size + sizeof(struct wmsg_buffer_diff);
		msg = (uint8_t *)comp_buf;
	}
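	/* Transfer blocks are zero-padded to a multiple of 4 bytes; the
	 * header's size_and_type field records the true message size. */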
	msg = shrink_buffer(msg, alignz(sz, 4));
	memset(msg + sz, 0, alignz(sz, 4) - sz);
	struct wmsg_buffer_diff header;
	header.size_and_type = transfer_header(sz, WMSG_BUFFER_DIFF);
	header.remote_id = sfd->remote_id;
	header.diff_size = (uint32_t)diffsize;
	header.ntrailing = (uint32_t)ntrailing;
	memcpy(msg, &header, sizeof(struct wmsg_buffer_diff));

	transfer_async_add(task->msg_queue, msg, alignz(sz, 4));

end:
	DTRACE_PROBE1(waypipe, worker_compdiff_exit, diffsize);
}

/* Compress data for sfd->mem_mirror */
static void worker_run_compress_block(
		struct task_data *task, struct thread_data *local)
{
	struct shadow_fd *sfd = task->sfd;
	struct thread_pool *pool = local->pool;
	if (task->zone_end == task->zone_start) {
		wp_error("Skipping task");
		return;
	}

	/* Allocate a disjoint target interval to each worker */
	size_t source_start = (size_t)task->zone_start;
	size_t source_end = (size_t)task->zone_end;
	DTRACE_PROBE1(waypipe, worker_comp_enter, source_end - source_start);

	size_t sz = 0;
	uint8_t *msg;
	if (pool->compression == COMP_NONE) {
		sz = sizeof(struct wmsg_buffer_fill) +
		     (source_end - source_start);

		msg = malloc(alignz(sz, 4));
		if (!msg) {
			wp_error("Allocation failed, dropping fill transfer block");
			goto end;
		}
		memcpy(msg + sizeof(struct wmsg_buffer_fill),
				sfd->mem_mirror + source_start,
				source_end - source_start);
	} else {
		size_t comp_size = compress_bufsize(
				pool, source_end - source_start);
		msg = malloc(alignz(comp_size, 4) +
				sizeof(struct wmsg_buffer_fill));
		if (!msg) {
			wp_error("Allocation failed, dropping fill transfer block");
			goto end;
		}
		struct bytebuf dst;
		compress_buffer(pool, &local->comp_ctx,
				source_end - source_start,
				&sfd->mem_mirror[source_start], comp_size,
				(char *)msg + sizeof(struct wmsg_buffer_fill),
				&dst);
		sz = dst.size + sizeof(struct wmsg_buffer_fill);
		msg = shrink_buffer(msg, alignz(sz, 4));
	}
	memset(msg + sz, 0, alignz(sz, 4) - sz);
	struct wmsg_buffer_fill header;
	header.size_and_type = transfer_header(sz, WMSG_BUFFER_FILL);
	header.remote_id = sfd->remote_id;
	header.start = (uint32_t)source_start;
	header.end = (uint32_t)source_end;
	memcpy(msg, &header, sizeof(struct wmsg_buffer_fill));

	transfer_async_add(task->msg_queue, msg, alignz(sz, 4));

end:
	DTRACE_PROBE1(waypipe, worker_comp_exit,
			sz - sizeof(struct wmsg_buffer_fill));
}

/* Optionally compress the data in mem_mirror, and set up the initial
 * transfer blocks */
static void queue_fill_transfers(struct thread_pool *threads,
		struct shadow_fd *sfd, struct transfer_queue *transfers)
{
	// new transfer, we send file contents verbatim
	const int chunksize = 262144;

	int region_start = (int)sfd->remote_bufsize;
	int region_end = (int)sfd->buffer_size;
	if (region_start > region_end) {
		wp_error("Cannot queue fill transfers for a size reduction from %d to %d bytes",
				region_start, region_end);
		return;
	}
	if (region_start == region_end) {
		return;
	}

	/* Keep sfd alive at least until write to channel is done */
	sfd->refcount.compute = true;

	int nshards = ceildiv((region_end - region_start), chunksize);

	pthread_mutex_lock(&threads->work_mutex);
	if (buf_ensure_size(threads->stack_count + nshards,
			    sizeof(struct task_data), &threads->stack_size,
			    (void **)&threads->stack) == -1) {
		wp_error("Allocation failed, dropping some fill tasks");
		pthread_mutex_unlock(&threads->work_mutex);
		return;
	}

	for (int i = 0; i < nshards; i++) {
		struct task_data task;
		memset(&task, 0, sizeof(task));
		task.type = TASK_COMPRESS_BLOCK;
		task.sfd = sfd;
		task.msg_queue = &transfers->async_recv_queue;

		task.zone_start = split_interval(
				region_start, region_end, nshards, i);
		task.zone_end = split_interval(
				region_start, region_end, nshards, i + 1);
		threads->stack[threads->stack_count++] = task;
	}
	pthread_mutex_unlock(&threads->work_mutex);
}

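/* Queue tasks to diff (and optionally compress) the damaged regions of sfd
 * against its mirror. Damage is clipped to the diff alignment block size bs
 * and split into roughly chunksize-sized shards, one task per shard; e.g.,
 * with 1 MiB of net damage and chunksize = 262144, nshards =
 * ceildiv(1048576, 262144) = 4. */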
static void queue_diff_transfers(struct thread_pool *threads,
		struct shadow_fd *sfd, struct transfer_queue *transfers)
{
	const int chunksize = 262144;
	if (!sfd->damage.damage) {
		return;
	}

	/* Keep sfd alive at least until write to channel is done */
	sfd->refcount.compute = true;

	int bs = 1 << threads->diff_alignment_bits;
	int align_end = bs * ((int)sfd->buffer_size / bs);
	bool check_tail = false;

	int net_damage = 0;
	if (sfd->damage.damage == DAMAGE_EVERYTHING) {
		reset_damage(&sfd->damage);
		struct ext_interval all = {.start = 0,
				.width = align_end,
				.rep = 1,
				.stride = 0};
		merge_damage_records(&sfd->damage, 1, &all,
				threads->diff_alignment_bits);
		check_tail = true;
		net_damage = align_end;
	} else {
		for (int ir = 0, iw = 0; ir < sfd->damage.ndamage_intvs; ir++) {
			/* Extend all damage to the nearest alignment block */
			struct interval e = sfd->damage.damage[ir];
			check_tail |= e.end > align_end;
			e.end = min(e.end, align_end);
			if (e.start < e.end) {
				/* End clipping may produce empty/degenerate
				 * intervals, so filter them out now */
				sfd->damage.damage[iw++] = e;
				net_damage += e.end - e.start;
			}
			if (e.end & (bs - 1) || e.start & (bs - 1)) {
				wp_error("Interval [%d, %d) is not aligned",
						e.start, e.end);
			}
		}
	}
	int nshards = ceildiv(net_damage, chunksize);

	/* Instead of allocating individual buffers for each task, create a
	 * global damage tracking buffer into which tasks index. It will be
	 * deleted in `finish_update`. */
	struct interval *intvs = malloc(
			sizeof(struct interval) *
			(size_t)(sfd->damage.ndamage_intvs + nshards));
	int *offsets = calloc((size_t)nshards + 1, sizeof(int));
	if (!offsets || !intvs) {
		// TODO: avoid making this allocation entirely
		wp_error("Failed to allocate diff region control buffer, dropping diff tasks");
		free(intvs);
		free(offsets);
		return;
	}

	sfd->damage_task_interval_store = intvs;
	int tot_blocks = net_damage / bs;
	int ir = 0, iw = 0, acc_prev_blocks = 0;
	for (int shard = 0; shard < nshards; shard++) {
		int s_lower = split_interval(0, tot_blocks, nshards, shard);
		int s_upper = split_interval(0, tot_blocks, nshards, shard + 1);

		while (acc_prev_blocks < s_upper &&
				ir < sfd->damage.ndamage_intvs) {
			struct interval e = sfd->damage.damage[ir];
			const int w = (e.end - e.start) / bs;

			int a_low = max(0, s_lower - acc_prev_blocks);
			int a_high = min(w, s_upper - acc_prev_blocks);

			struct interval r = {
					.start = e.start + bs * a_low,
					.end = e.start + bs * a_high,
			};
			intvs[iw++] = r;

			if (acc_prev_blocks + w > s_upper) {
				break;
			} else {
				acc_prev_blocks += w;
				ir++;
			}
		}

		offsets[shard + 1] = iw;
	}
	/* Reset damage, once it has been applied */
	reset_damage(&sfd->damage);

	pthread_mutex_lock(&threads->work_mutex);
	if (buf_ensure_size(threads->stack_count + nshards,
			    sizeof(struct task_data), &threads->stack_size,
			    (void **)&threads->stack) == -1) {
		wp_error("Allocation failed, dropping some diff tasks");
		pthread_mutex_unlock(&threads->work_mutex);
		free(offsets);
		return;
	}

	for (int i = 0; i < nshards; i++) {
		struct task_data task;
		memset(&task, 0, sizeof(task));
		task.type = TASK_COMPRESS_DIFF;
		task.sfd = sfd;
		task.msg_queue = &transfers->async_recv_queue;

		task.damage_len = offsets[i + 1] - offsets[i];
		task.damage_intervals =
				&sfd->damage_task_interval_store[offsets[i]];
		task.damaged_end = (i == nshards - 1) && check_tail;

		threads->stack[threads->stack_count++] = task;
	}
	pthread_mutex_unlock(&threads->work_mutex);
	free(offsets);
}

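/* The dmabuf creation messages consist of a fixed header followed by the
 * struct dmabuf_slice_data needed to replicate the buffer layout; the v1
 * message is padded to 4-byte alignment, while the v2 layout is verified by
 * static_assert to need no padding. */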
static void add_dmabuf_create_request(struct transfer_queue *transfers,
		struct shadow_fd *sfd, enum wmsg_type variant)
{
	size_t actual_len = sizeof(struct wmsg_open_dmabuf) +
			    sizeof(struct dmabuf_slice_data);
	size_t padded_len = alignz(actual_len, 4);

	uint8_t *data = calloc(1, padded_len);
	struct wmsg_open_dmabuf *header = (struct wmsg_open_dmabuf *)data;
	header->file_size = (uint32_t)sfd->buffer_size;
	header->remote_id = sfd->remote_id;
	header->size_and_type = transfer_header(actual_len, variant);
	memcpy(data + sizeof(struct wmsg_open_dmabuf), &sfd->dmabuf_info,
			sizeof(struct dmabuf_slice_data));

	transfer_add(transfers, padded_len, data);
}

static void add_dmabuf_create_request_v2(struct transfer_queue *transfers,
		struct shadow_fd *sfd, enum wmsg_type variant,
		enum video_coding_fmt fmt)
{
	size_t actual_len = sizeof(struct wmsg_open_dmavid) +
			    sizeof(struct dmabuf_slice_data);
	static_assert((sizeof(struct wmsg_open_dmavid) +
				      sizeof(struct dmabuf_slice_data)) %
							4 == 0,
			"alignment");

	uint8_t *data = calloc(1, actual_len);
	struct wmsg_open_dmavid *header = (struct wmsg_open_dmavid *)data;
	header->file_size = (uint32_t)sfd->buffer_size;
	header->remote_id = sfd->remote_id;
	header->size_and_type = transfer_header(actual_len, variant);
	header->vid_flags = (fmt == VIDEO_H264) ? DMAVID_H264 : DMAVID_VP9;

	memcpy(data + sizeof(*header), &sfd->dmabuf_info,
			sizeof(struct dmabuf_slice_data));

	transfer_add(transfers, actual_len, data);
}
static void add_file_create_request(
		struct transfer_queue *transfers, struct shadow_fd *sfd)
{
	struct wmsg_open_file *header =
			calloc(1, sizeof(struct wmsg_open_file));
	header->file_size = (uint32_t)sfd->buffer_size;
	header->remote_id = sfd->remote_id;
	header->size_and_type = transfer_header(
			sizeof(struct wmsg_open_file), WMSG_OPEN_FILE);

	transfer_add(transfers, sizeof(struct wmsg_open_file), header);
}

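/* Release the per-update resources (dmabuf mapping, damage interval store)
 * once the compute tasks queued for this shadow structure have run. */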
void finish_update(struct shadow_fd *sfd)
{
	if (!sfd->refcount.compute) {
		return;
	}
	if (sfd->type == FDC_DMABUF && sfd->dmabuf_map_handle) {
		// if this fails, unmap_dmabuf will print error
		(void)unmap_dmabuf(sfd->dmabuf_bo, sfd->dmabuf_map_handle);
		sfd->dmabuf_map_handle = NULL;
		sfd->mem_local = NULL;
	}
	if (sfd->damage_task_interval_store) {
		free(sfd->damage_task_interval_store);
		sfd->damage_task_interval_store = NULL;
	}
	sfd->refcount.compute = false;
}

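/* Inspect a shadow structure and queue the transfers needed to bring the
 * remote copy up to date: a creation request the first time the fd is seen
 * (only_here), then fill/diff/video/pipe data as appropriate for its type. */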
void collect_update(struct thread_pool *threads, struct shadow_fd *sfd,
		struct transfer_queue *transfers, bool use_old_dmavid_req)
{
	switch (sfd->type) {
	case FDC_FILE: {
		if (!sfd->is_dirty) {
			// File is clean, we have no reason to believe
			// that its contents could have changed
			return;
		}
		// Clear dirty state
		sfd->is_dirty = false;
		if (sfd->only_here) {
			// increase space, to avoid overflow when
			// writing this buffer along with padding
			size_t alignment = 1u << threads->diff_alignment_bits;
			sfd->mem_mirror = zeroed_aligned_alloc(
					alignz(sfd->buffer_size, alignment),
					alignment, &sfd->mem_mirror_handle);
			if (!sfd->mem_mirror) {
				wp_error("Failed to allocate mirror");
				return;
			}

			sfd->only_here = false;

			sfd->remote_bufsize = 0;

			add_file_create_request(transfers, sfd);
			sfd->remote_bufsize = sfd->buffer_size;
			queue_diff_transfers(threads, sfd, transfers);
			return;
		}

		if (sfd->remote_bufsize < sfd->buffer_size) {
			struct wmsg_open_file *header = calloc(
					1, sizeof(struct wmsg_open_file));
			header->file_size = (uint32_t)sfd->buffer_size;
			header->remote_id = sfd->remote_id;
			header->size_and_type = transfer_header(
					sizeof(struct wmsg_open_file),
					WMSG_EXTEND_FILE);

			transfer_add(transfers, sizeof(struct wmsg_open_file),
					header);

			sfd->remote_bufsize = sfd->buffer_size;
		}

		queue_diff_transfers(threads, sfd, transfers);
	} break;
	case FDC_DMABUF: {
		// If buffer is clean, do not check for changes
		if (!sfd->is_dirty) {
			return;
		}
		sfd->is_dirty = false;

		bool first = false;
		if (sfd->only_here) {
			sfd->only_here = false;
			first = true;

			add_dmabuf_create_request(
					transfers, sfd, WMSG_OPEN_DMABUF);
		}
		if (!sfd->dmabuf_bo) {
			// ^ was not previously able to create buffer
			return;
		}
		if (!sfd->mem_local) {
			sfd->mem_local = map_dmabuf(sfd->dmabuf_bo, false,
					&sfd->dmabuf_map_handle, NULL, NULL);
			if (!sfd->mem_local) {
				return;
			}
		}
		if (first) {
			size_t alignment = 1u << threads->diff_alignment_bits;
			sfd->mem_mirror = zeroed_aligned_alloc(
					alignz(sfd->buffer_size, alignment),
					alignment, &sfd->mem_mirror_handle);
			if (!sfd->mem_mirror) {
				wp_error("Failed to allocate mirror");
				return;
			}
			memcpy(sfd->mem_mirror, sfd->mem_local,
					sfd->buffer_size);

			sfd->remote_bufsize = 0;
			queue_fill_transfers(threads, sfd, transfers);
			sfd->remote_bufsize = sfd->buffer_size;
		} else {
			// TODO: detailed damage tracking
			damage_everything(&sfd->damage);

			queue_diff_transfers(threads, sfd, transfers);
		}
		/* Unmapping will be handled by finish_update() */
	} break;
	case FDC_DMAVID_IR: {
		if (!sfd->is_dirty) {
			return;
		}
		sfd->is_dirty = false;
		if (!sfd->dmabuf_bo || !sfd->video_context) {
			// ^ was not previously able to create buffer
			return;
		}
		if (sfd->only_here) {
			sfd->only_here = false;
			if (use_old_dmavid_req) {
				add_dmabuf_create_request(transfers, sfd,
						WMSG_OPEN_DMAVID_DST);
			} else {
				add_dmabuf_create_request_v2(transfers, sfd,
						WMSG_OPEN_DMAVID_DST_V2,
						sfd->video_fmt);
			}
		}
		collect_video_from_mirror(sfd, transfers);
	} break;
	case FDC_DMAVID_IW: {
		sfd->is_dirty = false;
		if (sfd->only_here) {
			sfd->only_here = false;
			if (use_old_dmavid_req) {
				add_dmabuf_create_request(transfers, sfd,
						WMSG_OPEN_DMAVID_SRC);
			} else {
				add_dmabuf_create_request_v2(transfers, sfd,
						WMSG_OPEN_DMAVID_SRC_V2,
						sfd->video_fmt);
			}
		}
	} break;
	case FDC_PIPE: {
		// Pipes always update, no matter what the message
		// stream indicates.
		if (sfd->only_here) {
			sfd->only_here = false;

			struct wmsg_basic *createh =
					calloc(1, sizeof(struct wmsg_basic));
			enum wmsg_type type;
			if (sfd->pipe.can_read && !sfd->pipe.can_write) {
				type = WMSG_OPEN_IW_PIPE;
				sfd->pipe.remote_can_write = true;
			} else if (sfd->pipe.can_write && !sfd->pipe.can_read) {
				type = WMSG_OPEN_IR_PIPE;
				sfd->pipe.remote_can_read = true;
			} else {
				type = WMSG_OPEN_RW_PIPE;
				sfd->pipe.remote_can_read = true;
				sfd->pipe.remote_can_write = true;
			}
			createh->size_and_type = transfer_header(
					sizeof(struct wmsg_basic), type);
			createh->remote_id = sfd->remote_id;

			transfer_add(transfers, sizeof(struct wmsg_basic),
					createh);
		}

		if (sfd->pipe.recv.used > 0) {
			size_t msgsz = sizeof(struct wmsg_basic) +
				       (size_t)sfd->pipe.recv.used;
			char *buf = malloc(alignz(msgsz, 4));
			struct wmsg_basic *header = (struct wmsg_basic *)buf;
			header->size_and_type = transfer_header(
					msgsz, WMSG_PIPE_TRANSFER);
			header->remote_id = sfd->remote_id;
			memcpy(buf + sizeof(struct wmsg_basic),
					sfd->pipe.recv.data,
					(size_t)sfd->pipe.recv.used);
			memset(buf + msgsz, 0, alignz(msgsz, 4) - msgsz);

			transfer_add(transfers, alignz(msgsz, 4), buf);

			sfd->pipe.recv.used = 0;
		}

		if (!sfd->pipe.can_read && sfd->pipe.remote_can_write) {
			struct wmsg_basic *header =
					calloc(1, sizeof(struct wmsg_basic));
			header->size_and_type = transfer_header(
					sizeof(struct wmsg_basic),
					WMSG_PIPE_SHUTDOWN_W);
			header->remote_id = sfd->remote_id;
			transfer_add(transfers, sizeof(struct wmsg_basic),
					header);
			sfd->pipe.remote_can_write = false;
		}
		if (!sfd->pipe.can_write && sfd->pipe.remote_can_read) {
			struct wmsg_basic *header =
					calloc(1, sizeof(struct wmsg_basic));
			header->size_and_type = transfer_header(
					sizeof(struct wmsg_basic),
					WMSG_PIPE_SHUTDOWN_R);
			header->remote_id = sfd->remote_id;
			transfer_add(transfers, sizeof(struct wmsg_basic),
					header);
			sfd->pipe.remote_can_read = false;
		}
	} break;
	case FDC_UNKNOWN:
		break;
	}
}

static void increase_buffer_sizes(struct shadow_fd *sfd,
		struct thread_pool *threads, size_t new_size)
{
	size_t old_size = sfd->buffer_size;
	munmap(sfd->mem_local, old_size);
	sfd->buffer_size = new_size;
	sfd->mem_local = mmap(NULL, sfd->buffer_size, PROT_READ | PROT_WRITE,
			MAP_SHARED, sfd->fd_local, 0);
	if (sfd->mem_local == MAP_FAILED) {
		wp_error("Mmap failed to remap increased buffer for RID=%d: %s",
				sfd->remote_id, strerror(errno));
		return;
	}
	/* if resize happens before any transfers, mirror may still be zero */
	if (sfd->mem_mirror) {
		// todo: handle allocation failures
		size_t alignment = 1u << threads->diff_alignment_bits;
		void *new_mirror = zeroed_aligned_realloc(
				alignz(old_size, alignment),
				alignz(sfd->buffer_size, alignment), alignment,
				sfd->mem_mirror, &sfd->mem_mirror_handle);
		if (!new_mirror) {
			wp_error("Failed to reallocate mirror");
			return;
		}
		sfd->mem_mirror = new_mirror;
	}
}

static void pipe_close_write(struct shadow_fd *sfd)
{
	if (sfd->pipe.can_read) {
		/* if pipe.fd is both readable and writable, assume
		 * socket */
		shutdown(sfd->pipe.fd, SHUT_WR);
	} else {
		checked_close(sfd->pipe.fd);
		if (sfd->fd_local == sfd->pipe.fd) {
			sfd->fd_local = -1;
		}
		sfd->pipe.fd = -1;
	}
	sfd->pipe.can_write = false;

	/* Also free any accumulated data that was not delivered */
	free(sfd->pipe.send.data);
	memset(&sfd->pipe.send, 0, sizeof(sfd->pipe.send));
}
static void pipe_close_read(struct shadow_fd *sfd)
{
	if (sfd->pipe.can_write) {
		/* if pipe.fd is both readable and writable, assume
		 * socket */
		// TODO: check return value, can legitimately fail with ENOBUFS
		shutdown(sfd->pipe.fd, SHUT_RD);
	} else {
		checked_close(sfd->pipe.fd);
		if (sfd->fd_local == sfd->pipe.fd) {
			sfd->fd_local = -1;
		}
		sfd->pipe.fd = -1;
	}
	sfd->pipe.can_read = false;
}

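/* Allocate a new shadow structure for a remote-created object and link it
 * into the translation map; fails if one for this RID already exists. */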
static int open_sfd(struct fd_translation_map *map, struct shadow_fd **sfd_ptr,
		int remote_id)
{
	if (*sfd_ptr) {
		wp_error("shadow structure for RID=%d was already created",
				remote_id);
		return ERR_FATAL;
	}

	wp_debug("Introducing new fd, remoteid=%d", remote_id);
	struct shadow_fd *sfd = calloc(1, sizeof(struct shadow_fd));
	if (!sfd) {
		wp_error("failed to allocate shadow structure for RID=%d",
				remote_id);
		return ERR_FATAL;
	}
	sfd->link.l_prev = &map->link;
	sfd->link.l_next = map->link.l_next;
	sfd->link.l_prev->l_next = &sfd->link;
	sfd->link.l_next->l_prev = &sfd->link;

	sfd->remote_id = remote_id;
	sfd->fd_local = -1;
	sfd->is_dirty = false;
	/* a received file descriptor is up to date by default */
	reset_damage(&sfd->damage);
	sfd->only_here = false;
	/* Start the object reference at one, so that, if it is owned by
	 * some known protocol object, it can not be deleted until the
	 * fd has at least been transferred over the Wayland connection */
	sfd->refcount.transfer = 1;
	sfd->refcount.protocol = 0;
	sfd->refcount.compute = false;
	*sfd_ptr = sfd;
	return 0;
}
static int check_message_min_size(
		enum wmsg_type type, const struct bytebuf *msg, size_t min_size)
{
	if (msg->size < min_size) {
		wp_error("Message size for %s is smaller than expected (%zu bytes vs %zu bytes)",
				wmsg_type_to_str(type), msg->size, min_size);
		return ERR_FATAL;
	}

	return 0;
}

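/* Verify that a message of type mtype can be applied to the shadow
 * structure, i.e., that it exists and has one of the two expected fd
 * categories. */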
static int check_sfd_type_2(struct shadow_fd *sfd, int remote_id,
		enum wmsg_type mtype, enum fdcat ftype1, enum fdcat ftype2)
{
	if (!sfd) {
		wp_error("shadow structure for RID=%d was not available",
				remote_id);
		return ERR_FATAL;
	}
	if (sfd->type != ftype1 && sfd->type != ftype2) {
		wp_error("Trying to apply %s to RID=%d which has incompatible type=%s",
				wmsg_type_to_str(mtype), remote_id,
				fdcat_to_str(sfd->type));
		return ERR_FATAL;
	}
	return 0;
}
static int check_sfd_type(struct shadow_fd *sfd, int remote_id,
		enum wmsg_type mtype, enum fdcat ftype)
{
	return check_sfd_type_2(sfd, remote_id, mtype, ftype, ftype);
}

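/* Apply a control message from the remote side to the translation map,
 * creating or updating the shadow structure for remote_id. Returns 0 on
 * success (including recoverable per-object failures) and ERR_FATAL on
 * protocol errors. */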
int apply_update(struct fd_translation_map *map, struct thread_pool *threads,
		struct render_data *render, enum wmsg_type type, int remote_id,
		const struct bytebuf *msg)
{
	struct shadow_fd *sfd = get_shadow_for_rid(map, remote_id);
	int ret = 0;
	switch (type) {
	default:
	case WMSG_RESTART:
	case WMSG_CLOSE:
	case WMSG_ACK_NBLOCKS:
	case WMSG_INJECT_RIDS:
	case WMSG_PROTOCOL: {
		if (wmsg_type_is_known(type)) {
			wp_error("Unexpected update type: %s",
					wmsg_type_to_str(type));
		} else {
			wp_error("Unidentified update type, number %u. "
				 "This may be caused by the Waypipe instances "
				 "on different sides of the connection having "
				 "incompatible versions or options.",
					(unsigned)type);
		}
		return ERR_FATAL;
	}
	/* SFD creation messages */
	case WMSG_OPEN_FILE: {
		if ((ret = check_message_min_size(type, msg,
				     sizeof(struct wmsg_open_file))) < 0) {
			return ret;
		}

		if ((ret = open_sfd(map, &sfd, remote_id)) < 0) {
			return ret;
		}

		const struct wmsg_open_file header =
				*(const struct wmsg_open_file *)msg->data;

		sfd->type = FDC_FILE;
		sfd->mem_local = NULL;
		sfd->buffer_size = header.file_size;
		sfd->remote_bufsize = sfd->buffer_size;
		size_t alignment = 1u << threads->diff_alignment_bits;
		sfd->mem_mirror = zeroed_aligned_alloc(
				alignz(sfd->buffer_size, alignment), alignment,
				&sfd->mem_mirror_handle);
		if (!sfd->mem_mirror) {
			wp_error("Failed to allocate mirror");
			return 0;
		}

		sfd->fd_local = create_anon_file();
		if (sfd->fd_local == -1) {
			wp_error("Failed to create anon file for object %d: %s",
					sfd->remote_id, strerror(errno));
			return 0;
		}
		/* ftruncate zero initializes the file by default, matching
		 * the zeroed mem_mirror buffer */
		if (ftruncate(sfd->fd_local, (off_t)sfd->buffer_size) == -1) {
			wp_error("Failed to resize anon file to size %zu for reason: %s",
					sfd->buffer_size, strerror(errno));
			return 0;
		}
		sfd->mem_local = mmap(NULL, sfd->buffer_size,
				PROT_READ | PROT_WRITE, MAP_SHARED,
				sfd->fd_local, 0);
		if (sfd->mem_local == MAP_FAILED) {
			wp_error("Failed to mmap newly created shm file for object %d: %s",
					sfd->remote_id, strerror(errno));
			sfd->mem_local = NULL;
			return 0;
		}

		return 0;
	}
	case WMSG_OPEN_DMABUF: {
		if ((ret = check_message_min_size(type, msg,
				     sizeof(struct wmsg_open_dmabuf) +
						     sizeof(struct dmabuf_slice_data))) <
				0) {
			return ret;
		}

		if ((ret = open_sfd(map, &sfd, remote_id)) < 0) {
			return ret;
		}

		sfd->type = FDC_DMABUF;
		const struct wmsg_open_dmabuf header =
				*(const struct wmsg_open_dmabuf *)msg->data;
		sfd->buffer_size = header.file_size;

		memcpy(&sfd->dmabuf_info,
				msg->data + sizeof(struct wmsg_open_dmabuf),
				sizeof(struct dmabuf_slice_data));
		size_t alignment = 1u << threads->diff_alignment_bits;
		sfd->mem_mirror = zeroed_aligned_alloc(
				alignz(sfd->buffer_size, alignment), alignment,
				&sfd->mem_mirror_handle);
		if (!sfd->mem_mirror) {
			wp_error("Failed to allocate mirror");
			return 0;
		}

		wp_debug("Creating remote DMAbuf of %d bytes",
				(int)sfd->buffer_size);
		// Create mirror from first transfer
		// The file can only actually be created when we know
		// what type it is?
		if (init_render_data(render) == -1) {
			sfd->fd_local = -1;
			return 0;
		}

		sfd->dmabuf_bo = make_dmabuf(
				render, sfd->buffer_size, &sfd->dmabuf_info);
		if (!sfd->dmabuf_bo) {
			sfd->fd_local = -1;
			return 0;
		}
		sfd->fd_local = export_dmabuf(sfd->dmabuf_bo);

		return 0;
	}
	case WMSG_OPEN_DMAVID_DST:
	case WMSG_OPEN_DMAVID_DST_V2: {
		const size_t min_msg_size =
				sizeof(struct dmabuf_slice_data) +
				((type == WMSG_OPEN_DMAVID_DST_V2)
								? sizeof(struct wmsg_open_dmavid)
								: sizeof(struct wmsg_open_dmabuf));
		if ((ret = check_message_min_size(type, msg, min_msg_size)) <
				0) {
			return ret;
		}

		if ((ret = open_sfd(map, &sfd, remote_id)) < 0) {
			return ret;
		}

1606 		/* remote read data, this side writes data */
1607 		sfd->type = FDC_DMAVID_IW;
1608 		if (type == WMSG_OPEN_DMAVID_DST) {
1609 			const struct wmsg_open_dmabuf header =
1610 					*(const struct wmsg_open_dmabuf *)
1611 							 msg->data;
1612 			sfd->buffer_size = header.file_size;
1613 
1614 			memcpy(&sfd->dmabuf_info,
1615 					msg->data + sizeof(struct wmsg_open_dmabuf),
1616 					sizeof(struct dmabuf_slice_data));
1617 			sfd->video_fmt = VIDEO_H264;
1618 		} else {
1619 			const struct wmsg_open_dmavid header =
1620 					*(const struct wmsg_open_dmavid *)
1621 							 msg->data;
1622 			sfd->buffer_size = header.file_size;
1623 
1624 			memcpy(&sfd->dmabuf_info,
1625 					msg->data + sizeof(struct wmsg_open_dmavid),
1626 					sizeof(struct dmabuf_slice_data));
1627 			if ((header.vid_flags & 0xff) == DMAVID_H264) {
1628 				sfd->video_fmt = VIDEO_H264;
1629 			} else if ((header.vid_flags & 0xff) == DMAVID_VP9) {
1630 				sfd->video_fmt = VIDEO_VP9;
1631 			} else {
1632 				sfd->video_fmt = VIDEO_H264;
1633 				wp_error("Unidentified video format for RID=%d",
1634 						sfd->remote_id);
1635 				return 0;
1636 			}
1637 		}
1638 
1639 		if (init_render_data(render) == -1) {
1640 			sfd->fd_local = -1;
1641 			return 0;
1642 		}
1643 		sfd->dmabuf_bo = make_dmabuf(
1644 				render, sfd->buffer_size, &sfd->dmabuf_info);
1645 		if (!sfd->dmabuf_bo) {
			wp_error("FDC_DMAVID_IW: RID=%d make_dmabuf failure, sz=%d (%zu)",
					sfd->remote_id, (int)sfd->buffer_size,
					sizeof(struct dmabuf_slice_data));
			return 0;
		}
		sfd->fd_local = export_dmabuf(sfd->dmabuf_bo);

		if (setup_video_decode(sfd, render) == -1) {
			wp_error("Video decoding setup failed for RID=%d",
					sfd->remote_id);
		}
		return 0;
	}
	case WMSG_OPEN_DMAVID_SRC:
	case WMSG_OPEN_DMAVID_SRC_V2: {
		const size_t min_msg_size =
				sizeof(struct dmabuf_slice_data) +
				((type == WMSG_OPEN_DMAVID_SRC_V2)
								? sizeof(struct wmsg_open_dmavid)
								: sizeof(struct wmsg_open_dmabuf));
		if ((ret = check_message_min_size(type, msg, min_msg_size)) <
				0) {
			return ret;
		}
		if ((ret = open_sfd(map, &sfd, remote_id)) < 0) {
			return ret;
		}

		/* remote writes data, this side reads data */
		sfd->type = FDC_DMAVID_IR;
		// TODO: deduplicate this section with WMSG_OPEN_DMAVID_DST,
		// or stop handling V1 and V2 in the same branch
		if (type == WMSG_OPEN_DMAVID_SRC) {
			const struct wmsg_open_dmabuf header =
					*(const struct wmsg_open_dmabuf *)
							 msg->data;
			sfd->buffer_size = header.file_size;

			memcpy(&sfd->dmabuf_info,
					msg->data + sizeof(struct wmsg_open_dmabuf),
					sizeof(struct dmabuf_slice_data));
			sfd->video_fmt = VIDEO_H264;
		} else {
			const struct wmsg_open_dmavid header =
					*(const struct wmsg_open_dmavid *)
							 msg->data;
			sfd->buffer_size = header.file_size;

			memcpy(&sfd->dmabuf_info,
					msg->data + sizeof(struct wmsg_open_dmavid),
					sizeof(struct dmabuf_slice_data));
			if ((header.vid_flags & 0xff) == DMAVID_H264) {
				sfd->video_fmt = VIDEO_H264;
			} else if ((header.vid_flags & 0xff) == DMAVID_VP9) {
				sfd->video_fmt = VIDEO_VP9;
			} else {
				sfd->video_fmt = VIDEO_H264;
				wp_error("Unidentified video format for RID=%d",
						sfd->remote_id);
				return 0;
			}
		}

		if (init_render_data(render) == -1) {
			sfd->fd_local = -1;
			return 0;
		}

		sfd->dmabuf_bo = make_dmabuf(
				render, sfd->buffer_size, &sfd->dmabuf_info);
		if (!sfd->dmabuf_bo) {
			wp_error("FDC_DMAVID_IR: RID=%d make_dmabuf failure",
					sfd->remote_id);
			return 0;
		}
		sfd->fd_local = export_dmabuf(sfd->dmabuf_bo);

		if (setup_video_encode(sfd, render) == -1) {
			wp_error("Video encoding setup failed for RID=%d",
					sfd->remote_id);
		}
		return 0;
	}
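	/* WMSG_OPEN_RW_PIPE requests a bidirectional channel, so it is
	 * backed by a socketpair; the one-directional variants use a plain
	 * pipe. One end is kept locally as pipe.fd, while the other end
	 * (fd_local) is passed on to the client. */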
	case WMSG_OPEN_RW_PIPE:
	case WMSG_OPEN_IW_PIPE:
	case WMSG_OPEN_IR_PIPE: {
		if ((ret = open_sfd(map, &sfd, remote_id)) < 0) {
			return ret;
		}
		sfd->type = FDC_PIPE;

		int pipedes[2];
		if (type == WMSG_OPEN_RW_PIPE) {
			if (socketpair(AF_UNIX, SOCK_STREAM, 0, pipedes) ==
					-1) {
				wp_error("Failed to create a socketpair: %s",
						strerror(errno));
				return 0;
			}
		} else {
			if (pipe(pipedes) == -1) {
				wp_error("Failed to create a pipe: %s",
						strerror(errno));
				return 0;
			}
		}

		/* We pass 'fd_local' to the client, although we only
		 * read and write from pipe_fd if it exists. */
		if (type == WMSG_OPEN_IR_PIPE) {
			// Read end is 0; the other process writes
			sfd->fd_local = pipedes[1];
			sfd->pipe.fd = pipedes[0];
			sfd->pipe.can_read = true;
			sfd->pipe.remote_can_write = true;
		} else if (type == WMSG_OPEN_IW_PIPE) {
			// Write end is 1; the other process reads
			sfd->fd_local = pipedes[0];
			sfd->pipe.fd = pipedes[1];
			sfd->pipe.can_write = true;
			sfd->pipe.remote_can_read = true;
		} else { // WMSG_OPEN_RW_PIPE
			// Here, it doesn't matter which end is which
			sfd->fd_local = pipedes[0];
			sfd->pipe.fd = pipedes[1];
			sfd->pipe.can_read = true;
			sfd->pipe.can_write = true;
			sfd->pipe.remote_can_read = true;
			sfd->pipe.remote_can_write = true;
		}

		if (set_nonblocking(sfd->pipe.fd) == -1) {
			wp_error("Failed to make private pipe end nonblocking: %s",
					strerror(errno));
			return 0;
		}
		return 0;
	}
	/* SFD update messages */
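	/* WMSG_EXTEND_FILE enlarges an existing FDC_FILE shadow in place;
	 * increase_buffer_sizes is expected to remap mem_local and grow the
	 * mirror to match the new file size. */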
	case WMSG_EXTEND_FILE: {
		if ((ret = check_message_min_size(type, msg,
				     sizeof(struct wmsg_open_file))) < 0) {
			return ret;
		}
		if ((ret = check_sfd_type(sfd, remote_id, type, FDC_FILE)) <
				0) {
			return ret;
		}

		const struct wmsg_open_file *header =
				(const struct wmsg_open_file *)msg->data;
		if (header->file_size <= sfd->buffer_size) {
			wp_error("File extend message for RID=%d does not increase size %u %zu",
					remote_id, header->file_size,
					sfd->buffer_size);
			return ERR_FATAL;
		}

		if (ftruncate(sfd->fd_local, (off_t)header->file_size) == -1) {
			wp_error("Failed to resize file buffer: %s",
					strerror(errno));
			return 0;
		}
		increase_buffer_sizes(sfd, threads, (size_t)header->file_size);
		// the extension implies the remote buffer is at least as large
		sfd->remote_bufsize = sfd->buffer_size;
		return 0;
	}
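	/* WMSG_BUFFER_FILL replaces the byte range [start, end) of the
	 * target buffer. The payload following the header is compressed
	 * with the negotiated method; it is decompressed into a per-thread
	 * scratch buffer, recorded in mem_mirror (so that later diffs stay
	 * consistent), and then copied into the mapped local buffer. */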
	case WMSG_BUFFER_FILL: {
		if ((ret = check_message_min_size(type, msg,
				     sizeof(struct wmsg_buffer_fill))) < 0) {
			return ret;
		}
		if ((ret = check_sfd_type_2(sfd, remote_id, type, FDC_FILE,
				     FDC_DMABUF)) < 0) {
			return ret;
		}
		if (sfd->type == FDC_FILE && sfd->file_readonly) {
			wp_debug("Ignoring a fill update to readonly file at RID=%d",
					remote_id);
			return 0;
		}

		const struct wmsg_buffer_fill *header =
				(const struct wmsg_buffer_fill *)msg->data;

		size_t uncomp_size = header->end - header->start;
		struct thread_data *local = &threads->threads[0];
		if (buf_ensure_size((int)uncomp_size, 1, &local->tmp_size,
				    &local->tmp_buf) == -1) {
			wp_error("Failed to expand temporary decompression buffer, dropping update");
			return 0;
		}

		const char *act_buffer = NULL;
		size_t act_size = 0;
		uncompress_buffer(threads, &local->comp_ctx,
				msg->size - sizeof(struct wmsg_buffer_fill),
				msg->data + sizeof(struct wmsg_buffer_fill),
				uncomp_size, local->tmp_buf, &act_size,
				&act_buffer);

		// `memsize+8*remote_nthreads` is the worst-case diff
		// expansion
		if (header->end > sfd->buffer_size) {
			wp_error("Transfer end overflow %" PRIu32 " > %zu",
					header->end, sfd->buffer_size);
			return ERR_FATAL;
		}
		if (act_size != header->end - header->start) {
			wp_error("Transfer size mismatch %zu %" PRIu32,
					act_size, header->end - header->start);
			return ERR_FATAL;
		}
		memcpy(sfd->mem_mirror + header->start, act_buffer,
				header->end - header->start);

		void *handle = NULL;
		bool already_mapped = sfd->mem_local != NULL;
		if (sfd->type == FDC_DMABUF && !already_mapped) {
			uint32_t stride = 0, height = 0;
			sfd->mem_local = map_dmabuf(sfd->dmabuf_bo, true,
					&handle, &stride, &height);
			if (stride * height < sfd->buffer_size) {
				wp_error("DMABUF mapped with stride %" PRIu32
					 " height %" PRIu32
					 ", but expected size=%zu>%zu, ignoring overlarge update",
						stride, height,
						sfd->buffer_size,
						(size_t)(stride * height));
				unmap_dmabuf(sfd->dmabuf_bo, handle);
				return 0;
			}
		}
		if (!sfd->mem_local) {
			wp_error("Failed to fill RID=%d, fd not mapped",
					sfd->remote_id);
			return 0;
		}
		memcpy(sfd->mem_local + header->start,
				sfd->mem_mirror + header->start,
				header->end - header->start);

		if (sfd->type == FDC_DMABUF && !already_mapped) {
			sfd->mem_local = NULL;
			if (unmap_dmabuf(sfd->dmabuf_bo, handle) == -1) {
				return 0;
			}
		}
		return 0;
	}
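	/* WMSG_BUFFER_DIFF carries a compressed diff: diff_size bytes of
	 * span-update records plus ntrailing literal bytes for the buffer
	 * tail (see apply_diff for the exact format); the diff is applied
	 * to mem_mirror and mem_local together. */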
	case WMSG_BUFFER_DIFF: {
		if ((ret = check_message_min_size(type, msg,
				     sizeof(struct wmsg_buffer_diff))) < 0) {
			return ret;
		}
		if ((ret = check_sfd_type_2(sfd, remote_id, type, FDC_FILE,
				     FDC_DMABUF)) < 0) {
			return ret;
		}
		if (sfd->type == FDC_FILE && sfd->file_readonly) {
			wp_debug("Ignoring a diff update to readonly file at RID=%d",
					remote_id);
			return 0;
		}
		const struct wmsg_buffer_diff *header =
				(const struct wmsg_buffer_diff *)msg->data;

		struct thread_data *local = &threads->threads[0];
		if (buf_ensure_size((int)(header->diff_size +
						    header->ntrailing),
				    1, &local->tmp_size,
				    &local->tmp_buf) == -1) {
			wp_error("Failed to expand temporary decompression buffer, dropping update");
			return 0;
		}

		const char *act_buffer = NULL;
		size_t act_size = 0;
		uncompress_buffer(threads, &local->comp_ctx,
				msg->size - sizeof(struct wmsg_buffer_diff),
				msg->data + sizeof(struct wmsg_buffer_diff),
				header->diff_size + header->ntrailing,
				local->tmp_buf, &act_size, &act_buffer);

		// `memsize+8*remote_nthreads` is the worst-case diff
		// expansion
		if (act_size != header->diff_size + header->ntrailing) {
			wp_error("Transfer size mismatch %zu %u", act_size,
					header->diff_size + header->ntrailing);
			return ERR_FATAL;
		}

		void *handle = NULL;
		bool already_mapped = sfd->mem_local != NULL;
		if (sfd->type == FDC_DMABUF && !already_mapped) {
			uint32_t stride = 0, height = 0;
			sfd->mem_local = map_dmabuf(sfd->dmabuf_bo, true,
					&handle, &stride, &height);
			if (stride * height < sfd->buffer_size) {
				wp_error("DMABUF mapped with stride %" PRIu32
					 " height %" PRIu32
					 ", but expected size=%zu>%zu, ignoring overlarge update",
						stride, height,
						sfd->buffer_size,
						(size_t)(stride * height));
				unmap_dmabuf(sfd->dmabuf_bo, handle);
				return 0;
			}
		}
		if (!sfd->mem_local) {
			wp_error("Failed to apply diff to RID=%d, fd not mapped",
					sfd->remote_id);
			return 0;
		}

		DTRACE_PROBE2(waypipe, apply_diff_enter, sfd->buffer_size,
				header->diff_size);
		apply_diff(sfd->buffer_size, sfd->mem_mirror, sfd->mem_local,
				header->diff_size, header->ntrailing,
				act_buffer);
		DTRACE_PROBE(waypipe, apply_diff_exit);

		if (sfd->type == FDC_DMABUF && !already_mapped) {
			sfd->mem_local = NULL;
			if (unmap_dmabuf(sfd->dmabuf_bo, handle) == -1) {
				return 0;
			}
		}

		return 0;
	}
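	/* Pipe data and shutdown messages: transferred bytes are staged in
	 * pipe.send and written out later by flush_writable_pipes, while
	 * the shutdown messages mirror the remote end's half-close state. */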
	case WMSG_PIPE_TRANSFER: {
		if ((ret = check_sfd_type(sfd, remote_id, type, FDC_PIPE)) <
				0) {
			return ret;
		}
		if (!sfd->pipe.can_write || sfd->pipe.pending_w_shutdown) {
			wp_debug("Discarding transfer to pipe RID=%d, because pipe cannot be written to",
					remote_id);
			return 0;
		}

		size_t transf_data_sz = msg->size - sizeof(struct wmsg_basic);

		int netsize = sfd->pipe.send.used + (int)transf_data_sz;
		if (buf_ensure_size(netsize, 1, &sfd->pipe.send.size,
				    (void **)&sfd->pipe.send.data) == -1) {
			wp_error("Failed to expand pipe transfer buffer, dropping data");
			return 0;
		}

		memcpy(sfd->pipe.send.data + sfd->pipe.send.used,
				msg->data + sizeof(struct wmsg_basic),
				transf_data_sz);
		sfd->pipe.send.used = netsize;

		// The pipe itself will be flushed or closed later by
		// flush_writable_pipes
		sfd->pipe.writable = true;
		return 0;
	}
	case WMSG_PIPE_SHUTDOWN_R: {
		if ((ret = check_sfd_type(sfd, remote_id, type, FDC_PIPE)) <
				0) {
			return ret;
		}
		sfd->pipe.remote_can_write = false;
		if (!sfd->pipe.can_read) {
			wp_debug("Discarding read shutdown to pipe RID=%d, which cannot read",
					remote_id);
			return 0;
		}
		pipe_close_read(sfd);
		return 0;
	}
	case WMSG_PIPE_SHUTDOWN_W: {
		if ((ret = check_sfd_type(sfd, remote_id, type, FDC_PIPE)) <
				0) {
			return ret;
		}
		sfd->pipe.remote_can_read = false;
		if (!sfd->pipe.can_write) {
			wp_debug("Discarding write shutdown to pipe RID=%d, which cannot write",
					remote_id);
			return 0;
		}
		if (sfd->pipe.send.used <= 0) {
			pipe_close_write(sfd);
		} else {
			/* Shutdown as soon as the current data has been
			 * written */
			sfd->pipe.pending_w_shutdown = true;
		}

		return 0;
	}
	case WMSG_SEND_DMAVID_PACKET: {
		if ((ret = check_sfd_type(sfd, remote_id, type,
				     FDC_DMAVID_IW)) < 0) {
			return ret;
		}
		if (!sfd->dmabuf_bo) {
			wp_error("Applying update to nonexistent dma buffer object RID=%d",
					sfd->remote_id);
			return 0;
		}
		struct bytebuf data = {
				.data = msg->data + sizeof(struct wmsg_basic),
				.size = msg->size - sizeof(struct wmsg_basic)};
		apply_video_packet(sfd, render, &data);
		return 0;
	}
	}
	/* all returns should happen inside switch, so none here */
}

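/* A shadow_fd is kept alive by two reference counts: refcount.protocol
 * (references held by protocol objects) and refcount.transfer (queued fd
 * transfers); destroy_shadow_if_unreferenced tears the shadow down once
 * neither kind of reference remains. */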
bool shadow_decref_protocol(struct shadow_fd *sfd)
{
	sfd->refcount.protocol--;
	return destroy_shadow_if_unreferenced(sfd);
}

bool shadow_decref_transfer(struct shadow_fd *sfd)
{
	sfd->refcount.transfer--;
	if (sfd->refcount.transfer == 0 && sfd->type == FDC_PIPE) {
		/* fd_local has been transferred for the last time, so close
		 * it and make it match pipe.fd, just as on the side where
		 * the original pipe was introduced */
		if (sfd->pipe.fd != sfd->fd_local) {
			checked_close(sfd->fd_local);
			sfd->fd_local = sfd->pipe.fd;
		}
	}
	return destroy_shadow_if_unreferenced(sfd);
}
struct shadow_fd *shadow_incref_protocol(struct shadow_fd *sfd)
{
	sfd->has_owner = true;
	sfd->refcount.protocol++;
	return sfd;
}
struct shadow_fd *shadow_incref_transfer(struct shadow_fd *sfd)
{
	sfd->has_owner = true;
	if (sfd->type == FDC_PIPE && sfd->refcount.transfer == 0) {
		wp_error("The other pipe end may have been closed");
	}
	sfd->refcount.transfer++;
	return sfd;
}
void decref_transferred_fds(struct fd_translation_map *map, int nfds, int fds[])
{
	for (int i = 0; i < nfds; i++) {
		struct shadow_fd *sfd = get_shadow_for_local_fd(map, fds[i]);
		shadow_decref_transfer(sfd);
	}
}
void decref_transferred_rids(
		struct fd_translation_map *map, int nids, int ids[])
{
	for (int i = 0; i < nids; i++) {
		struct shadow_fd *sfd = get_shadow_for_rid(map, ids[i]);
		shadow_decref_transfer(sfd);
	}
}

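/* The pipe helpers below are meant to bracket a poll() call in the main
 * event loop. A minimal sketch of their intended use (the actual caller
 * lives elsewhere; `map`, `check_read`, and `timeout` stand in for the
 * caller's state):
 *
 *	int np = count_npipes(map);
 *	struct pollfd *pfds = calloc((size_t)np, sizeof(*pfds));
 *	if (pfds) {
 *		np = fill_with_pipes(map, pfds, check_read);
 *		if (poll(pfds, (nfds_t)np, timeout) > 0) {
 *			mark_pipe_object_statuses(map, np, pfds);
 *			flush_writable_pipes(map);
 *			read_readable_pipes(map);
 *		}
 *		free(pfds);
 *	}
 */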
int count_npipes(const struct fd_translation_map *map)
{
	int np = 0;
	for (struct shadow_fd_link *lcur = map->link.l_next,
				   *lnxt = lcur->l_next;
			lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
		struct shadow_fd *cur = (struct shadow_fd *)lcur;
		if (cur->type == FDC_PIPE) {
			np++;
		}
	}
	return np;
}
int fill_with_pipes(const struct fd_translation_map *map, struct pollfd *pfds,
		bool check_read)
{
	int np = 0;
	for (struct shadow_fd_link *lcur = map->link.l_next,
				   *lnxt = lcur->l_next;
			lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
		struct shadow_fd *cur = (struct shadow_fd *)lcur;
		if (cur->type == FDC_PIPE && cur->pipe.fd != -1) {
			pfds[np].fd = cur->pipe.fd;
			pfds[np].events = 0;
			if (check_read && cur->pipe.readable) {
				pfds[np].events |= POLLIN;
			}
			if (cur->pipe.send.used > 0) {
				pfds[np].events |= POLLOUT;
			}
			np++;
		}
	}
	return np;
}

static struct shadow_fd *get_shadow_for_pipe_fd(
		struct fd_translation_map *map, int pipefd)
{
	for (struct shadow_fd_link *lcur = map->link.l_next,
				   *lnxt = lcur->l_next;
			lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
		struct shadow_fd *cur = (struct shadow_fd *)lcur;
		if (cur->type == FDC_PIPE && cur->pipe.fd == pipefd) {
			return cur;
		}
	}
	return NULL;
}

void mark_pipe_object_statuses(
		struct fd_translation_map *map, int nfds, struct pollfd *pfds)
{
	for (int i = 0; i < nfds; i++) {
		int lfd = pfds[i].fd;
		struct shadow_fd *sfd = get_shadow_for_pipe_fd(map, lfd);
		if (!sfd) {
			wp_error("Failed to find shadow struct for .pipe_fd=%d",
					lfd);
			continue;
		}
		if (pfds[i].revents & POLLIN || pfds[i].revents & POLLHUP) {
			/* In */
			sfd->pipe.readable = true;
		}
		if (pfds[i].revents & POLLOUT) {
			sfd->pipe.writable = true;
		}
	}
}

void flush_writable_pipes(struct fd_translation_map *map)
{
	for (struct shadow_fd_link *lcur = map->link.l_next,
				   *lnxt = lcur->l_next;
			lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
		struct shadow_fd *sfd = (struct shadow_fd *)lcur;
		if (sfd->type != FDC_PIPE || !sfd->pipe.writable ||
				sfd->pipe.send.used <= 0) {
			continue;
		}

		sfd->pipe.writable = false;
		wp_debug("Flushing %d bytes into RID=%d", sfd->pipe.send.used,
				sfd->remote_id);
		ssize_t changed = write(sfd->pipe.fd, sfd->pipe.send.data,
				(size_t)sfd->pipe.send.used);

		if (changed == -1 &&
				(errno == EAGAIN || errno == EWOULDBLOCK)) {
			wp_debug("Writing to pipe RID=%d would block",
					sfd->remote_id);
			continue;
		} else if (changed == -1 &&
				(errno == EPIPE || errno == EBADF)) {
			/* No process has access to the other end of the pipe,
			 * or the file descriptor is otherwise permanently
			 * unwriteable */
			pipe_close_write(sfd);
		} else if (changed == -1) {
			wp_error("Failed to write into pipe with remote_id=%d: %s",
					sfd->remote_id, strerror(errno));
		} else {
			wp_debug("Wrote %zd more bytes into pipe RID=%d",
					changed, sfd->remote_id);
			sfd->pipe.send.used -= (int)changed;
			if (sfd->pipe.send.used > 0) {
				memmove(sfd->pipe.send.data,
						sfd->pipe.send.data + changed,
						(size_t)sfd->pipe.send.used);
			}
			if (sfd->pipe.send.used <= 0 &&
					sfd->pipe.pending_w_shutdown) {
				/* A shutdown request was made, but can only be
				 * applied now that the write buffer has been
				 * cleared */
				pipe_close_write(sfd);
				sfd->pipe.pending_w_shutdown = false;
			}
		}
	}
	/* Destroy any new unreferenced objects */
	for (struct shadow_fd_link *lcur = map->link.l_next,
				   *lnxt = lcur->l_next;
			lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
		struct shadow_fd *cur = (struct shadow_fd *)lcur;
		destroy_shadow_if_unreferenced(cur);
	}
}
void read_readable_pipes(struct fd_translation_map *map)
{
	for (struct shadow_fd_link *lcur = map->link.l_next,
				   *lnxt = lcur->l_next;
			lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
		struct shadow_fd *sfd = (struct shadow_fd *)lcur;
		if (sfd->type != FDC_PIPE || !sfd->pipe.readable) {
			continue;
		}

		if (sfd->pipe.recv.size == 0) {
			sfd->pipe.recv.size = 32768;
			sfd->pipe.recv.data =
					malloc((size_t)sfd->pipe.recv.size);
			if (!sfd->pipe.recv.data) {
				wp_error("Failed to allocate pipe receive buffer");
				sfd->pipe.recv.size = 0;
				continue;
			}
		}
		if (sfd->pipe.recv.size > sfd->pipe.recv.used) {
			sfd->pipe.readable = false;
			ssize_t changed = read(sfd->pipe.fd,
					sfd->pipe.recv.data +
							sfd->pipe.recv.used,
					(size_t)(sfd->pipe.recv.size -
							sfd->pipe.recv.used));
			if (changed == 0) {
				/* No process has access to the other end of the
				 * pipe */
				pipe_close_read(sfd);
			} else if (changed == -1 &&
					(errno == EAGAIN ||
							errno == EWOULDBLOCK)) {
				wp_debug("Reading from pipe RID=%d would block",
						sfd->remote_id);
				continue;
			} else if (changed == -1) {
				wp_error("Failed to read from pipe with remote_id=%d: %s",
						sfd->remote_id,
						strerror(errno));
			} else {
				wp_debug("Read %zd more bytes from pipe RID=%d",
						changed, sfd->remote_id);
				sfd->pipe.recv.used += (int)changed;
			}
		}
	}

	/* Destroy any new unreferenced objects */
	for (struct shadow_fd_link *lcur = map->link.l_next,
				   *lnxt = lcur->l_next;
			lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
		struct shadow_fd *cur = (struct shadow_fd *)lcur;
		destroy_shadow_if_unreferenced(cur);
	}
}

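/* Grow the shadow of a shm-backed file after the protocol indicated that
 * the underlying file was enlarged; the fstat check below guards against
 * growing the mapping past the file's true size. */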
void extend_shm_shadow(struct fd_translation_map *map,
		struct thread_pool *threads, struct shadow_fd *sfd,
		size_t new_size)
{
	if (sfd->buffer_size >= new_size) {
		return;
	}

	// Verify that the file size actually increased
	struct stat st;
	int fs = fstat(sfd->fd_local, &st);
	if (fs == -1) {
		wp_error("Checking file size failed: %s", strerror(errno));
		return;
	}
	if ((size_t)st.st_size < new_size) {
		wp_error("Requested new size (%d) is larger than the actual file size (%d), ignoring",
				(int)new_size, (int)st.st_size);
		return;
	}

	increase_buffer_sizes(sfd, threads, new_size);
	(void)map;

	// leave `sfd->remote_bufsize` unchanged, and mark dirty
	sfd->is_dirty = true;
}

void run_task(struct task_data *task, struct thread_data *local)
{
	if (task->type == TASK_COMPRESS_BLOCK) {
		worker_run_compress_block(task, local);
	} else if (task->type == TASK_COMPRESS_DIFF) {
		worker_run_compress_diff(task, local);
	} else {
		wp_error("Unidentified task type");
	}
}

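/* Work distribution model, as implemented below: tasks are stored on
 * pool->stack and handed out in LIFO order. start_parallel_work wakes the
 * worker threads, which pop tasks while holding work_mutex
 * (request_work_task offers the same pop operation to the caller's own
 * thread); each completed task is signalled by writing a byte to the
 * pool's self-pipe so that a poll() on it can observe progress. */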
int start_parallel_work(struct thread_pool *pool,
		struct thread_msg_recv_buf *recv_queue)
{
	pthread_mutex_lock(&pool->work_mutex);
	if (recv_queue->zone_start != recv_queue->zone_end) {
		wp_error("Some async messages not yet sent");
	}
	recv_queue->zone_start = 0;
	recv_queue->zone_end = 0;
	int num_mt_tasks = pool->stack_count;
	if (buf_ensure_size(num_mt_tasks, sizeof(struct iovec),
			    &recv_queue->size,
			    (void **)&recv_queue->data) == -1) {
		wp_error("Failed to provide enough space for receive queue, skipping all work tasks");
		num_mt_tasks = 0;
	}
	pool->do_work = num_mt_tasks > 0;

	/* Start the work tasks here */
	if (num_mt_tasks > 0) {
		pthread_cond_broadcast(&pool->work_cond);
	}
	pthread_mutex_unlock(&pool->work_mutex);

	return num_mt_tasks;
}

bool request_work_task(
		struct thread_pool *pool, struct task_data *task, bool *is_done)
{
	pthread_mutex_lock(&pool->work_mutex);
	*is_done = pool->stack_count == 0 && pool->tasks_in_progress == 0;
	bool has_task = false;
	if (pool->stack_count > 0 && pool->do_work) {
		int i = pool->stack_count - 1;
		if (pool->stack[i].type != TASK_STOP) {
			*task = pool->stack[i];
			has_task = true;
			pool->stack_count--;
			pool->tasks_in_progress++;
			if (pool->stack_count <= 0) {
				pool->do_work = false;
			}
		}
	}
	pthread_mutex_unlock(&pool->work_mutex);
	return has_task;
}

static void *worker_thread_main(void *arg)
{
	struct thread_data *data = arg;
	struct thread_pool *pool = data->pool;

	/* The loop is globally locked by default, and only unlocked in
	 * pthread_cond_wait. Yes, there are fancier and faster schemes.
	 */
	pthread_mutex_lock(&pool->work_mutex);
	while (1) {
		while (!pool->do_work) {
			pthread_cond_wait(&pool->work_cond, &pool->work_mutex);
		}
		if (pool->stack_count <= 0) {
			pool->do_work = false;
			continue;
		}
		/* Copy task, since the queue may be resized */
		int i = pool->stack_count - 1;
		struct task_data task = pool->stack[i];
		if (task.type == TASK_STOP) {
			break;
		}
		pool->tasks_in_progress++;
		pool->stack_count--;
		if (pool->stack_count <= 0) {
			pool->do_work = false;
		}
		pthread_mutex_unlock(&pool->work_mutex);
		run_task(&task, data);
		pthread_mutex_lock(&pool->work_mutex);

		uint8_t triv = 0;
		pool->tasks_in_progress--;
		if (write(pool->selfpipe_w, &triv, 1) == -1) {
			wp_error("Failed to write to self-pipe");
		}
	}
	pthread_mutex_unlock(&pool->work_mutex);

	return NULL;
}