1 /*
2 * Copyright © 2019 Manuel Stoeckl
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining
5 * a copy of this software and associated documentation files (the
6 * "Software"), to deal in the Software without restriction, including
7 * without limitation the rights to use, copy, modify, merge, publish,
8 * distribute, sublicense, and/or sell copies of the Software, and to
9 * permit persons to whom the Software is furnished to do so, subject to
10 * the following conditions:
11 *
12 * The above copyright notice and this permission notice (including the
13 * next paragraph) shall be included in all copies or substantial
14 * portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
20 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
22 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 * SOFTWARE.
24 */
25
26 #include "shadow.h"
27
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <inttypes.h>
31 #include <poll.h>
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <sys/mman.h>
36 #include <sys/socket.h>
37 #include <sys/stat.h>
38 #include <unistd.h>
39
40 #ifdef HAS_LZ4
41 #include <lz4.h>
42 #include <lz4hc.h>
43 #endif
44 #ifdef HAS_ZSTD
45 #include <zstd.h>
46 #endif
47
get_shadow_for_local_fd(struct fd_translation_map * map,int lfd)48 struct shadow_fd *get_shadow_for_local_fd(
49 struct fd_translation_map *map, int lfd)
50 {
51 for (struct shadow_fd_link *lcur = map->link.l_next,
52 *lnxt = lcur->l_next;
53 lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
54 struct shadow_fd *cur = (struct shadow_fd *)lcur;
55 if (cur->fd_local == lfd) {
56 return cur;
57 }
58 }
59 return NULL;
60 }
get_shadow_for_rid(struct fd_translation_map * map,int rid)61 struct shadow_fd *get_shadow_for_rid(struct fd_translation_map *map, int rid)
62 {
63 for (struct shadow_fd_link *lcur = map->link.l_next,
64 *lnxt = lcur->l_next;
65 lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
66 struct shadow_fd *cur = (struct shadow_fd *)lcur;
67 if (cur->remote_id == rid) {
68 return cur;
69 }
70 }
71 return NULL;
72 }
destroy_unlinked_sfd(struct shadow_fd * sfd)73 static void destroy_unlinked_sfd(struct shadow_fd *sfd)
74 {
75 wp_debug("Destroying %s RID=%d", fdcat_to_str(sfd->type),
76 sfd->remote_id);
77 /* video must be cleaned up before any buffers that it may rely on */
78 destroy_video_data(sfd);
79
80 /* free all accumulated damage records */
81 reset_damage(&sfd->damage);
82 free(sfd->damage_task_interval_store);
83
84 if (sfd->type == FDC_FILE) {
85 munmap(sfd->mem_local, sfd->buffer_size);
86 zeroed_aligned_free(sfd->mem_mirror, &sfd->mem_mirror_handle);
87 } else if (sfd->type == FDC_DMABUF || sfd->type == FDC_DMAVID_IR ||
88 sfd->type == FDC_DMAVID_IW) {
89 if (sfd->dmabuf_map_handle) {
90 unmap_dmabuf(sfd->dmabuf_bo, sfd->dmabuf_map_handle);
91 }
92 destroy_dmabuf(sfd->dmabuf_bo);
93 zeroed_aligned_free(sfd->mem_mirror, &sfd->mem_mirror_handle);
94 } else if (sfd->type == FDC_PIPE) {
95 if (sfd->pipe.fd != sfd->fd_local && sfd->pipe.fd != -1) {
96 checked_close(sfd->pipe.fd);
97 }
98 free(sfd->pipe.recv.data);
99 free(sfd->pipe.send.data);
100 }
101 if (sfd->fd_local != -1) {
102 checked_close(sfd->fd_local);
103 }
104 free(sfd);
105 }
cleanup_thread_local(struct thread_data * data)106 static void cleanup_thread_local(struct thread_data *data)
107 {
108 #ifdef HAS_ZSTD
109 ZSTD_freeCCtx(data->comp_ctx.zstd_ccontext);
110 ZSTD_freeDCtx(data->comp_ctx.zstd_dcontext);
111 #endif
112 #ifdef HAS_LZ4
113 free(data->comp_ctx.lz4_extstate);
114 #endif
115 free(data->tmp_buf);
116 }
117
setup_thread_local(struct thread_data * data,enum compression_mode mode,int compression_level)118 static void setup_thread_local(struct thread_data *data,
119 enum compression_mode mode, int compression_level)
120 {
121 struct comp_ctx *ctx = &data->comp_ctx;
122 ctx->zstd_ccontext = NULL;
123 ctx->zstd_dcontext = NULL;
124 ctx->lz4_extstate = NULL;
125 #ifdef HAS_LZ4
126 if (mode == COMP_LZ4) {
127 /* Like LZ4Frame, integer codes indicate compression level.
128 * Negative numbers are acceleration, positive use the HC
129 * routines */
130 if (compression_level <= 0) {
131 ctx->lz4_extstate = malloc((size_t)LZ4_sizeofState());
132 } else {
133 ctx->lz4_extstate = malloc((size_t)LZ4_sizeofStateHC());
134 }
135 }
136 #endif
137 #ifdef HAS_ZSTD
138 if (mode == COMP_ZSTD) {
139 ctx->zstd_ccontext = ZSTD_createCCtx();
140 ctx->zstd_dcontext = ZSTD_createDCtx();
141 }
142 #endif
143 (void)mode;
144 (void)compression_level;
145
146 data->tmp_buf = NULL;
147 data->tmp_size = 0;
148 }
cleanup_translation_map(struct fd_translation_map * map)149 void cleanup_translation_map(struct fd_translation_map *map)
150 {
151 for (struct shadow_fd_link *lcur = map->link.l_next,
152 *lnxt = lcur->l_next;
153 lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
154 struct shadow_fd *cur = (struct shadow_fd *)lcur;
155 destroy_unlinked_sfd(cur);
156 }
157 map->link.l_next = &map->link;
158 map->link.l_prev = &map->link;
159 }
destroy_shadow_if_unreferenced(struct shadow_fd * sfd)160 bool destroy_shadow_if_unreferenced(struct shadow_fd *sfd)
161 {
162 bool autodelete = sfd->has_owner;
163 if (sfd->type == FDC_PIPE && !sfd->pipe.can_read &&
164 !sfd->pipe.can_write && !sfd->pipe.remote_can_read &&
165 !sfd->pipe.remote_can_write) {
166 autodelete = true;
167 }
168 if (sfd->refcount.protocol == 0 && sfd->refcount.transfer == 0 &&
169 sfd->refcount.compute == false && autodelete) {
170 /* remove shadowfd from list */
171 sfd->link.l_prev->l_next = sfd->link.l_next;
172 sfd->link.l_next->l_prev = sfd->link.l_prev;
173 sfd->link.l_next = NULL;
174 sfd->link.l_prev = NULL;
175
176 destroy_unlinked_sfd(sfd);
177 return true;
178 } else if (sfd->refcount.protocol < 0 || sfd->refcount.transfer < 0) {
179 wp_error("Negative refcount for rid=%d: %d protocol references, %d transfer references",
180 sfd->remote_id, sfd->refcount.protocol,
181 sfd->refcount.transfer);
182 }
183 return false;
184 }
185
186 static void *worker_thread_main(void *arg);
setup_translation_map(struct fd_translation_map * map,bool display_side)187 void setup_translation_map(struct fd_translation_map *map, bool display_side)
188 {
189 map->local_sign = display_side ? -1 : 1;
190 map->link.l_next = &map->link;
191 map->link.l_prev = &map->link;
192 map->max_local_id = 1;
193 }
194
shutdown_threads(struct thread_pool * pool)195 static void shutdown_threads(struct thread_pool *pool)
196 {
197 pthread_mutex_lock(&pool->work_mutex);
198 free(pool->stack);
199 struct task_data task;
200 memset(&task, 0, sizeof(task));
201 task.type = TASK_STOP;
202 pool->stack = &task;
203 pool->stack_count = 1;
204 pool->stack_size = 1;
205 pool->do_work = true;
206 pthread_cond_broadcast(&pool->work_cond);
207 pthread_mutex_unlock(&pool->work_mutex);
208
209 if (pool->threads) {
210 for (int i = 1; i < pool->nthreads; i++) {
211 if (pool->threads[i].thread) {
212 pthread_join(pool->threads[i].thread, NULL);
213 }
214 }
215 }
216 pool->stack = NULL;
217 }
218
setup_thread_pool(struct thread_pool * pool,enum compression_mode compression,int comp_level,int n_threads)219 int setup_thread_pool(struct thread_pool *pool,
220 enum compression_mode compression, int comp_level,
221 int n_threads)
222 {
223 memset(pool, 0, sizeof(struct thread_pool));
224
225 pool->diff_func = get_diff_function(
226 DIFF_FASTEST, &pool->diff_alignment_bits);
227
228 pool->compression = compression;
229 pool->compression_level = comp_level;
230 if (n_threads <= 0) {
231 // platform dependent
232 int nt = get_hardware_thread_count();
233 pool->nthreads = max(nt / 2, 1);
234 } else {
235 pool->nthreads = n_threads;
236 }
237 pool->stack_size = 0;
238 pool->stack_count = 0;
239 pool->stack = NULL;
240 pool->tasks_in_progress = 0;
241 pool->do_work = true;
242
243 /* Thread #0 is the 'main' thread */
244 pool->threads = calloc(
245 (size_t)pool->nthreads, sizeof(struct thread_data));
246 if (!pool->threads) {
247 wp_error("Failed to allocate list of thread data");
248 return -1;
249 }
250
251 int ret;
252 ret = pthread_mutex_init(&pool->work_mutex, NULL);
253 if (ret) {
254 wp_error("Mutex creation failed: %s", strerror(ret));
255 return -1;
256 }
257 ret = pthread_cond_init(&pool->work_cond, NULL);
258 if (ret) {
259 wp_error("Condition variable creation failed: %s",
260 strerror(ret));
261 return -1;
262 }
263
264 pool->threads[0].pool = pool;
265 pool->threads[0].thread = pthread_self();
266 for (int i = 1; i < pool->nthreads; i++) {
267 pool->threads[i].pool = pool;
268 ret = pthread_create(&pool->threads[i].thread, NULL,
269 worker_thread_main, &pool->threads[i]);
270 if (ret) {
271 wp_error("Thread creation failed: %s", strerror(ret));
272 // Stop making new threads, but keep what is there
273 pool->nthreads = i;
274 break;
275 }
276 }
277
278 /* Setup thread local data from the main thread, to avoid requiring
279 * the worker threads to allocate pools, for a few fixed buffers */
280 for (int i = 0; i < pool->nthreads; i++) {
281 setup_thread_local(&pool->threads[i], compression, comp_level);
282 }
283
284 int fds[2];
285 if (pipe(fds) == -1) {
286 wp_error("Failed to create pipe: %s", strerror(errno));
287 }
288 pool->selfpipe_r = fds[0];
289 pool->selfpipe_w = fds[1];
290 if (set_nonblocking(pool->selfpipe_r) == -1) {
291 wp_error("Failed to make read end of pipe nonblocking: %s",
292 strerror(errno));
293 }
294 return 0;
295 }
cleanup_thread_pool(struct thread_pool * pool)296 void cleanup_thread_pool(struct thread_pool *pool)
297 {
298 shutdown_threads(pool);
299 if (pool->threads) {
300 for (int i = 0; i < pool->nthreads; i++) {
301 cleanup_thread_local(&pool->threads[i]);
302 }
303 }
304
305 pthread_mutex_destroy(&pool->work_mutex);
306 pthread_cond_destroy(&pool->work_cond);
307 free(pool->threads);
308 free(pool->stack);
309
310 checked_close(pool->selfpipe_r);
311 checked_close(pool->selfpipe_w);
312 }
313
fdcat_to_str(enum fdcat cat)314 const char *fdcat_to_str(enum fdcat cat)
315 {
316 switch (cat) {
317 case FDC_UNKNOWN:
318 return "FDC_UNKNOWN";
319 case FDC_FILE:
320 return "FDC_FILE";
321 case FDC_PIPE:
322 return "FDC_PIPE";
323 case FDC_DMABUF:
324 return "FDC_DMABUF";
325 case FDC_DMAVID_IR:
326 return "FDC_DMAVID_IR";
327 case FDC_DMAVID_IW:
328 return "FDC_DMAVID_IW";
329 }
330 return "<invalid>";
331 }
332
compression_mode_to_str(enum compression_mode mode)333 const char *compression_mode_to_str(enum compression_mode mode)
334 {
335 switch (mode) {
336 case COMP_NONE:
337 return "NONE";
338 case COMP_LZ4:
339 return "LZ4";
340 case COMP_ZSTD:
341 return "ZSTD";
342 default:
343 return "<invalid>";
344 }
345 }
346
get_fd_type(int fd,size_t * size)347 enum fdcat get_fd_type(int fd, size_t *size)
348 {
349 struct stat fsdata;
350 memset(&fsdata, 0, sizeof(fsdata));
351 int ret = fstat(fd, &fsdata);
352 if (ret == -1) {
353 wp_error("The fd %d is not file-like: %s", fd, strerror(errno));
354 return FDC_UNKNOWN;
355 } else if (S_ISREG(fsdata.st_mode)) {
356 if (size) {
357 *size = (size_t)fsdata.st_size;
358 }
359 return FDC_FILE;
360 } else if (S_ISFIFO(fsdata.st_mode) || S_ISCHR(fsdata.st_mode) ||
361 S_ISSOCK(fsdata.st_mode)) {
362 if (S_ISCHR(fsdata.st_mode)) {
363 wp_error("The fd %d, size %" PRId64
364 ", mode %x is a character device. Proceeding under the assumption that it is pipe-like.",
365 fd, (int64_t)fsdata.st_size,
366 fsdata.st_mode);
367 }
368 if (S_ISSOCK(fsdata.st_mode)) {
369 wp_error("The fd %d, size %" PRId64
370 ", mode %x is a socket. Proceeding under the assumption that it is pipe-like.",
371 fd, (int64_t)fsdata.st_size,
372 fsdata.st_mode);
373 }
374 return FDC_PIPE;
375 } else if (is_dmabuf(fd) == 1) {
376 return FDC_DMABUF;
377 } else {
378 wp_error("The fd %d has an unusual mode %x (type=%x): blk=%d chr=%d dir=%d lnk=%d reg=%d fifo=%d sock=%d; expect an application crash!",
379 fd, fsdata.st_mode, fsdata.st_mode & S_IFMT,
380 S_ISBLK(fsdata.st_mode),
381 S_ISCHR(fsdata.st_mode),
382 S_ISDIR(fsdata.st_mode),
383 S_ISLNK(fsdata.st_mode),
384 S_ISREG(fsdata.st_mode),
385 S_ISFIFO(fsdata.st_mode),
386 S_ISSOCK(fsdata.st_mode), strerror(errno));
387 return FDC_UNKNOWN;
388 }
389 }
390
/* Return the size of the output buffer needed to compress up to
 * `max_input` bytes with the pool's compression mode. Returns 0 when no
 * compression is selected or the selected library is not compiled in. */
static size_t compress_bufsize(struct thread_pool *pool, size_t max_input)
{
#ifdef HAS_LZ4
	if (pool->compression == COMP_LZ4) {
		/* This bound applies for both LZ4 and LZ4HC compressors */
		return (size_t)LZ4_compressBound((int)max_input);
	}
#endif
#ifdef HAS_ZSTD
	if (pool->compression == COMP_ZSTD) {
		return ZSTD_compressBound(max_input);
	}
#endif
	/* COMP_NONE and anything unrecognized need no compression buffer */
	(void)pool;
	(void)max_input;
	return 0;
}
410
411 /* With the selected compression method, compress the buffer
412 * {isize,ibuf}, possibly modifying {msize,mbuf}, and setting
413 * {wsize,wbuf} to indicate the result */
compress_buffer(struct thread_pool * pool,struct comp_ctx * ctx,size_t isize,const char * ibuf,size_t msize,char * mbuf,struct bytebuf * dst)414 static void compress_buffer(struct thread_pool *pool, struct comp_ctx *ctx,
415 size_t isize, const char *ibuf, size_t msize, char *mbuf,
416 struct bytebuf *dst)
417 {
418 (void)ctx;
419 // Ensure inputs always nontrivial
420 if (isize == 0) {
421 dst->size = 0;
422 dst->data = (char *)ibuf;
423 return;
424 }
425
426 DTRACE_PROBE1(waypipe, compress_buffer_enter, isize);
427 switch (pool->compression) {
428 default:
429 case COMP_NONE:
430 (void)msize;
431 (void)mbuf;
432 dst->size = isize;
433 dst->data = (char *)ibuf;
434 break;
435 #ifdef HAS_LZ4
436 case COMP_LZ4: {
437 int ws;
438 if (pool->compression_level <= 0) {
439 ws = LZ4_compress_fast_extState(ctx->lz4_extstate, ibuf,
440 mbuf, (int)isize, (int)msize,
441 -pool->compression_level);
442 } else {
443 ws = LZ4_compress_HC_extStateHC(ctx->lz4_extstate, ibuf,
444 mbuf, (int)isize, (int)msize,
445 pool->compression_level);
446 }
447 if (ws == 0) {
448 wp_error("LZ4 compression failed for %zu bytes in %zu of space",
449 isize, msize);
450 }
451 dst->size = (size_t)ws;
452 dst->data = (char *)mbuf;
453 break;
454 }
455 #endif
456 #ifdef HAS_ZSTD
457 case COMP_ZSTD: {
458 size_t ws = ZSTD_compressCCtx(ctx->zstd_ccontext, mbuf, msize,
459 ibuf, isize, pool->compression_level);
460 if (ZSTD_isError(ws)) {
461 wp_error("Zstd compression failed for %d bytes in %d of space: %s",
462 (int)isize, (int)msize,
463 ZSTD_getErrorName(ws));
464 }
465 dst->size = (size_t)ws;
466 dst->data = (char *)mbuf;
467 break;
468 }
469 #endif
470 }
471 DTRACE_PROBE1(waypipe, compress_buffer_exit, dst->size);
472 }
473 /* With the selected compression method, uncompress the buffer {isize,ibuf},
474 * to precisely msize bytes, setting {wsize,wbuf} to indicate the result.
475 * If the compression mode requires it. */
uncompress_buffer(struct thread_pool * pool,struct comp_ctx * ctx,size_t isize,const char * ibuf,size_t msize,char * mbuf,size_t * wsize,const char ** wbuf)476 static void uncompress_buffer(struct thread_pool *pool, struct comp_ctx *ctx,
477 size_t isize, const char *ibuf, size_t msize, char *mbuf,
478 size_t *wsize, const char **wbuf)
479 {
480 (void)ctx;
481 // Ensure inputs always nontrivial
482 if (isize == 0) {
483 *wsize = 0;
484 *wbuf = ibuf;
485 return;
486 }
487
488 DTRACE_PROBE1(waypipe, uncompress_buffer_enter, isize);
489 switch (pool->compression) {
490 default:
491 case COMP_NONE:
492 (void)mbuf;
493 (void)msize;
494 *wsize = isize;
495 *wbuf = ibuf;
496 break;
497 #ifdef HAS_LZ4
498 case COMP_LZ4: {
499 int ws = LZ4_decompress_safe(
500 ibuf, mbuf, (int)isize, (int)msize);
501 if (ws < 0 || (size_t)ws != msize) {
502 wp_error("Lz4 decompression failed for %d bytes to %d of space, used %d",
503 (int)isize, (int)msize, ws);
504 }
505 *wsize = (size_t)ws;
506 *wbuf = mbuf;
507 break;
508 }
509 #endif
510 #ifdef HAS_ZSTD
511 case COMP_ZSTD: {
512 size_t ws = ZSTD_decompressDCtx(
513 ctx->zstd_dcontext, mbuf, msize, ibuf, isize);
514 if (ZSTD_isError(ws) || (size_t)ws != msize) {
515 wp_error("Zstd decompression failed for %d bytes to %d of space: %s",
516 (int)isize, (int)msize,
517 ZSTD_getErrorName(ws));
518 ws = 0;
519 }
520 *wsize = ws;
521 *wbuf = mbuf;
522 break;
523 }
524 #endif
525 }
526 DTRACE_PROBE1(waypipe, uncompress_buffer_exit, *wsize);
527 }
528
/* Return the shadow_fd tracking local file descriptor `fd`, creating and
 * linking a new one of category `type` if `map` does not have one yet.
 *
 * Parameters:
 *   map           - translation map to search and to insert into
 *   render        - render state; dereferenced for the video types, and
 *                   passed to init_render_data()/import_dmabuf() for all
 *                   dmabuf-backed types
 *   fd            - local file descriptor to shadow
 *   type          - category to assign to a newly created shadow
 *   file_sz       - size to map; only used for FDC_FILE
 *   info          - dmabuf slice data; required for FDC_DMAVID_IR and
 *                   FDC_DMAVID_IW, optional for FDC_DMABUF
 *   read_modifier - forwarded to import_dmabuf()
 *   force_pipe_iw - for FDC_PIPE, mark the pipe as writable only, without
 *                   consulting fcntl()
 *
 * Returns the existing shadow if `fd` is already tracked. Returns NULL if
 * `info` is missing for a video type or if the structure cannot be
 * allocated. Otherwise returns the new shadow.
 *
 * NOTE(review): on several setup failures (file too large, mmap failure,
 * dmabuf import failure) the error is logged but the new, already linked
 * shadow is still returned in a partially initialized state; callers
 * cannot distinguish this from success through the return value. */
struct shadow_fd *translate_fd(struct fd_translation_map *map,
		struct render_data *render, int fd, enum fdcat type,
		size_t file_sz, const struct dmabuf_slice_data *info,
		bool read_modifier, bool force_pipe_iw)
{
	struct shadow_fd *sfd = get_shadow_for_local_fd(map, fd);
	if (sfd) {
		return sfd;
	}
	if (type == FDC_DMAVID_IR || type == FDC_DMAVID_IW) {
		if (!info) {
			wp_error("No dmabuf info provided");
			return NULL;
		}
	}

	// Create a new shadow_fd entry; calloc leaves all fields zeroed.
	sfd = calloc(1, sizeof(struct shadow_fd));
	if (!sfd) {
		wp_error("Failed to allocate shadow_fd structure");
		return NULL;
	}
	/* Insert the new entry at the front of the circular list */
	sfd->link.l_prev = &map->link;
	sfd->link.l_next = map->link.l_next;
	sfd->link.l_prev->l_next = &sfd->link;
	sfd->link.l_next->l_prev = &sfd->link;

	sfd->fd_local = fd;
	sfd->mem_local = NULL;
	sfd->mem_mirror = NULL;
	sfd->mem_mirror_handle = NULL;
	sfd->buffer_size = 0;
	/* Ids count upwards per map; the sign comes from map->local_sign */
	sfd->remote_id = (map->max_local_id++) * map->local_sign;
	sfd->type = type;
	// File changes must be propagated
	sfd->is_dirty = true;
	/* files/dmabufs are damaged by default; shm_pools are explicitly
	 * undamaged in handlers.c */
	damage_everything(&sfd->damage);
	sfd->has_owner = false;
	/* Start the number of expected transfers to channel remaining
	 * at one, and number of protocol objects referencing this
	 * shadow_fd at zero.*/
	sfd->refcount.transfer = 1;
	sfd->refcount.protocol = 0;
	sfd->refcount.compute = false;

	sfd->only_here = true;

	wp_debug("Creating new %s shadow RID=%d for local fd %d",
			fdcat_to_str(sfd->type), sfd->remote_id, fd);
	switch (sfd->type) {
	case FDC_FILE: {
		/* Reject sizes of UINT32_MAX / 2 or more; mem_local stays
		 * NULL and buffer_size stays 0 on this path */
		if (file_sz >= UINT32_MAX / 2) {
			wp_error("Failed to create shadow structure, file size %zu too large to transfer",
					file_sz);
			return sfd;
		}
		sfd->buffer_size = file_sz;
		sfd->file_readonly = false;
		// both r/w permissions, because the side which allocates the
		// memory does not always have to be the side that modifies it
		sfd->mem_local = mmap(NULL, sfd->buffer_size,
				PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
		if (sfd->mem_local == MAP_FAILED &&
				(errno == EPERM || errno == EACCES)) {
			wp_debug("Initial mmap for RID=%d failed, trying private+readonly",
					sfd->remote_id);
			// Some files are memfds that are sealed
			// to be read-only
			sfd->mem_local = mmap(NULL, sfd->buffer_size, PROT_READ,
					MAP_PRIVATE, fd, 0);
			if (sfd->mem_local != MAP_FAILED) {
				sfd->file_readonly = true;
			}
		}

		if (sfd->mem_local == MAP_FAILED) {
			/* NOTE(review): mem_local is left equal to
			 * MAP_FAILED (not NULL) on this path */
			wp_error("Mmap failed when creating shadow RID=%d: %s",
					sfd->remote_id, strerror(errno));
			return sfd;
		}
		// This will be created at the first transfer.
		// todo: why not create it now?
		sfd->mem_mirror = NULL;
	} break;
	case FDC_PIPE: {
		// Make this end of the pipe nonblocking, so that we can
		// include it in our main loop.
		if (set_nonblocking(sfd->fd_local) == -1) {
			wp_error("Failed to make fd nonblocking");
		}
		sfd->pipe.fd = sfd->fd_local;

		if (force_pipe_iw) {
			sfd->pipe.can_write = true;
		} else {
			/* this classification overestimates with
			 * socketpairs that have partially been shutdown.
			 * what about platform-specific RW pipes? */
			int flags = fcntl(fd, F_GETFL, 0);
			if (flags == -1) {
				/* execution continues with flags == -1, which
				 * the checks below treat as neither read-only
				 * nor write-only, i.e. as read+write */
				wp_error("fctnl F_GETFL failed!");
			}
			if ((flags & O_ACCMODE) == O_RDONLY) {
				sfd->pipe.can_read = true;
			} else if ((flags & O_ACCMODE) == O_WRONLY) {
				sfd->pipe.can_write = true;
			} else {
				sfd->pipe.can_read = true;
				sfd->pipe.can_write = true;
			}
		}
	} break;
	case FDC_DMAVID_IR: {
		sfd->video_fmt = render->av_video_fmt;

		/* `info` was checked to be non-NULL above for video types */
		memcpy(&sfd->dmabuf_info, info,
				sizeof(struct dmabuf_slice_data));
		init_render_data(render);
		sfd->dmabuf_bo = import_dmabuf(render, sfd->fd_local,
				&sfd->buffer_size, &sfd->dmabuf_info,
				read_modifier);
		if (!sfd->dmabuf_bo) {
			return sfd;
		}
		/* an encoder setup failure is logged but not propagated */
		if (setup_video_encode(sfd, render) == -1) {
			wp_error("Video encoding setup failed for RID=%d",
					sfd->remote_id);
		}
	} break;
	case FDC_DMAVID_IW: {
		sfd->video_fmt = render->av_video_fmt;

		/* `info` was checked to be non-NULL above for video types */
		memcpy(&sfd->dmabuf_info, info,
				sizeof(struct dmabuf_slice_data));
		// TODO: multifd-dmabuf video surface
		init_render_data(render);
		sfd->dmabuf_bo = import_dmabuf(render, sfd->fd_local,
				&sfd->buffer_size, &sfd->dmabuf_info,
				read_modifier);
		if (!sfd->dmabuf_bo) {
			return sfd;
		}
		/* a decoder setup failure is logged but not propagated */
		if (setup_video_decode(sfd, render) == -1) {
			wp_error("Video decoding setup failed for RID=%d",
					sfd->remote_id);
		}
	} break;
	case FDC_DMABUF: {
		sfd->buffer_size = 0;

		init_render_data(render);
		if (info) {
			memcpy(&sfd->dmabuf_info, info,
					sizeof(struct dmabuf_slice_data));
		} else {
			// already zero initialized (no information).
		}
		sfd->dmabuf_bo = import_dmabuf(render, sfd->fd_local,
				&sfd->buffer_size, &sfd->dmabuf_info,
				read_modifier);
		if (!sfd->dmabuf_bo) {
			return sfd;
		}
		// to be created on first transfer
		sfd->mem_mirror = NULL;
	} break;
	case FDC_UNKNOWN:
		wp_error("Trying to create shadow_fd for unknown filedesc type");
		break;
	}
	return sfd;
}
703
/* Try to resize `buf` to `sz` bytes with realloc(). Returns the resized
 * buffer, or the original `buf` (still valid) if realloc() fails. */
static void *shrink_buffer(void *buf, size_t sz)
{
	void *resized = realloc(buf, sz);
	if (!resized) {
		wp_debug("Failed to shrink buffer with realloc, not a problem");
		return buf;
	}
	return resized;
}
714
715 /* Construct and optionally compress a diff between sfd->mem_mirror and
716 * the actual memmap'd data */
worker_run_compress_diff(struct task_data * task,struct thread_data * local)717 static void worker_run_compress_diff(
718 struct task_data *task, struct thread_data *local)
719 {
720 struct shadow_fd *sfd = task->sfd;
721 struct thread_pool *pool = local->pool;
722
723 size_t damage_space = 0;
724 for (int i = 0; i < task->damage_len; i++) {
725 int range = task->damage_intervals[i].end -
726 task->damage_intervals[i].start;
727 damage_space += (size_t)range + 8;
728 }
729 if (task->damaged_end) {
730 damage_space += 1u << pool->diff_alignment_bits;
731 }
732
733 DTRACE_PROBE1(waypipe, worker_compdiff_enter, damage_space);
734
735 char *diff_buffer = NULL;
736 char *diff_target = NULL;
737 if (pool->compression == COMP_NONE) {
738 diff_buffer = malloc(
739 damage_space + sizeof(struct wmsg_buffer_diff));
740 if (!diff_buffer) {
741 wp_error("Allocation failed, dropping diff transfer block");
742 goto end;
743 }
744 diff_target = diff_buffer + sizeof(struct wmsg_buffer_diff);
745 } else {
746 if (buf_ensure_size((int)damage_space, 1, &local->tmp_size,
747 &local->tmp_buf) == -1) {
748 wp_error("Allocation failed, dropping diff transfer block");
749 goto end;
750 }
751 diff_target = local->tmp_buf;
752 }
753
754 DTRACE_PROBE1(waypipe, construct_diff_enter, task->damage_len);
755 size_t diffsize = construct_diff_core(pool->diff_func,
756 pool->diff_alignment_bits, task->damage_intervals,
757 task->damage_len, sfd->mem_mirror, sfd->mem_local,
758 diff_target);
759 size_t ntrailing = 0;
760 if (task->damaged_end) {
761 ntrailing = construct_diff_trailing(sfd->buffer_size,
762 pool->diff_alignment_bits, sfd->mem_mirror,
763 sfd->mem_local, diff_target + diffsize);
764 }
765 DTRACE_PROBE1(waypipe, construct_diff_exit, diffsize);
766
767 if (diffsize == 0 && ntrailing == 0) {
768 free(diff_buffer);
769 goto end;
770 }
771
772 uint8_t *msg;
773 size_t sz;
774 size_t net_diff_sz = diffsize + ntrailing;
775 if (pool->compression == COMP_NONE) {
776 sz = net_diff_sz + sizeof(struct wmsg_buffer_diff);
777 msg = (uint8_t *)diff_buffer;
778 } else {
779 struct bytebuf dst;
780 size_t comp_size = compress_bufsize(pool, net_diff_sz);
781 char *comp_buf = malloc(alignz(comp_size, 4) +
782 sizeof(struct wmsg_buffer_diff));
783 if (!comp_buf) {
784 wp_error("Allocation failed, dropping diff transfer block");
785 goto end;
786 }
787 compress_buffer(pool, &local->comp_ctx, net_diff_sz,
788 diff_target, comp_size,
789 comp_buf + sizeof(struct wmsg_buffer_diff),
790 &dst);
791 sz = dst.size + sizeof(struct wmsg_buffer_diff);
792 msg = (uint8_t *)comp_buf;
793 }
794 msg = shrink_buffer(msg, alignz(sz, 4));
795 memset(msg + sz, 0, alignz(sz, 4) - sz);
796 struct wmsg_buffer_diff header;
797 header.size_and_type = transfer_header(sz, WMSG_BUFFER_DIFF);
798 header.remote_id = sfd->remote_id;
799 header.diff_size = (uint32_t)diffsize;
800 header.ntrailing = (uint32_t)ntrailing;
801 memcpy(msg, &header, sizeof(struct wmsg_buffer_diff));
802
803 transfer_async_add(task->msg_queue, msg, alignz(sz, 4));
804
805 end:
806 DTRACE_PROBE1(waypipe, worker_compdiff_exit, diffsize);
807 }
808
809 /* Compress data for sfd->mem_mirror */
worker_run_compress_block(struct task_data * task,struct thread_data * local)810 static void worker_run_compress_block(
811 struct task_data *task, struct thread_data *local)
812 {
813
814 struct shadow_fd *sfd = task->sfd;
815 struct thread_pool *pool = local->pool;
816 if (task->zone_end == task->zone_start) {
817 wp_error("Skipping task");
818 return;
819 }
820
821 /* Allocate a disjoint target interval to each worker */
822 size_t source_start = (size_t)task->zone_start;
823 size_t source_end = (size_t)task->zone_end;
824 DTRACE_PROBE1(waypipe, worker_comp_enter, source_end - source_start);
825
826 size_t sz = 0;
827 uint8_t *msg;
828 if (pool->compression == COMP_NONE) {
829 sz = sizeof(struct wmsg_buffer_fill) +
830 (source_end - source_start);
831
832 msg = malloc(alignz(sz, 4));
833 if (!msg) {
834 wp_error("Allocation failed, dropping fill transfer block");
835 goto end;
836 }
837 memcpy(msg + sizeof(struct wmsg_buffer_fill),
838 sfd->mem_mirror + source_start,
839 source_end - source_start);
840 } else {
841 size_t comp_size = compress_bufsize(
842 pool, source_end - source_start);
843 msg = malloc(alignz(comp_size, 4) +
844 sizeof(struct wmsg_buffer_fill));
845 if (!msg) {
846 wp_error("Allocation failed, dropping fill transfer block");
847 goto end;
848 }
849 struct bytebuf dst;
850 compress_buffer(pool, &local->comp_ctx,
851 source_end - source_start,
852 &sfd->mem_mirror[source_start], comp_size,
853 (char *)msg + sizeof(struct wmsg_buffer_fill),
854 &dst);
855 sz = dst.size + sizeof(struct wmsg_buffer_fill);
856 msg = shrink_buffer(msg, alignz(sz, 4));
857 }
858 memset(msg + sz, 0, alignz(sz, 4) - sz);
859 struct wmsg_buffer_fill header;
860 header.size_and_type = transfer_header(sz, WMSG_BUFFER_FILL);
861 header.remote_id = sfd->remote_id;
862 header.start = (uint32_t)source_start;
863 header.end = (uint32_t)source_end;
864 memcpy(msg, &header, sizeof(struct wmsg_buffer_fill));
865
866 transfer_async_add(task->msg_queue, msg, alignz(sz, 4));
867
868 end:
869 DTRACE_PROBE1(waypipe, worker_comp_exit,
870 sz - sizeof(struct wmsg_buffer_fill));
871 }
872
873 /* Optionally compress the data in mem_mirror, and set up the initial
874 * transfer blocks */
queue_fill_transfers(struct thread_pool * threads,struct shadow_fd * sfd,struct transfer_queue * transfers)875 static void queue_fill_transfers(struct thread_pool *threads,
876 struct shadow_fd *sfd, struct transfer_queue *transfers)
877 {
878 // new transfer, we send file contents verbatim
879 const int chunksize = 262144;
880
881 int region_start = (int)sfd->remote_bufsize;
882 int region_end = (int)sfd->buffer_size;
883 if (region_start > region_end) {
884 wp_error("Cannot queue fill transfers for a size reduction from %d to %d bytes",
885 region_start, region_end);
886 return;
887 }
888 if (region_start == region_end) {
889 return;
890 }
891
892 /* Keep sfd alive at least until write to channel is done */
893 sfd->refcount.compute = true;
894
895 int nshards = ceildiv((region_end - region_start), chunksize);
896
897 pthread_mutex_lock(&threads->work_mutex);
898 if (buf_ensure_size(threads->stack_count + nshards,
899 sizeof(struct task_data), &threads->stack_size,
900 (void **)&threads->stack) == -1) {
901 wp_error("Allocation failed, dropping some fill tasks");
902 pthread_mutex_unlock(&threads->work_mutex);
903 return;
904 }
905
906 for (int i = 0; i < nshards; i++) {
907 struct task_data task;
908 memset(&task, 0, sizeof(task));
909 task.type = TASK_COMPRESS_BLOCK;
910 task.sfd = sfd;
911 task.msg_queue = &transfers->async_recv_queue;
912
913 task.zone_start = split_interval(
914 region_start, region_end, nshards, i);
915 task.zone_end = split_interval(
916 region_start, region_end, nshards, i + 1);
917 threads->stack[threads->stack_count++] = task;
918 }
919 pthread_mutex_unlock(&threads->work_mutex);
920 }
921
/* Queue TASK_COMPRESS_DIFF tasks for the damaged regions of `sfd`.
 *
 * The accumulated damage is clipped to the last aligned block boundary,
 * split into shards of roughly 256 KiB of damage each, and one task per
 * shard is pushed onto the pool's task stack. The last task also handles
 * the unaligned tail of the buffer when needed. The damage record of
 * `sfd` is reset once the intervals have been copied for the tasks.
 * Tasks send their results to transfers->async_recv_queue. */
static void queue_diff_transfers(struct thread_pool *threads,
		struct shadow_fd *sfd, struct transfer_queue *transfers)
{
	const int chunksize = 262144;
	/* No damage recorded: nothing to send */
	if (!sfd->damage.damage) {
		return;
	}

	/* Keep sfd alive at least until write to channel is done */
	sfd->refcount.compute = true;

	/* bs is the diff block size; align_end is the buffer size rounded
	 * down to a whole number of blocks */
	int bs = 1 << threads->diff_alignment_bits;
	int align_end = bs * ((int)sfd->buffer_size / bs);
	bool check_tail = false;

	int net_damage = 0;
	if (sfd->damage.damage == DAMAGE_EVERYTHING) {
		/* Replace the "everything" marker by one explicit interval
		 * spanning the aligned part of the buffer */
		reset_damage(&sfd->damage);
		struct ext_interval all = {.start = 0,
				.width = align_end,
				.rep = 1,
				.stride = 0};
		merge_damage_records(&sfd->damage, 1, &all,
				threads->diff_alignment_bits);
		check_tail = true;
		net_damage = align_end;
	} else {
		for (int ir = 0, iw = 0; ir < sfd->damage.ndamage_intvs; ir++) {
			/* Clip each interval to the aligned part of the
			 * buffer; damage beyond it is handled through
			 * check_tail */
			struct interval e = sfd->damage.damage[ir];
			check_tail |= e.end > align_end;
			e.end = min(e.end, align_end);
			if (e.start < e.end) {
				/* End clipping may produce empty/degenerate
				 * intervals, so filter them out now */
				sfd->damage.damage[iw++] = e;
				net_damage += e.end - e.start;
			}
			if (e.end & (bs - 1) || e.start & (bs - 1)) {
				wp_error("Interval [%d, %d) is not aligned",
						e.start, e.end);
			}
		}
		/* NOTE(review): the array is compacted in place, but
		 * ndamage_intvs is not lowered to the number of intervals
		 * kept; confirm that the loop below can never reach a stale
		 * trailing entry */
	}
	/* NOTE(review): if net_damage is 0 (only the unaligned tail is
	 * damaged), nshards is presumably 0 and no task is queued even when
	 * check_tail is set -- confirm the tail is transferred elsewhere */
	int nshards = ceildiv(net_damage, chunksize);

	/* Instead of allocating individual buffers for each task, create a
	 * global damage tracking buffer into which tasks index. It will be
	 * deleted in `finish_update`. */
	struct interval *intvs = malloc(
			sizeof(struct interval) *
			(size_t)(sfd->damage.ndamage_intvs + nshards));
	/* offsets[i]..offsets[i + 1] delimit the entries of `intvs` that
	 * belong to shard i */
	int *offsets = calloc((size_t)nshards + 1, sizeof(int));
	if (!offsets || !intvs) {
		// TODO: avoid making this allocation entirely
		wp_error("Failed to allocate diff region control buffer, dropping diff tasks");
		free(intvs);
		free(offsets);
		return;
	}

	sfd->damage_task_interval_store = intvs;
	int tot_blocks = net_damage / bs;
	/* ir: index of the damage interval being consumed; iw: next free
	 * slot in `intvs`; acc_prev_blocks: number of blocks contained in
	 * the intervals before index ir */
	int ir = 0, iw = 0, acc_prev_blocks = 0;
	for (int shard = 0; shard < nshards; shard++) {
		/* This shard covers damaged blocks [s_lower, s_upper) */
		int s_lower = split_interval(0, tot_blocks, nshards, shard);
		int s_upper = split_interval(0, tot_blocks, nshards, shard + 1);

		while (acc_prev_blocks < s_upper &&
				ir < sfd->damage.ndamage_intvs) {
			struct interval e = sfd->damage.damage[ir];
			const int w = (e.end - e.start) / bs;

			/* Portion of this interval, in blocks, that falls
			 * inside the current shard */
			int a_low = max(0, s_lower - acc_prev_blocks);
			int a_high = min(w, s_upper - acc_prev_blocks);

			struct interval r = {
					.start = e.start + bs * a_low,
					.end = e.start + bs * a_high,
			};
			intvs[iw++] = r;

			if (acc_prev_blocks + w > s_upper) {
				/* The interval extends into the next shard;
				 * revisit it there without advancing */
				break;
			} else {
				acc_prev_blocks += w;
				ir++;
			}
		}

		offsets[shard + 1] = iw;
	}
	/* Reset damage, once it has been applied */
	reset_damage(&sfd->damage);

	pthread_mutex_lock(&threads->work_mutex);
	if (buf_ensure_size(threads->stack_count + nshards,
			    sizeof(struct task_data), &threads->stack_size,
			    (void **)&threads->stack) == -1) {
		/* `intvs` stays referenced by sfd->damage_task_interval_store
		 * and is not freed on this path */
		wp_error("Allocation failed, dropping some diff tasks");
		pthread_mutex_unlock(&threads->work_mutex);
		free(offsets);
		return;
	}

	for (int i = 0; i < nshards; i++) {
		struct task_data task;
		memset(&task, 0, sizeof(task));
		task.type = TASK_COMPRESS_DIFF;
		task.sfd = sfd;
		task.msg_queue = &transfers->async_recv_queue;

		task.damage_len = offsets[i + 1] - offsets[i];
		task.damage_intervals =
				&sfd->damage_task_interval_store[offsets[i]];
		/* Only the last shard checks the unaligned buffer tail */
		task.damaged_end = (i == nshards - 1) && check_tail;

		threads->stack[threads->stack_count++] = task;
	}
	pthread_mutex_unlock(&threads->work_mutex);
	free(offsets);
}
1044
add_dmabuf_create_request(struct transfer_queue * transfers,struct shadow_fd * sfd,enum wmsg_type variant)1045 static void add_dmabuf_create_request(struct transfer_queue *transfers,
1046 struct shadow_fd *sfd, enum wmsg_type variant)
1047 {
1048 size_t actual_len = sizeof(struct wmsg_open_dmabuf) +
1049 sizeof(struct dmabuf_slice_data);
1050 size_t padded_len = alignz(actual_len, 4);
1051
1052 uint8_t *data = calloc(1, padded_len);
1053 struct wmsg_open_dmabuf *header = (struct wmsg_open_dmabuf *)data;
1054 header->file_size = (uint32_t)sfd->buffer_size;
1055 header->remote_id = sfd->remote_id;
1056 header->size_and_type = transfer_header(actual_len, variant);
1057 memcpy(data + sizeof(struct wmsg_open_dmabuf), &sfd->dmabuf_info,
1058 sizeof(struct dmabuf_slice_data));
1059
1060 transfer_add(transfers, padded_len, data);
1061 }
1062
add_dmabuf_create_request_v2(struct transfer_queue * transfers,struct shadow_fd * sfd,enum wmsg_type variant,enum video_coding_fmt fmt)1063 static void add_dmabuf_create_request_v2(struct transfer_queue *transfers,
1064 struct shadow_fd *sfd, enum wmsg_type variant,
1065 enum video_coding_fmt fmt)
1066 {
1067 size_t actual_len = sizeof(struct wmsg_open_dmavid) +
1068 sizeof(struct dmabuf_slice_data);
1069 static_assert((sizeof(struct wmsg_open_dmavid) +
1070 sizeof(struct dmabuf_slice_data)) %
1071 4 ==
1072 0,
1073 "alignment");
1074
1075 uint8_t *data = calloc(1, actual_len);
1076 struct wmsg_open_dmavid *header = (struct wmsg_open_dmavid *)data;
1077 header->file_size = (uint32_t)sfd->buffer_size;
1078 header->remote_id = sfd->remote_id;
1079 header->size_and_type = transfer_header(actual_len, variant);
1080 header->vid_flags = (fmt == VIDEO_H264) ? DMAVID_H264 : DMAVID_VP9;
1081
1082 memcpy(data + sizeof(*header), &sfd->dmabuf_info,
1083 sizeof(struct dmabuf_slice_data));
1084
1085 transfer_add(transfers, actual_len, data);
1086 }
add_file_create_request(struct transfer_queue * transfers,struct shadow_fd * sfd)1087 static void add_file_create_request(
1088 struct transfer_queue *transfers, struct shadow_fd *sfd)
1089 {
1090 struct wmsg_open_file *header =
1091 calloc(1, sizeof(struct wmsg_open_file));
1092 header->file_size = (uint32_t)sfd->buffer_size;
1093 header->remote_id = sfd->remote_id;
1094 header->size_and_type = transfer_header(
1095 sizeof(struct wmsg_open_file), WMSG_OPEN_FILE);
1096
1097 transfer_add(transfers, sizeof(struct wmsg_open_file), header);
1098 }
1099
finish_update(struct shadow_fd * sfd)1100 void finish_update(struct shadow_fd *sfd)
1101 {
1102 if (!sfd->refcount.compute) {
1103 return;
1104 }
1105 if (sfd->type == FDC_DMABUF && sfd->dmabuf_map_handle) {
1106 // if this fails, unmap_dmabuf will print error
1107 (void)unmap_dmabuf(sfd->dmabuf_bo, sfd->dmabuf_map_handle);
1108 sfd->dmabuf_map_handle = NULL;
1109 sfd->mem_local = NULL;
1110 }
1111 if (sfd->damage_task_interval_store) {
1112 free(sfd->damage_task_interval_store);
1113 sfd->damage_task_interval_store = NULL;
1114 }
1115 sfd->refcount.compute = false;
1116 }
1117
collect_update(struct thread_pool * threads,struct shadow_fd * sfd,struct transfer_queue * transfers,bool use_old_dmavid_req)1118 void collect_update(struct thread_pool *threads, struct shadow_fd *sfd,
1119 struct transfer_queue *transfers, bool use_old_dmavid_req)
1120 {
1121 switch (sfd->type) {
1122 case FDC_FILE: {
1123 if (!sfd->is_dirty) {
1124 // File is clean, we have no reason to believe
1125 // that its contents could have changed
1126 return;
1127 }
1128 // Clear dirty state
1129 sfd->is_dirty = false;
1130 if (sfd->only_here) {
1131 // increase space, to avoid overflow when
1132 // writing this buffer along with padding
1133 size_t alignment = 1u << threads->diff_alignment_bits;
1134 sfd->mem_mirror = zeroed_aligned_alloc(
1135 alignz(sfd->buffer_size, alignment),
1136 alignment, &sfd->mem_mirror_handle);
1137 if (!sfd->mem_mirror) {
1138 wp_error("Failed to allocate mirror");
1139 return;
1140 }
1141
1142 sfd->only_here = false;
1143
1144 sfd->remote_bufsize = 0;
1145
1146 add_file_create_request(transfers, sfd);
1147 sfd->remote_bufsize = sfd->buffer_size;
1148 queue_diff_transfers(threads, sfd, transfers);
1149 return;
1150 }
1151
1152 if (sfd->remote_bufsize < sfd->buffer_size) {
1153 struct wmsg_open_file *header = calloc(
1154 1, sizeof(struct wmsg_open_file));
1155 header->file_size = (uint32_t)sfd->buffer_size;
1156 header->remote_id = sfd->remote_id;
1157 header->size_and_type = transfer_header(
1158 sizeof(struct wmsg_open_file),
1159 WMSG_EXTEND_FILE);
1160
1161 transfer_add(transfers, sizeof(struct wmsg_open_file),
1162 header);
1163
1164 sfd->remote_bufsize = sfd->buffer_size;
1165 }
1166
1167 queue_diff_transfers(threads, sfd, transfers);
1168 } break;
1169 case FDC_DMABUF: {
1170 // If buffer is clean, do not check for changes
1171 if (!sfd->is_dirty) {
1172 return;
1173 }
1174 sfd->is_dirty = false;
1175
1176 bool first = false;
1177 if (sfd->only_here) {
1178 sfd->only_here = false;
1179 first = true;
1180
1181 add_dmabuf_create_request(
1182 transfers, sfd, WMSG_OPEN_DMABUF);
1183 }
1184 if (!sfd->dmabuf_bo) {
1185 // ^ was not previously able to create buffer
1186 return;
1187 }
1188 if (!sfd->mem_local) {
1189 sfd->mem_local = map_dmabuf(sfd->dmabuf_bo, false,
1190 &sfd->dmabuf_map_handle, NULL, NULL);
1191 if (!sfd->mem_local) {
1192 return;
1193 }
1194 }
1195 if (first) {
1196 size_t alignment = 1u << threads->diff_alignment_bits;
1197 sfd->mem_mirror = zeroed_aligned_alloc(
1198 alignz(sfd->buffer_size, alignment),
1199 alignment, &sfd->mem_mirror_handle);
1200 if (!sfd->mem_mirror) {
1201 wp_error("Failed to allocate mirror");
1202 return;
1203 }
1204 memcpy(sfd->mem_mirror, sfd->mem_local,
1205 sfd->buffer_size);
1206
1207 sfd->remote_bufsize = 0;
1208 queue_fill_transfers(threads, sfd, transfers);
1209 sfd->remote_bufsize = sfd->buffer_size;
1210 } else {
1211 // TODO: detailed damage tracking
1212 damage_everything(&sfd->damage);
1213
1214 queue_diff_transfers(threads, sfd, transfers);
1215 }
1216 /* Unmapping will be handled by finish_update() */
1217 } break;
1218 case FDC_DMAVID_IR: {
1219 if (!sfd->is_dirty) {
1220 return;
1221 }
1222 sfd->is_dirty = false;
1223 if (!sfd->dmabuf_bo || !sfd->video_context) {
1224 // ^ was not previously able to create buffer
1225 return;
1226 }
1227 if (sfd->only_here) {
1228 sfd->only_here = false;
1229 if (use_old_dmavid_req) {
1230 add_dmabuf_create_request(transfers, sfd,
1231 WMSG_OPEN_DMAVID_DST);
1232 } else {
1233 add_dmabuf_create_request_v2(transfers, sfd,
1234 WMSG_OPEN_DMAVID_DST_V2,
1235 sfd->video_fmt);
1236 }
1237 }
1238 collect_video_from_mirror(sfd, transfers);
1239 } break;
1240 case FDC_DMAVID_IW: {
1241 sfd->is_dirty = false;
1242 if (sfd->only_here) {
1243 sfd->only_here = false;
1244 if (use_old_dmavid_req) {
1245 add_dmabuf_create_request(transfers, sfd,
1246 WMSG_OPEN_DMAVID_SRC);
1247 } else {
1248 add_dmabuf_create_request_v2(transfers, sfd,
1249 WMSG_OPEN_DMAVID_SRC_V2,
1250 sfd->video_fmt);
1251 }
1252 }
1253 } break;
1254 case FDC_PIPE: {
1255 // Pipes always update, no matter what the message
1256 // stream indicates.
1257 if (sfd->only_here) {
1258 sfd->only_here = false;
1259
1260 struct wmsg_basic *createh =
1261 calloc(1, sizeof(struct wmsg_basic));
1262 enum wmsg_type type;
1263 if (sfd->pipe.can_read && !sfd->pipe.can_write) {
1264 type = WMSG_OPEN_IW_PIPE;
1265 sfd->pipe.remote_can_write = true;
1266 } else if (sfd->pipe.can_write && !sfd->pipe.can_read) {
1267 type = WMSG_OPEN_IR_PIPE;
1268 sfd->pipe.remote_can_read = true;
1269 } else {
1270 type = WMSG_OPEN_RW_PIPE;
1271 sfd->pipe.remote_can_read = true;
1272 sfd->pipe.remote_can_write = true;
1273 }
1274 createh->size_and_type = transfer_header(
1275 sizeof(struct wmsg_basic), type);
1276 createh->remote_id = sfd->remote_id;
1277
1278 transfer_add(transfers, sizeof(struct wmsg_basic),
1279 createh);
1280 }
1281
1282 if (sfd->pipe.recv.used > 0) {
1283 size_t msgsz = sizeof(struct wmsg_basic) +
1284 (size_t)sfd->pipe.recv.used;
1285 char *buf = malloc(alignz(msgsz, 4));
1286 struct wmsg_basic *header = (struct wmsg_basic *)buf;
1287 header->size_and_type = transfer_header(
1288 msgsz, WMSG_PIPE_TRANSFER);
1289 header->remote_id = sfd->remote_id;
1290 memcpy(buf + sizeof(struct wmsg_basic),
1291 sfd->pipe.recv.data,
1292 (size_t)sfd->pipe.recv.used);
1293 memset(buf + msgsz, 0, alignz(msgsz, 4) - msgsz);
1294
1295 transfer_add(transfers, alignz(msgsz, 4), buf);
1296
1297 sfd->pipe.recv.used = 0;
1298 }
1299
1300 if (!sfd->pipe.can_read && sfd->pipe.remote_can_write) {
1301 struct wmsg_basic *header =
1302 calloc(1, sizeof(struct wmsg_basic));
1303 header->size_and_type = transfer_header(
1304 sizeof(struct wmsg_basic),
1305 WMSG_PIPE_SHUTDOWN_W);
1306 header->remote_id = sfd->remote_id;
1307 transfer_add(transfers, sizeof(struct wmsg_basic),
1308 header);
1309 sfd->pipe.remote_can_write = false;
1310 }
1311 if (!sfd->pipe.can_write && sfd->pipe.remote_can_read) {
1312 struct wmsg_basic *header =
1313 calloc(1, sizeof(struct wmsg_basic));
1314 header->size_and_type = transfer_header(
1315 sizeof(struct wmsg_basic),
1316 WMSG_PIPE_SHUTDOWN_R);
1317 header->remote_id = sfd->remote_id;
1318 transfer_add(transfers, sizeof(struct wmsg_basic),
1319 header);
1320 sfd->pipe.remote_can_read = false;
1321 }
1322 } break;
1323 case FDC_UNKNOWN:
1324 break;
1325 }
1326 }
1327
increase_buffer_sizes(struct shadow_fd * sfd,struct thread_pool * threads,size_t new_size)1328 static void increase_buffer_sizes(struct shadow_fd *sfd,
1329 struct thread_pool *threads, size_t new_size)
1330 {
1331 size_t old_size = sfd->buffer_size;
1332 munmap(sfd->mem_local, old_size);
1333 sfd->buffer_size = new_size;
1334 sfd->mem_local = mmap(NULL, sfd->buffer_size, PROT_READ | PROT_WRITE,
1335 MAP_SHARED, sfd->fd_local, 0);
1336 if (sfd->mem_local == MAP_FAILED) {
1337 wp_error("Mmap failed to remap increased buffer for RID=%d: %s",
1338 sfd->remote_id, strerror(errno));
1339 return;
1340 }
1341 /* if resize happens before any transfers, mirror may still be zero */
1342 if (sfd->mem_mirror) {
1343 // todo: handle allocation failures
1344 size_t alignment = 1u << threads->diff_alignment_bits;
1345 void *new_mirror = zeroed_aligned_realloc(
1346 alignz(old_size, alignment),
1347 alignz(sfd->buffer_size, alignment), alignment,
1348 sfd->mem_mirror, &sfd->mem_mirror_handle);
1349 if (!new_mirror) {
1350 wp_error("Failed to reallocate mirror");
1351 return;
1352 }
1353 sfd->mem_mirror = new_mirror;
1354 }
1355 }
1356
pipe_close_write(struct shadow_fd * sfd)1357 static void pipe_close_write(struct shadow_fd *sfd)
1358 {
1359 if (sfd->pipe.can_read) {
1360 /* if pipe.fd is both readable and writable, assume
1361 * socket
1362 */
1363 shutdown(sfd->pipe.fd, SHUT_WR);
1364 } else {
1365 checked_close(sfd->pipe.fd);
1366 if (sfd->fd_local == sfd->pipe.fd) {
1367 sfd->fd_local = -1;
1368 }
1369 sfd->pipe.fd = -1;
1370 }
1371 sfd->pipe.can_write = false;
1372
1373 /* Also free any accumulated data that was not delivered */
1374 free(sfd->pipe.send.data);
1375 memset(&sfd->pipe.send, 0, sizeof(sfd->pipe.send));
1376 }
pipe_close_read(struct shadow_fd * sfd)1377 static void pipe_close_read(struct shadow_fd *sfd)
1378 {
1379 if (sfd->pipe.can_write) {
1380 /* if pipe.fd is both readable and writable, assume
1381 * socket */
1382 // TODO: check return value, can legitimately fail with ENOBUFS
1383 shutdown(sfd->pipe.fd, SHUT_RD);
1384 } else {
1385 checked_close(sfd->pipe.fd);
1386 if (sfd->fd_local == sfd->pipe.fd) {
1387 sfd->fd_local = -1;
1388 }
1389 sfd->pipe.fd = -1;
1390 }
1391 sfd->pipe.can_read = false;
1392 }
1393
open_sfd(struct fd_translation_map * map,struct shadow_fd ** sfd_ptr,int remote_id)1394 static int open_sfd(struct fd_translation_map *map, struct shadow_fd **sfd_ptr,
1395 int remote_id)
1396 {
1397 if (*sfd_ptr) {
1398 wp_error("shadow structure for RID=%d was already created",
1399 remote_id);
1400 return ERR_FATAL;
1401 }
1402
1403 wp_debug("Introducing new fd, remoteid=%d", remote_id);
1404 struct shadow_fd *sfd = calloc(1, sizeof(struct shadow_fd));
1405 if (!sfd) {
1406 wp_error("failed to allocate shadow structure for RID=%d",
1407 remote_id);
1408 return ERR_FATAL;
1409 }
1410 sfd->link.l_prev = &map->link;
1411 sfd->link.l_next = map->link.l_next;
1412 sfd->link.l_prev->l_next = &sfd->link;
1413 sfd->link.l_next->l_prev = &sfd->link;
1414
1415 sfd->remote_id = remote_id;
1416 sfd->fd_local = -1;
1417 sfd->is_dirty = false;
1418 /* a received file descriptor is up to date by default */
1419 reset_damage(&sfd->damage);
1420 sfd->only_here = false;
1421 /* Start the object reference at one, so that, if it is owned by
1422 * some known protocol object, it can not be deleted until the
1423 * fd has at least be transferred over the Wayland connection */
1424 sfd->refcount.transfer = 1;
1425 sfd->refcount.protocol = 0;
1426 sfd->refcount.compute = false;
1427 *sfd_ptr = sfd;
1428 return 0;
1429 }
check_message_min_size(enum wmsg_type type,const struct bytebuf * msg,size_t min_size)1430 static int check_message_min_size(
1431 enum wmsg_type type, const struct bytebuf *msg, size_t min_size)
1432 {
1433 if (msg->size < min_size) {
1434 wp_error("Message size for %s is smaller than expected (%zu bytes vs %zu bytes)",
1435 wmsg_type_to_str(type), msg->size, min_size);
1436 return ERR_FATAL;
1437 }
1438
1439 return 0;
1440 }
1441
check_sfd_type_2(struct shadow_fd * sfd,int remote_id,enum wmsg_type mtype,enum fdcat ftype1,enum fdcat ftype2)1442 static int check_sfd_type_2(struct shadow_fd *sfd, int remote_id,
1443 enum wmsg_type mtype, enum fdcat ftype1, enum fdcat ftype2)
1444 {
1445 if (!sfd) {
1446 wp_error("shadow structure for RID=%d was not available",
1447 remote_id);
1448 return ERR_FATAL;
1449 }
1450 if (sfd->type != ftype1 && sfd->type != ftype2) {
1451 wp_error("Trying to apply %s to RID=%d which has incompatible type=%s",
1452 wmsg_type_to_str(mtype), remote_id,
1453 fdcat_to_str(sfd->type));
1454 return ERR_FATAL;
1455 }
1456 return 0;
1457 }
/*
 * Verify that `sfd` exists and has exactly the category `ftype`.
 *
 * Implemented as the two-category check with both slots set to `ftype`;
 * returns whatever check_sfd_type_2() returns.
 */
static int check_sfd_type(struct shadow_fd *sfd, int remote_id,
		enum wmsg_type mtype, enum fdcat ftype)
{
	const int status =
			check_sfd_type_2(sfd, remote_id, mtype, ftype, ftype);
	return status;
}
1463
apply_update(struct fd_translation_map * map,struct thread_pool * threads,struct render_data * render,enum wmsg_type type,int remote_id,const struct bytebuf * msg)1464 int apply_update(struct fd_translation_map *map, struct thread_pool *threads,
1465 struct render_data *render, enum wmsg_type type, int remote_id,
1466 const struct bytebuf *msg)
1467 {
1468 struct shadow_fd *sfd = get_shadow_for_rid(map, remote_id);
1469 int ret = 0;
1470 switch (type) {
1471 default:
1472 case WMSG_RESTART:
1473 case WMSG_CLOSE:
1474 case WMSG_ACK_NBLOCKS:
1475 case WMSG_INJECT_RIDS:
1476 case WMSG_PROTOCOL: {
1477 if (wmsg_type_is_known(type)) {
1478 wp_error("Unexpected update type: %s",
1479 wmsg_type_to_str(type));
1480 } else {
1481 wp_error("Unidentified update type, number %u. "
1482 "This may be caused by the Waypipe instances "
1483 "on different sides of the connection having "
1484 "incompatible versions or options.",
1485 (unsigned)type);
1486 }
1487 return ERR_FATAL;
1488 }
1489 /* SFD creation messages */
1490 case WMSG_OPEN_FILE: {
1491 if ((ret = check_message_min_size(type, msg,
1492 sizeof(struct wmsg_open_file))) < 0) {
1493 return ret;
1494 }
1495
1496 if ((ret = open_sfd(map, &sfd, remote_id)) < 0) {
1497 return ret;
1498 }
1499
1500 const struct wmsg_open_file header =
1501 *(const struct wmsg_open_file *)msg->data;
1502
1503 sfd->type = FDC_FILE;
1504 sfd->mem_local = NULL;
1505 sfd->buffer_size = header.file_size;
1506 sfd->remote_bufsize = sfd->buffer_size;
1507 size_t alignment = 1u << threads->diff_alignment_bits;
1508 sfd->mem_mirror = zeroed_aligned_alloc(
1509 alignz(sfd->buffer_size, alignment), alignment,
1510 &sfd->mem_mirror_handle);
1511 if (!sfd->mem_mirror) {
1512 wp_error("Failed to allocate mirror");
1513 return 0;
1514 }
1515
1516 sfd->fd_local = create_anon_file();
1517 if (sfd->fd_local == -1) {
1518 wp_error("Failed to create anon file for object %d: %s",
1519 sfd->remote_id, strerror(errno));
1520 return 0;
1521 }
1522 /* ftruncate zero initializes the file by default, matching
1523 * the zeroed mem_mirror buffer */
1524 if (ftruncate(sfd->fd_local, (off_t)sfd->buffer_size) == -1) {
1525 wp_error("Failed to resize anon file to size %zu for reason: %s",
1526 sfd->buffer_size, strerror(errno));
1527 return 0;
1528 }
1529 sfd->mem_local = mmap(NULL, sfd->buffer_size,
1530 PROT_READ | PROT_WRITE, MAP_SHARED,
1531 sfd->fd_local, 0);
1532 if (sfd->mem_local == MAP_FAILED) {
1533 wp_error("Failed to mmap newly created shm file for object %d: %s",
1534 sfd->remote_id, strerror(errno));
1535 sfd->mem_local = NULL;
1536 return 0;
1537 }
1538
1539 return 0;
1540 }
1541 case WMSG_OPEN_DMABUF: {
1542 if ((ret = check_message_min_size(type, msg,
1543 sizeof(struct wmsg_open_dmabuf) +
1544 sizeof(struct dmabuf_slice_data))) <
1545 0) {
1546 return ret;
1547 }
1548
1549 if ((ret = open_sfd(map, &sfd, remote_id)) < 0) {
1550 return ret;
1551 }
1552
1553 sfd->type = FDC_DMABUF;
1554 const struct wmsg_open_dmabuf header =
1555 *(const struct wmsg_open_dmabuf *)msg->data;
1556 sfd->buffer_size = header.file_size;
1557
1558 memcpy(&sfd->dmabuf_info,
1559 msg->data + sizeof(struct wmsg_open_dmabuf),
1560 sizeof(struct dmabuf_slice_data));
1561 size_t alignment = 1u << threads->diff_alignment_bits;
1562 sfd->mem_mirror = zeroed_aligned_alloc(
1563 alignz(sfd->buffer_size, alignment), alignment,
1564 &sfd->mem_mirror_handle);
1565 if (!sfd->mem_mirror) {
1566 wp_error("Failed to allocate mirror");
1567 return 0;
1568 }
1569
1570 wp_debug("Creating remote DMAbuf of %d bytes",
1571 (int)sfd->buffer_size);
1572 // Create mirror from first transfer
1573 // The file can only actually be created when we know
1574 // what type it is?
1575 if (init_render_data(render) == -1) {
1576 sfd->fd_local = -1;
1577 return 0;
1578 }
1579
1580 sfd->dmabuf_bo = make_dmabuf(
1581 render, sfd->buffer_size, &sfd->dmabuf_info);
1582 if (!sfd->dmabuf_bo) {
1583 sfd->fd_local = -1;
1584 return 0;
1585 }
1586 sfd->fd_local = export_dmabuf(sfd->dmabuf_bo);
1587
1588 return 0;
1589 }
1590 case WMSG_OPEN_DMAVID_DST:
1591 case WMSG_OPEN_DMAVID_DST_V2: {
1592 const size_t min_msg_size =
1593 sizeof(struct dmabuf_slice_data) +
1594 ((type == WMSG_OPEN_DMAVID_DST_V2)
1595 ? sizeof(struct wmsg_open_dmavid)
1596 : sizeof(struct wmsg_open_dmabuf));
1597 if ((ret = check_message_min_size(type, msg, min_msg_size)) <
1598 0) {
1599 return ret;
1600 }
1601
1602 if ((ret = open_sfd(map, &sfd, remote_id)) < 0) {
1603 return ret;
1604 }
1605
1606 /* remote read data, this side writes data */
1607 sfd->type = FDC_DMAVID_IW;
1608 if (type == WMSG_OPEN_DMAVID_DST) {
1609 const struct wmsg_open_dmabuf header =
1610 *(const struct wmsg_open_dmabuf *)
1611 msg->data;
1612 sfd->buffer_size = header.file_size;
1613
1614 memcpy(&sfd->dmabuf_info,
1615 msg->data + sizeof(struct wmsg_open_dmabuf),
1616 sizeof(struct dmabuf_slice_data));
1617 sfd->video_fmt = VIDEO_H264;
1618 } else {
1619 const struct wmsg_open_dmavid header =
1620 *(const struct wmsg_open_dmavid *)
1621 msg->data;
1622 sfd->buffer_size = header.file_size;
1623
1624 memcpy(&sfd->dmabuf_info,
1625 msg->data + sizeof(struct wmsg_open_dmavid),
1626 sizeof(struct dmabuf_slice_data));
1627 if ((header.vid_flags & 0xff) == DMAVID_H264) {
1628 sfd->video_fmt = VIDEO_H264;
1629 } else if ((header.vid_flags & 0xff) == DMAVID_VP9) {
1630 sfd->video_fmt = VIDEO_VP9;
1631 } else {
1632 sfd->video_fmt = VIDEO_H264;
1633 wp_error("Unidentified video format for RID=%d",
1634 sfd->remote_id);
1635 return 0;
1636 }
1637 }
1638
1639 if (init_render_data(render) == -1) {
1640 sfd->fd_local = -1;
1641 return 0;
1642 }
1643 sfd->dmabuf_bo = make_dmabuf(
1644 render, sfd->buffer_size, &sfd->dmabuf_info);
1645 if (!sfd->dmabuf_bo) {
1646 wp_error("FDC_DMAVID_IW: RID=%d make_dmabuf failure, sz=%d (%d)",
1647 sfd->remote_id, (int)sfd->buffer_size,
1648 sizeof(struct dmabuf_slice_data));
1649 return 0;
1650 }
1651 sfd->fd_local = export_dmabuf(sfd->dmabuf_bo);
1652
1653 if (setup_video_decode(sfd, render) == -1) {
1654 wp_error("Video decoding setup failed for RID=%d",
1655 sfd->remote_id);
1656 }
1657 return 0;
1658 }
1659 case WMSG_OPEN_DMAVID_SRC:
1660 case WMSG_OPEN_DMAVID_SRC_V2: {
1661 const size_t min_msg_size =
1662 sizeof(struct dmabuf_slice_data) +
1663 ((type == WMSG_OPEN_DMAVID_SRC_V2)
1664 ? sizeof(struct wmsg_open_dmavid)
1665 : sizeof(struct wmsg_open_dmabuf));
1666 if ((ret = check_message_min_size(type, msg, min_msg_size)) <
1667 0) {
1668 return ret;
1669 }
1670 if ((ret = open_sfd(map, &sfd, remote_id)) < 0) {
1671 return ret;
1672 }
1673
1674 /* remote writes data, this side reads data */
1675 sfd->type = FDC_DMAVID_IR;
1676 // TODO: deduplicate this section with WMSG_OPEN_DMAVID_DST,
1677 // or stop handling V1 and V2 in the same branch
1678 if (type == WMSG_OPEN_DMAVID_SRC) {
1679 const struct wmsg_open_dmabuf header =
1680 *(const struct wmsg_open_dmabuf *)
1681 msg->data;
1682 sfd->buffer_size = header.file_size;
1683
1684 memcpy(&sfd->dmabuf_info,
1685 msg->data + sizeof(struct wmsg_open_dmabuf),
1686 sizeof(struct dmabuf_slice_data));
1687 sfd->video_fmt = VIDEO_H264;
1688 } else {
1689 const struct wmsg_open_dmavid header =
1690 *(const struct wmsg_open_dmavid *)
1691 msg->data;
1692 sfd->buffer_size = header.file_size;
1693
1694 memcpy(&sfd->dmabuf_info,
1695 msg->data + sizeof(struct wmsg_open_dmavid),
1696 sizeof(struct dmabuf_slice_data));
1697 if ((header.vid_flags & 0xff) == DMAVID_H264) {
1698 sfd->video_fmt = VIDEO_H264;
1699 } else if ((header.vid_flags & 0xff) == DMAVID_VP9) {
1700 sfd->video_fmt = VIDEO_VP9;
1701 } else {
1702 sfd->video_fmt = VIDEO_H264;
1703 wp_error("Unidentified video format for RID=%d",
1704 sfd->remote_id);
1705 return 0;
1706 }
1707 }
1708
1709 if (init_render_data(render) == -1) {
1710 sfd->fd_local = -1;
1711 return 0;
1712 }
1713
1714 sfd->dmabuf_bo = make_dmabuf(
1715 render, sfd->buffer_size, &sfd->dmabuf_info);
1716 if (!sfd->dmabuf_bo) {
1717 wp_error("FDC_DMAVID_IR: RID=%d make_dmabuf failure",
1718 sfd->remote_id);
1719 return 0;
1720 }
1721 sfd->fd_local = export_dmabuf(sfd->dmabuf_bo);
1722
1723 if (setup_video_encode(sfd, render) == -1) {
1724 wp_error("Video encoding setup failed for RID=%d",
1725 sfd->remote_id);
1726 }
1727 return 0;
1728 }
1729 case WMSG_OPEN_RW_PIPE:
1730 case WMSG_OPEN_IW_PIPE:
1731 case WMSG_OPEN_IR_PIPE: {
1732 if ((ret = open_sfd(map, &sfd, remote_id)) < 0) {
1733 return ret;
1734 }
1735 sfd->type = FDC_PIPE;
1736
1737 int pipedes[2];
1738 if (type == WMSG_OPEN_RW_PIPE) {
1739 if (socketpair(AF_UNIX, SOCK_STREAM, 0, pipedes) ==
1740 -1) {
1741 wp_error("Failed to create a socketpair: %s",
1742 strerror(errno));
1743 return 0;
1744 }
1745 } else {
1746 if (pipe(pipedes) == -1) {
1747 wp_error("Failed to create a pipe: %s",
1748 strerror(errno));
1749 return 0;
1750 }
1751 }
1752
1753 /* We pass 'fd_local' to the client, although we only
1754 * read and write from pipe_fd if it exists. */
1755 if (type == WMSG_OPEN_IR_PIPE) {
1756 // Read end is 0; the other process writes
1757 sfd->fd_local = pipedes[1];
1758 sfd->pipe.fd = pipedes[0];
1759 sfd->pipe.can_read = true;
1760 sfd->pipe.remote_can_write = true;
1761 } else if (type == WMSG_OPEN_IW_PIPE) {
1762 // Write end is 1; the other process reads
1763 sfd->fd_local = pipedes[0];
1764 sfd->pipe.fd = pipedes[1];
1765 sfd->pipe.can_write = true;
1766 sfd->pipe.remote_can_read = true;
1767 } else { // FDC_PIPE_RW
1768 // Here, it doesn't matter which end is which
1769 sfd->fd_local = pipedes[0];
1770 sfd->pipe.fd = pipedes[1];
1771 sfd->pipe.can_read = true;
1772 sfd->pipe.can_write = true;
1773 sfd->pipe.remote_can_read = true;
1774 sfd->pipe.remote_can_write = true;
1775 }
1776
1777 if (set_nonblocking(sfd->pipe.fd) == -1) {
1778 wp_error("Failed to make private pipe end nonblocking: %s",
1779 strerror(errno));
1780 return 0;
1781 }
1782 return 0;
1783 }
1784 /* SFD update messages */
1785 case WMSG_EXTEND_FILE: {
1786 if ((ret = check_message_min_size(type, msg,
1787 sizeof(struct wmsg_open_file))) < 0) {
1788 return ret;
1789 }
1790 if ((ret = check_sfd_type(sfd, remote_id, type, FDC_FILE)) <
1791 0) {
1792 return ret;
1793 }
1794
1795 const struct wmsg_open_file *header =
1796 (const struct wmsg_open_file *)msg->data;
1797 if (header->file_size <= sfd->buffer_size) {
1798 wp_error("File extend message for RID=%d does not increase size %u %z",
1799 remote_id, header->file_size,
1800 sfd->buffer_size);
1801 return ERR_FATAL;
1802 }
1803
1804 if (ftruncate(sfd->fd_local, (off_t)header->file_size) == -1) {
1805 wp_error("Failed to resize file buffer: %s",
1806 strerror(errno));
1807 return 0;
1808 }
1809 increase_buffer_sizes(sfd, threads, (size_t)header->file_size);
1810 // the extension implies the remote buffer is at least as large
1811 sfd->remote_bufsize = sfd->buffer_size;
1812 return 0;
1813 }
1814 case WMSG_BUFFER_FILL: {
1815 if ((ret = check_message_min_size(type, msg,
1816 sizeof(struct wmsg_buffer_fill))) < 0) {
1817 return ret;
1818 }
1819 if ((ret = check_sfd_type_2(sfd, remote_id, type, FDC_FILE,
1820 FDC_DMABUF)) < 0) {
1821 return ret;
1822 }
1823 if (sfd->type == FDC_FILE && sfd->file_readonly) {
1824 wp_debug("Ignoring a fill update to readonly file at RID=%d",
1825 remote_id);
1826 return 0;
1827 }
1828
1829 const struct wmsg_buffer_fill *header =
1830 (const struct wmsg_buffer_fill *)msg->data;
1831
1832 size_t uncomp_size = header->end - header->start;
1833 struct thread_data *local = &threads->threads[0];
1834 if (buf_ensure_size((int)uncomp_size, 1, &local->tmp_size,
1835 &local->tmp_buf) == -1) {
1836 wp_error("Failed to expand temporary decompression buffer, dropping update");
1837 return 0;
1838 }
1839
1840 const char *act_buffer = NULL;
1841 size_t act_size = 0;
1842 uncompress_buffer(threads, &threads->threads[0].comp_ctx,
1843 msg->size - sizeof(struct wmsg_buffer_fill),
1844 msg->data + sizeof(struct wmsg_buffer_fill),
1845 uncomp_size, local->tmp_buf, &act_size,
1846 &act_buffer);
1847
1848 // `memsize+8*remote_nthreads` is the worst-case diff
1849 // expansion
1850 if (header->end > sfd->buffer_size) {
1851 wp_error("Transfer end overflow %" PRIu32 " > %zu",
1852 header->end, sfd->buffer_size);
1853 return ERR_FATAL;
1854 }
1855 if (act_size != header->end - header->start) {
1856 wp_error("Transfer size mismatch %zu %" PRIu32,
1857 act_size, header->end - header->start);
1858 return ERR_FATAL;
1859 }
1860 memcpy(sfd->mem_mirror + header->start, act_buffer,
1861 header->end - header->start);
1862
1863 void *handle = NULL;
1864 bool already_mapped = sfd->mem_local != NULL;
1865 if (sfd->type == FDC_DMABUF && !already_mapped) {
1866 uint32_t stride = 0, height = 0;
1867 sfd->mem_local = map_dmabuf(sfd->dmabuf_bo, true,
1868 &handle, &stride, &height);
1869 if (stride * height < sfd->buffer_size) {
1870 wp_error("DMABUF mapped with stride %" PRIu32
1871 " height %" PRIu32
1872 ", but expected ize=%zu>%zu, ignoring overlarge update",
1873 stride, height,
1874 sfd->buffer_size,
1875 (size_t)(stride * height));
1876 unmap_dmabuf(sfd->dmabuf_bo, handle);
1877 return 0;
1878 }
1879 }
1880 if (!sfd->mem_local) {
1881 wp_error("Failed to fill RID=%d, fd not mapped",
1882 sfd->remote_id);
1883 return 0;
1884 }
1885 memcpy(sfd->mem_local + header->start,
1886 sfd->mem_mirror + header->start,
1887 header->end - header->start);
1888
1889 if (sfd->type == FDC_DMABUF && !already_mapped) {
1890 sfd->mem_local = NULL;
1891 if (unmap_dmabuf(sfd->dmabuf_bo, handle) == -1) {
1892 return 0;
1893 }
1894 }
1895 return 0;
1896 }
1897 case WMSG_BUFFER_DIFF: {
1898 if ((ret = check_message_min_size(type, msg,
1899 sizeof(struct wmsg_buffer_diff))) < 0) {
1900 return ret;
1901 }
1902 if ((ret = check_sfd_type_2(sfd, remote_id, type, FDC_FILE,
1903 FDC_DMABUF)) < 0) {
1904 return ret;
1905 }
1906 if (sfd->type == FDC_FILE && sfd->file_readonly) {
1907 wp_debug("Ignoring a diff update to readonly file at RID=%d",
1908 remote_id);
1909 return 0;
1910 }
1911 const struct wmsg_buffer_diff *header =
1912 (const struct wmsg_buffer_diff *)msg->data;
1913
1914 struct thread_data *local = &threads->threads[0];
1915 if (buf_ensure_size((int)(header->diff_size +
1916 header->ntrailing),
1917 1, &local->tmp_size,
1918 &local->tmp_buf) == -1) {
1919 wp_error("Failed to expand temporary decompression buffer, dropping update");
1920 return 0;
1921 }
1922
1923 const char *act_buffer = NULL;
1924 size_t act_size = 0;
1925 uncompress_buffer(threads, &threads->threads[0].comp_ctx,
1926 msg->size - sizeof(struct wmsg_buffer_diff),
1927 msg->data + sizeof(struct wmsg_buffer_diff),
1928 header->diff_size + header->ntrailing,
1929 local->tmp_buf, &act_size, &act_buffer);
1930
1931 // `memsize+8*remote_nthreads` is the worst-case diff
1932 // expansion
1933 if (act_size != header->diff_size + header->ntrailing) {
1934 wp_error("Transfer size mismatch %zu %u", act_size,
1935 header->diff_size + header->ntrailing);
1936 return ERR_FATAL;
1937 }
1938
1939 void *handle = NULL;
1940 bool already_mapped = sfd->mem_local != NULL;
1941 if (sfd->type == FDC_DMABUF && !already_mapped) {
1942 uint32_t stride = 0, height = 0;
1943 sfd->mem_local = map_dmabuf(sfd->dmabuf_bo, true,
1944 &handle, &stride, &height);
1945 if (stride * height < sfd->buffer_size) {
1946 wp_error("DMABUF mapped with stride %" PRIu32
1947 " height %" PRIu32
1948 ", but expected size=%zu>%zu, ignoring overlarge update",
1949 stride, height,
1950 sfd->buffer_size,
1951 (size_t)(stride * height));
1952 unmap_dmabuf(sfd->dmabuf_bo, handle);
1953 return 0;
1954 }
1955 }
1956 if (!sfd->mem_local) {
1957 wp_error("Failed to apply diff to RID=%d, fd not mapped",
1958 sfd->remote_id);
1959 return 0;
1960 }
1961
1962 DTRACE_PROBE2(waypipe, apply_diff_enter, sfd->buffer_size,
1963 header->diff_size);
1964 apply_diff(sfd->buffer_size, sfd->mem_mirror, sfd->mem_local,
1965 header->diff_size, header->ntrailing,
1966 act_buffer);
1967 DTRACE_PROBE(waypipe, apply_diff_exit);
1968
1969 if (sfd->type == FDC_DMABUF && !already_mapped) {
1970 sfd->mem_local = NULL;
1971 if (unmap_dmabuf(sfd->dmabuf_bo, handle) == -1) {
1972 return 0;
1973 }
1974 }
1975
1976 return 0;
1977 }
1978 case WMSG_PIPE_TRANSFER: {
1979 if ((ret = check_sfd_type(sfd, remote_id, type, FDC_PIPE)) <
1980 0) {
1981 return ret;
1982 }
1983 if (!sfd->pipe.can_write || sfd->pipe.pending_w_shutdown) {
1984 wp_debug("Discarding transfer to pipe RID=%d, because pipe cannot be written to",
1985 remote_id);
1986 return 0;
1987 }
1988
1989 size_t transf_data_sz = msg->size - sizeof(struct wmsg_basic);
1990
1991 int netsize = sfd->pipe.send.used + (int)transf_data_sz;
1992 if (buf_ensure_size(netsize, 1, &sfd->pipe.send.size,
1993 (void **)&sfd->pipe.send.data) == -1) {
1994 wp_error("Failed to expand pipe transfer buffer, dropping data");
1995 return 0;
1996 }
1997
1998 memcpy(sfd->pipe.send.data + sfd->pipe.send.used,
1999 msg->data + sizeof(struct wmsg_basic),
2000 transf_data_sz);
2001 sfd->pipe.send.used = netsize;
2002
2003 // The pipe itself will be flushed/or closed later by
2004 // flush_writable_pipes
2005 sfd->pipe.writable = true;
2006 return 0;
2007 }
2008 case WMSG_PIPE_SHUTDOWN_R: {
2009 if ((ret = check_sfd_type(sfd, remote_id, type, FDC_PIPE)) <
2010 0) {
2011 return ret;
2012 }
2013 sfd->pipe.remote_can_write = false;
2014 if (!sfd->pipe.can_read) {
2015 wp_debug("Discarding read shutdown to pipe RID=%d, which cannot read",
2016 remote_id);
2017 return 0;
2018 }
2019 pipe_close_read(sfd);
2020 return 0;
2021 }
2022 case WMSG_PIPE_SHUTDOWN_W: {
2023 if ((ret = check_sfd_type(sfd, remote_id, type, FDC_PIPE)) <
2024 0) {
2025 return ret;
2026 }
2027 sfd->pipe.remote_can_read = false;
2028 if (!sfd->pipe.can_write) {
2029 wp_debug("Discarding write shutdown to pipe RID=%d, which cannot write",
2030 remote_id);
2031 return 0;
2032 }
2033 if (sfd->pipe.send.used <= 0) {
2034 pipe_close_write(sfd);
2035 } else {
2036 /* Shutdown as soon as the current data has been written
2037 */
2038 sfd->pipe.pending_w_shutdown = true;
2039 }
2040
2041 return 0;
2042 }
2043 case WMSG_SEND_DMAVID_PACKET: {
2044 if ((ret = check_sfd_type(sfd, remote_id, type,
2045 FDC_DMAVID_IW)) < 0) {
2046 return ret;
2047 }
2048 if (!sfd->dmabuf_bo) {
2049 wp_error("Applying update to nonexistent dma buffer object rid=%d",
2050 sfd->remote_id);
2051 return 0;
2052 }
2053 struct bytebuf data = {
2054 .data = msg->data + sizeof(struct wmsg_basic),
2055 .size = msg->size - sizeof(struct wmsg_basic)};
2056 apply_video_packet(sfd, render, &data);
2057 return 0;
2058 }
2059 };
2060 /* all returns should happen inside switch, so none here */
2061 }
2062
shadow_decref_protocol(struct shadow_fd * sfd)2063 bool shadow_decref_protocol(struct shadow_fd *sfd)
2064 {
2065 sfd->refcount.protocol--;
2066 return destroy_shadow_if_unreferenced(sfd);
2067 }
2068
shadow_decref_transfer(struct shadow_fd * sfd)2069 bool shadow_decref_transfer(struct shadow_fd *sfd)
2070 {
2071 sfd->refcount.transfer--;
2072 if (sfd->refcount.transfer == 0 && sfd->type == FDC_PIPE) {
2073 /* fd_local has been transferred for the last time, so close
2074 * it and make it match pipe.fd, just as on the side where
2075 * the original pipe was introduced */
2076 if (sfd->pipe.fd != sfd->fd_local) {
2077 checked_close(sfd->fd_local);
2078 sfd->fd_local = sfd->pipe.fd;
2079 }
2080 }
2081 return destroy_shadow_if_unreferenced(sfd);
2082 }
shadow_incref_protocol(struct shadow_fd * sfd)2083 struct shadow_fd *shadow_incref_protocol(struct shadow_fd *sfd)
2084 {
2085 sfd->has_owner = true;
2086 sfd->refcount.protocol++;
2087 return sfd;
2088 }
shadow_incref_transfer(struct shadow_fd * sfd)2089 struct shadow_fd *shadow_incref_transfer(struct shadow_fd *sfd)
2090 {
2091 sfd->has_owner = true;
2092 if (sfd->type == FDC_PIPE && sfd->refcount.transfer == 0) {
2093 wp_error("The other pipe end may have been closed");
2094 }
2095 sfd->refcount.transfer++;
2096 return sfd;
2097 }
/* Drop one transfer reference for each local file descriptor in `fds`.
 *
 * map:  translation map in which the descriptors are looked up
 * nfds: number of entries in `fds`
 * fds:  local file descriptors whose shadows should be dereferenced
 *
 * Each descriptor is resolved with get_shadow_for_local_fd() and the result
 * passed to shadow_decref_transfer(). A descriptor for which the lookup
 * yields no shadow is logged and skipped, since shadow_decref_transfer()
 * dereferences its argument unconditionally. */
void decref_transferred_fds(struct fd_translation_map *map, int nfds, int fds[])
{
	for (int i = 0; i < nfds; i++) {
		struct shadow_fd *sfd = get_shadow_for_local_fd(map, fds[i]);
		if (!sfd) {
			wp_error("Failed to find shadow struct for local fd=%d, not dropping transfer reference",
					fds[i]);
			continue;
		}
		shadow_decref_transfer(sfd);
	}
}
/* Drop one transfer reference for each remote id in `ids`.
 *
 * map:  translation map in which the ids are looked up
 * nids: number of entries in `ids`
 * ids:  remote ids whose shadows should be dereferenced
 *
 * Each id is resolved with get_shadow_for_rid() and the result passed to
 * shadow_decref_transfer(). An id for which the lookup yields no shadow is
 * logged and skipped, since shadow_decref_transfer() dereferences its
 * argument unconditionally. */
void decref_transferred_rids(
		struct fd_translation_map *map, int nids, int ids[])
{
	for (int i = 0; i < nids; i++) {
		struct shadow_fd *sfd = get_shadow_for_rid(map, ids[i]);
		if (!sfd) {
			wp_error("Failed to find shadow struct for RID=%d, not dropping transfer reference",
					ids[i]);
			continue;
		}
		shadow_decref_transfer(sfd);
	}
}
2113
count_npipes(const struct fd_translation_map * map)2114 int count_npipes(const struct fd_translation_map *map)
2115 {
2116 int np = 0;
2117 for (struct shadow_fd_link *lcur = map->link.l_next,
2118 *lnxt = lcur->l_next;
2119 lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
2120 struct shadow_fd *cur = (struct shadow_fd *)lcur;
2121 if (cur->type == FDC_PIPE) {
2122 np++;
2123 }
2124 }
2125 return np;
2126 }
fill_with_pipes(const struct fd_translation_map * map,struct pollfd * pfds,bool check_read)2127 int fill_with_pipes(const struct fd_translation_map *map, struct pollfd *pfds,
2128 bool check_read)
2129 {
2130 int np = 0;
2131 for (struct shadow_fd_link *lcur = map->link.l_next,
2132 *lnxt = lcur->l_next;
2133 lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
2134 struct shadow_fd *cur = (struct shadow_fd *)lcur;
2135 if (cur->type == FDC_PIPE && cur->pipe.fd != -1) {
2136 pfds[np].fd = cur->pipe.fd;
2137 pfds[np].events = 0;
2138 if (check_read && cur->pipe.readable) {
2139 pfds[np].events |= POLLIN;
2140 }
2141 if (cur->pipe.send.used > 0) {
2142 pfds[np].events |= POLLOUT;
2143 }
2144 np++;
2145 }
2146 }
2147 return np;
2148 }
2149
get_shadow_for_pipe_fd(struct fd_translation_map * map,int pipefd)2150 static struct shadow_fd *get_shadow_for_pipe_fd(
2151 struct fd_translation_map *map, int pipefd)
2152 {
2153 for (struct shadow_fd_link *lcur = map->link.l_next,
2154 *lnxt = lcur->l_next;
2155 lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
2156 struct shadow_fd *cur = (struct shadow_fd *)lcur;
2157 if (cur->type == FDC_PIPE && cur->pipe.fd == pipefd) {
2158 return cur;
2159 }
2160 }
2161 return NULL;
2162 }
2163
mark_pipe_object_statuses(struct fd_translation_map * map,int nfds,struct pollfd * pfds)2164 void mark_pipe_object_statuses(
2165 struct fd_translation_map *map, int nfds, struct pollfd *pfds)
2166 {
2167 for (int i = 0; i < nfds; i++) {
2168 int lfd = pfds[i].fd;
2169 struct shadow_fd *sfd = get_shadow_for_pipe_fd(map, lfd);
2170 if (!sfd) {
2171 wp_error("Failed to find shadow struct for .pipe_fd=%d",
2172 lfd);
2173 continue;
2174 }
2175 if (pfds[i].revents & POLLIN || pfds[i].revents & POLLHUP) {
2176 /* In */
2177 sfd->pipe.readable = true;
2178 }
2179 if (pfds[i].revents & POLLOUT) {
2180 sfd->pipe.writable = true;
2181 }
2182 }
2183 }
2184
flush_writable_pipes(struct fd_translation_map * map)2185 void flush_writable_pipes(struct fd_translation_map *map)
2186 {
2187 for (struct shadow_fd_link *lcur = map->link.l_next,
2188 *lnxt = lcur->l_next;
2189 lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
2190 struct shadow_fd *sfd = (struct shadow_fd *)lcur;
2191 if (sfd->type != FDC_PIPE || !sfd->pipe.writable ||
2192 sfd->pipe.send.used <= 0) {
2193 continue;
2194 }
2195
2196 sfd->pipe.writable = false;
2197 wp_debug("Flushing %zd bytes into RID=%d", sfd->pipe.send.used,
2198 sfd->remote_id);
2199 ssize_t changed = write(sfd->pipe.fd, sfd->pipe.send.data,
2200 (size_t)sfd->pipe.send.used);
2201
2202 if (changed == -1 &&
2203 (errno == EAGAIN || errno == EWOULDBLOCK)) {
2204 wp_debug("Writing to pipe RID=%d would block",
2205 sfd->remote_id);
2206 continue;
2207 } else if (changed == -1 &&
2208 (errno == EPIPE || errno == EBADF)) {
2209 /* No process has access to the other end of the pipe,
2210 * or the file descriptor is otherwise permanently
2211 * unwriteable */
2212 pipe_close_write(sfd);
2213 } else if (changed == -1) {
2214 wp_error("Failed to write into pipe with remote_id=%d: %s",
2215 sfd->remote_id, strerror(errno));
2216 } else {
2217 wp_debug("Wrote %zd more bytes into pipe RID=%d",
2218 changed, sfd->remote_id);
2219 sfd->pipe.send.used -= (int)changed;
2220 if (sfd->pipe.send.used > 0) {
2221 memmove(sfd->pipe.send.data,
2222 sfd->pipe.send.data + changed,
2223 (size_t)sfd->pipe.send.used);
2224 }
2225 if (sfd->pipe.send.used <= 0 &&
2226 sfd->pipe.pending_w_shutdown) {
2227 /* A shutdown request was made, but can only be
2228 * applied now that the write buffer has been
2229 * cleared */
2230 pipe_close_write(sfd);
2231 sfd->pipe.pending_w_shutdown = false;
2232 }
2233 }
2234 }
2235 /* Destroy any new unreferenced objects */
2236 for (struct shadow_fd_link *lcur = map->link.l_next,
2237 *lnxt = lcur->l_next;
2238 lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
2239 struct shadow_fd *cur = (struct shadow_fd *)lcur;
2240 destroy_shadow_if_unreferenced(cur);
2241 }
2242 }
read_readable_pipes(struct fd_translation_map * map)2243 void read_readable_pipes(struct fd_translation_map *map)
2244 {
2245 for (struct shadow_fd_link *lcur = map->link.l_next,
2246 *lnxt = lcur->l_next;
2247 lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
2248 struct shadow_fd *sfd = (struct shadow_fd *)lcur;
2249 if (sfd->type != FDC_PIPE || !sfd->pipe.readable) {
2250 continue;
2251 }
2252
2253 if (sfd->pipe.recv.size == 0) {
2254 sfd->pipe.recv.size = 32768;
2255 sfd->pipe.recv.data =
2256 malloc((size_t)sfd->pipe.recv.size);
2257 }
2258 if (sfd->pipe.recv.size > sfd->pipe.recv.used) {
2259 sfd->pipe.readable = false;
2260 ssize_t changed = read(sfd->pipe.fd,
2261 sfd->pipe.recv.data +
2262 sfd->pipe.recv.used,
2263 (size_t)(sfd->pipe.recv.size -
2264 sfd->pipe.recv.used));
2265 if (changed == 0) {
2266 /* No process has access to the other end of the
2267 * pipe */
2268 pipe_close_read(sfd);
2269 } else if (changed == -1 &&
2270 (errno == EAGAIN ||
2271 errno == EWOULDBLOCK)) {
2272 wp_debug("Reading from pipe RID=%d would block",
2273 sfd->remote_id);
2274 continue;
2275 } else if (changed == -1) {
2276 wp_error("Failed to read from pipe with remote_id=%d: %s",
2277 sfd->remote_id,
2278 strerror(errno));
2279 } else {
2280 wp_debug("Read %zd more bytes from pipe RID=%d",
2281 changed, sfd->remote_id);
2282 sfd->pipe.recv.used += (int)changed;
2283 }
2284 }
2285 }
2286
2287 /* Destroy any new unreferenced objects */
2288 for (struct shadow_fd_link *lcur = map->link.l_next,
2289 *lnxt = lcur->l_next;
2290 lcur != &map->link; lcur = lnxt, lnxt = lcur->l_next) {
2291 struct shadow_fd *cur = (struct shadow_fd *)lcur;
2292 destroy_shadow_if_unreferenced(cur);
2293 }
2294 }
2295
extend_shm_shadow(struct fd_translation_map * map,struct thread_pool * threads,struct shadow_fd * sfd,size_t new_size)2296 void extend_shm_shadow(struct fd_translation_map *map,
2297 struct thread_pool *threads, struct shadow_fd *sfd,
2298 size_t new_size)
2299 {
2300 if (sfd->buffer_size >= new_size) {
2301 return;
2302 }
2303
2304 // Verify that the file size actually increased
2305 struct stat st;
2306 int fs = fstat(sfd->fd_local, &st);
2307 if (fs == -1) {
2308 wp_error("Checking file size failed: %s", strerror(errno));
2309 return;
2310 }
2311 if ((size_t)st.st_size < new_size) {
2312 wp_error("Trying to resize file larger (%d) than the actual file size (%d), ignoring",
2313 (int)new_size, (int)st.st_size);
2314 return;
2315 }
2316
2317 increase_buffer_sizes(sfd, threads, new_size);
2318 (void)map;
2319
2320 // leave `sfd->remote_bufsize` unchanged, and mark dirty
2321 sfd->is_dirty = true;
2322 }
2323
run_task(struct task_data * task,struct thread_data * local)2324 void run_task(struct task_data *task, struct thread_data *local)
2325 {
2326 if (task->type == TASK_COMPRESS_BLOCK) {
2327 worker_run_compress_block(task, local);
2328 } else if (task->type == TASK_COMPRESS_DIFF) {
2329 worker_run_compress_diff(task, local);
2330 } else {
2331 wp_error("Unidentified task type");
2332 }
2333 }
2334
start_parallel_work(struct thread_pool * pool,struct thread_msg_recv_buf * recv_queue)2335 int start_parallel_work(struct thread_pool *pool,
2336 struct thread_msg_recv_buf *recv_queue)
2337 {
2338 pthread_mutex_lock(&pool->work_mutex);
2339 if (recv_queue->zone_start != recv_queue->zone_end) {
2340 wp_error("Some async messages not yet sent");
2341 }
2342 recv_queue->zone_start = 0;
2343 recv_queue->zone_end = 0;
2344 int num_mt_tasks = pool->stack_count;
2345 if (buf_ensure_size(num_mt_tasks, sizeof(struct iovec),
2346 &recv_queue->size,
2347 (void **)&recv_queue->data) == -1) {
2348 wp_error("Failed to provide enough space for receive queue, skipping all work tasks");
2349 num_mt_tasks = 0;
2350 }
2351 pool->do_work = num_mt_tasks > 0;
2352
2353 /* Start the work tasks here */
2354 if (num_mt_tasks > 0) {
2355 pthread_cond_broadcast(&pool->work_cond);
2356 }
2357 pthread_mutex_unlock(&pool->work_mutex);
2358
2359 return num_mt_tasks;
2360 }
2361
request_work_task(struct thread_pool * pool,struct task_data * task,bool * is_done)2362 bool request_work_task(
2363 struct thread_pool *pool, struct task_data *task, bool *is_done)
2364 {
2365 pthread_mutex_lock(&pool->work_mutex);
2366 *is_done = pool->stack_count == 0 && pool->tasks_in_progress == 0;
2367 bool has_task = false;
2368 if (pool->stack_count > 0 && pool->do_work) {
2369 int i = pool->stack_count - 1;
2370 if (pool->stack[i].type != TASK_STOP) {
2371 *task = pool->stack[i];
2372 has_task = true;
2373 pool->stack_count--;
2374 pool->tasks_in_progress++;
2375 if (pool->stack_count <= 0) {
2376 pool->do_work = false;
2377 }
2378 }
2379 }
2380 pthread_mutex_unlock(&pool->work_mutex);
2381 return has_task;
2382 }
2383
worker_thread_main(void * arg)2384 static void *worker_thread_main(void *arg)
2385 {
2386 struct thread_data *data = arg;
2387 struct thread_pool *pool = data->pool;
2388
2389 /* The loop is globally locked by default, and only unlocked in
2390 * pthread_cond_wait. Yes, there are fancier and faster schemes.
2391 */
2392 pthread_mutex_lock(&pool->work_mutex);
2393 while (1) {
2394 while (!pool->do_work) {
2395 pthread_cond_wait(&pool->work_cond, &pool->work_mutex);
2396 }
2397 if (pool->stack_count <= 0) {
2398 pool->do_work = false;
2399 continue;
2400 }
2401 /* Copy task, since the queue may be resized */
2402 int i = pool->stack_count - 1;
2403 struct task_data task = pool->stack[i];
2404 if (task.type == TASK_STOP) {
2405 break;
2406 }
2407 pool->tasks_in_progress++;
2408 pool->stack_count--;
2409 if (pool->stack_count <= 0) {
2410 pool->do_work = false;
2411 }
2412 pthread_mutex_unlock(&pool->work_mutex);
2413 run_task(&task, data);
2414 pthread_mutex_lock(&pool->work_mutex);
2415
2416 uint8_t triv = 0;
2417 pool->tasks_in_progress--;
2418 if (write(pool->selfpipe_w, &triv, 1) == -1) {
2419 wp_error("Failed to write to self-pipe");
2420 }
2421 }
2422 pthread_mutex_unlock(&pool->work_mutex);
2423
2424 return NULL;
2425 }
2426