1 /*
2 * Copyright © 2019 Manuel Stoeckl
3 *
4 * Permission is hereby granted, free of charge, to any person obtaining
5 * a copy of this software and associated documentation files (the
6 * "Software"), to deal in the Software without restriction, including
7 * without limitation the rights to use, copy, modify, merge, publish,
8 * distribute, sublicense, and/or sell copies of the Software, and to
9 * permit persons to whom the Software is furnished to do so, subject to
10 * the following conditions:
11 *
12 * The above copyright notice and this permission notice (including the
13 * next paragraph) shall be included in all copies or substantial
14 * portions of the Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
20 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
22 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 * SOFTWARE.
24 */
25
26 #include "main.h"
27
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <poll.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <sys/socket.h>
36 #include <sys/uio.h>
37 #include <unistd.h>
38
39 // The maximum number of fds libwayland can recvmsg at once
40 #define MAX_LIBWAY_FDS 28
iovec_read(int conn,char * buf,size_t buflen,struct int_window * fds)41 static ssize_t iovec_read(
42 int conn, char *buf, size_t buflen, struct int_window *fds)
43 {
44 char cmsgdata[(CMSG_LEN(MAX_LIBWAY_FDS * sizeof(int32_t)))];
45 struct iovec the_iovec;
46 the_iovec.iov_len = buflen;
47 the_iovec.iov_base = buf;
48 struct msghdr msg;
49 msg.msg_name = NULL;
50 msg.msg_namelen = 0;
51 msg.msg_iov = &the_iovec;
52 msg.msg_iovlen = 1;
53 msg.msg_control = &cmsgdata;
54 msg.msg_controllen = sizeof(cmsgdata);
55 msg.msg_flags = 0;
56 ssize_t ret = recvmsg(conn, &msg, 0);
57
58 // Read cmsg
59 struct cmsghdr *header = CMSG_FIRSTHDR(&msg);
60 while (header) {
61 struct cmsghdr *nxt_hdr = CMSG_NXTHDR(&msg, header);
62 if (header->cmsg_level != SOL_SOCKET ||
63 header->cmsg_type != SCM_RIGHTS) {
64 header = nxt_hdr;
65 continue;
66 }
67
68 int *data = (int *)CMSG_DATA(header);
69 int nf = (int)((header->cmsg_len - CMSG_LEN(0)) / sizeof(int));
70
71 if (buf_ensure_size(fds->zone_end + nf, sizeof(int), &fds->size,
72 (void **)&fds->data) == -1) {
73 wp_error("Failed to allocate space for new fds");
74 errno = ENOMEM;
75 ret = -1;
76 } else {
77 for (int i = 0; i < nf; i++) {
78 fds->data[fds->zone_end++] = data[i];
79 }
80 }
81
82 header = nxt_hdr;
83 }
84 return ret;
85 }
86
/** Send bytes and up to MAX_LIBWAY_FDS file descriptors over a socket.
 *
 * *nfds_written is set to the number of fds actually attached. When more
 * than MAX_LIBWAY_FDS fds are pending, only a single data byte is written
 * alongside the first MAX_LIBWAY_FDS fds — presumably so the remaining fds
 * stay paired with the later data bytes (matches libwayland's per-recvmsg
 * fd limit noted above); the caller is expected to retry with the rest.
 * Returns the sendmsg() result (-1 with errno set on failure). */
static ssize_t iovec_write(int conn, const char *buf, size_t buflen,
		const int *fds, int numfds, int *nfds_written)
{
	bool overflow = numfds > MAX_LIBWAY_FDS;

	struct iovec the_iovec;
	the_iovec.iov_len = overflow ? 1 : buflen;
	the_iovec.iov_base = (char *)buf;
	struct msghdr msg;
	msg.msg_name = NULL;
	msg.msg_namelen = 0;
	msg.msg_iov = &the_iovec;
	msg.msg_iovlen = 1;
	msg.msg_control = NULL;
	msg.msg_controllen = 0;
	msg.msg_flags = 0;

	/* Union ensures the control buffer is aligned for struct cmsghdr */
	union {
		char buf[CMSG_SPACE(sizeof(int) * MAX_LIBWAY_FDS)];
		struct cmsghdr align;
	} uc;
	memset(uc.buf, 0, sizeof(uc.buf));

	if (numfds > 0) {
		msg.msg_control = uc.buf;
		msg.msg_controllen = sizeof(uc.buf);
		struct cmsghdr *frst = CMSG_FIRSTHDR(&msg);
		frst->cmsg_level = SOL_SOCKET;
		frst->cmsg_type = SCM_RIGHTS;
		*nfds_written = min(numfds, MAX_LIBWAY_FDS);
		size_t nwritten = (size_t)(*nfds_written);
		memcpy(CMSG_DATA(frst), fds, nwritten * sizeof(int));
		/* Diagnostic pass only: warn about any invalid fds in the
		 * queue (including those beyond this batch); does not alter
		 * what is sent */
		for (int i = 0; i < numfds; i++) {
			int flags = fcntl(fds[i], F_GETFL, 0);
			if (flags == -1 && errno == EBADF) {
				wp_error("Writing invalid fd %d", fds[i]);
			}
		}

		/* Shrink controllen to the space actually used */
		frst->cmsg_len = CMSG_LEN(nwritten * sizeof(int));
		msg.msg_controllen = CMSG_SPACE(nwritten * sizeof(int));
		wp_debug("Writing %d fds to cmsg data", *nfds_written);
	} else {
		*nfds_written = 0;
	}

	ssize_t ret = sendmsg(conn, &msg, 0);
	return ret;
}
136
translate_fds(struct fd_translation_map * map,struct render_data * render,int nfds,const int fds[],int ids[])137 static int translate_fds(struct fd_translation_map *map,
138 struct render_data *render, int nfds, const int fds[],
139 int ids[])
140 {
141 for (int i = 0; i < nfds; i++) {
142 struct shadow_fd *sfd = get_shadow_for_local_fd(map, fds[i]);
143 if (!sfd) {
144 /* Autodetect type + create shadow fd */
145 size_t fdsz = 0;
146 enum fdcat fdtype = get_fd_type(fds[i], &fdsz);
147 sfd = translate_fd(map, render, fds[i], fdtype, fdsz,
148 NULL, false, false);
149 }
150 if (sfd) {
151 ids[i] = sfd->remote_id;
152 } else {
153 return -1;
154 }
155 }
156 return 0;
157 }
158 /** Given a list of global ids, and an up-to-date translation map, produce local
159 * file descriptors */
untranslate_ids(struct fd_translation_map * map,int nids,const int * ids,int * fds)160 static void untranslate_ids(struct fd_translation_map *map, int nids,
161 const int *ids, int *fds)
162 {
163 for (int i = 0; i < nids; i++) {
164 struct shadow_fd *shadow = get_shadow_for_rid(map, ids[i]);
165 if (!shadow) {
166 wp_error("Could not untranslate remote id %d in map. Application will probably crash.",
167 ids[i]);
168 fds[i] = -1;
169 } else {
170 fds[i] = shadow->fd_local;
171 }
172 }
173 }
174
enum wm_state { WM_WAITING_FOR_PROGRAM, WM_WAITING_FOR_CHANNEL, WM_TERMINAL };
/** This state corresponds to the in-progress transfer from the program
 * (compositor or application) and its pipes/buffers to the channel. */
struct way_msg_state {
	enum wm_state state;

	/** Window zone contains the message data which has been read
	 * but not yet parsed/copied to proto_write */
	struct char_window proto_read;
	/** Buffer of complete protocol messages to be written to the channel */
	struct char_window proto_write;

	/** Queue of fds to be used by the protocol parser */
	struct int_window fds;

	/** Individual messages, to be sent out via writev and deleted on
	 * acknowledgement */
	struct transfer_queue transfers;
	/** Bytes written in this cycle, for debug logging */
	int total_written;
	/** Maximum number of iovec chunks to writev at once */
	int max_iov;

	/** Transfers to send after the compute queue is empty */
	int ntrailing;
	struct iovec trailing[3];

	/** Statically allocated message acknowledgement messages; due
	 * to the way they are updated out of order, at most two are needed */
	struct wmsg_ack ack_msgs[2];
};
206
enum cm_state { CM_WAITING_FOR_PROGRAM, CM_WAITING_FOR_CHANNEL, CM_TERMINAL };
/** This state corresponds to the in-progress transfer from the channel
 * to the program, and the buffers/pipes to which data will be written. */
struct chan_msg_state {
	enum cm_state state;

	/** Edited protocol data which is being written to the program */
	struct char_window proto_write;

	/** FDs that should immediately be transferred to the program */
	struct int_window transf_fds;
	/** FD queue for the protocol parser */
	struct int_window proto_fds;

	/** Minimum target size of a single read from the channel */
#define RECV_GOAL_READ_SIZE 131072
	char *recv_buffer; // ring-like buffer for message data
	size_t recv_size;  // allocated size of recv_buffer
	size_t recv_start; // (recv_buffer+recv_start) should be a message header
	size_t recv_end; // last byte read from channel, always >=recv_start
	int recv_unhandled_messages; // number of messages to parse
};
228
/** State used by both forward and reverse messages */
struct cross_state {
	/* Which was the last message received from the other
	 * application, for which acknowledgement was sent? */
	uint32_t last_acked_msgno;
	/* Which was the last message number received from the other
	 * application? */
	uint32_t last_received_msgno;
	/* What was the highest message number received from the other
	 * application? (matches last_received, unless we needed a restart) */
	uint32_t newest_received_msgno;
	/* Which was the last message number sent to the other application which
	 * was acknowledged by that side? */
	uint32_t last_confirmed_msgno;
};
244
interpret_chanmsg(struct chan_msg_state * cmsg,struct cross_state * cxs,struct globals * g,bool display_side,char * packet)245 static int interpret_chanmsg(struct chan_msg_state *cmsg,
246 struct cross_state *cxs, struct globals *g, bool display_side,
247 char *packet)
248 {
249 uint32_t size_and_type = *(uint32_t *)packet;
250 size_t unpadded_size = transfer_size(size_and_type);
251 enum wmsg_type type = transfer_type(size_and_type);
252 if (type == WMSG_CLOSE) {
253 /* No new messages from the channel to the program will be
254 * allowed after this */
255 cmsg->state = CM_TERMINAL;
256
257 wp_debug("Other side has closed");
258 if (unpadded_size < 8) {
259 return ERR_FATAL;
260 }
261 int32_t code = ((int32_t *)packet)[1];
262 if (code == ERR_FATAL) {
263 return ERR_FATAL;
264 } else if (code == ERR_NOMEM) {
265 return ERR_NOMEM;
266 } else {
267 return ERR_STOP;
268 }
269 } else if (type == WMSG_RESTART) {
270 struct wmsg_restart *ackm = (struct wmsg_restart *)packet;
271 wp_debug("Received restart message: remote last saw ack %d (we last recvd %d, acked %d)",
272 ackm->last_ack_received,
273 cxs->last_received_msgno,
274 cxs->last_acked_msgno);
275 cxs->last_received_msgno = ackm->last_ack_received;
276 return 0;
277 } else if (type == WMSG_ACK_NBLOCKS) {
278 struct wmsg_ack *ackm = (struct wmsg_ack *)packet;
279 if (msgno_gt(ackm->messages_received,
280 cxs->last_received_msgno)) {
281 cxs->last_confirmed_msgno = ackm->messages_received;
282 }
283 return 0;
284 } else {
285 cxs->last_received_msgno++;
286 if (msgno_gt(cxs->newest_received_msgno,
287 cxs->last_received_msgno)) {
288 /* Skip packet, as we already received it */
289 wp_debug("Ignoring replayed message %d (newest=%d)",
290 cxs->last_received_msgno,
291 cxs->newest_received_msgno);
292 return 0;
293 }
294 cxs->newest_received_msgno = cxs->last_received_msgno;
295 }
296
297 if (type == WMSG_INJECT_RIDS) {
298 const int32_t *fds = &((const int32_t *)packet)[1];
299 int nfds = (int)((unpadded_size - sizeof(uint32_t)) /
300 sizeof(int32_t));
301
302 if (buf_ensure_size(nfds, sizeof(int), &cmsg->transf_fds.size,
303 (void **)&cmsg->transf_fds.data) == -1) {
304 wp_error("Allocation failure for fd transfer queue, expect a crash");
305 return ERR_NOMEM;
306 }
307 /* Reset transfer buffer; all fds in here were already sent */
308 cmsg->transf_fds.zone_start = 0;
309 cmsg->transf_fds.zone_end = nfds;
310 untranslate_ids(&g->map, nfds, fds, cmsg->transf_fds.data);
311 if (nfds > 0) {
312 if (buf_ensure_size(cmsg->proto_fds.zone_end + nfds,
313 sizeof(int), &cmsg->proto_fds.size,
314 (void **)&cmsg->proto_fds.data) ==
315 -1) {
316 wp_error("Allocation failure for fd protocol queue");
317 return ERR_NOMEM;
318 }
319
320 // Append the new file descriptors to the parsing queue
321 memcpy(cmsg->proto_fds.data + cmsg->proto_fds.zone_end,
322 cmsg->transf_fds.data,
323 sizeof(int) * (size_t)nfds);
324 cmsg->proto_fds.zone_end += nfds;
325 }
326 return 0;
327 } else if (type == WMSG_PROTOCOL) {
328 /* While by construction, the provided message buffer should be
329 * aligned with individual message boundaries, it is not
330 * guaranteed that all file descriptors provided will be used by
331 * the messages. This makes fd handling more complicated. */
332 int protosize = (int)(unpadded_size - sizeof(uint32_t));
333 // TODO: have message editing routines ensure size, so
334 // that this limit can be tighter
335 if (buf_ensure_size(protosize + 1024, 1,
336 &cmsg->proto_write.size,
337 (void **)&cmsg->proto_write.data) == -1) {
338 wp_error("Allocation failure for message workspace");
339 return ERR_NOMEM;
340 }
341 cmsg->proto_write.zone_end = 0;
342 cmsg->proto_write.zone_start = 0;
343
344 struct char_window src;
345 src.data = packet + sizeof(uint32_t);
346 src.zone_start = 0;
347 src.zone_end = protosize;
348 src.size = protosize;
349 parse_and_prune_messages(g, display_side, display_side, &src,
350 &cmsg->proto_write, &cmsg->proto_fds);
351 if (src.zone_start != src.zone_end) {
352 wp_error("did not expect partial messages over channel, only parsed %d/%d bytes",
353 src.zone_start, src.zone_end);
354 return ERR_FATAL;
355 }
356 /* Update file descriptor queue */
357 if (cmsg->proto_fds.zone_end > cmsg->proto_fds.zone_start) {
358 memmove(cmsg->proto_fds.data,
359 cmsg->proto_fds.data +
360 cmsg->proto_fds.zone_start,
361 sizeof(int) * (size_t)(cmsg->proto_fds.zone_end >
362 cmsg->proto_fds.zone_start));
363 cmsg->proto_fds.zone_end -= cmsg->proto_fds.zone_start;
364 }
365 return 0;
366 } else {
367 if (unpadded_size < sizeof(struct wmsg_basic)) {
368 wp_error("Message is too small to contain header+RID, %d bytes",
369 unpadded_size);
370 return ERR_FATAL;
371 }
372 const struct wmsg_basic *op_header =
373 (const struct wmsg_basic *)packet;
374 struct bytebuf msg = {
375 .data = packet,
376 .size = unpadded_size,
377 };
378 wp_debug("Received %s for RID=%d (len %d)",
379 wmsg_type_to_str(type), op_header->remote_id,
380 unpadded_size);
381 return apply_update(&g->map, &g->threads, &g->render, type,
382 op_header->remote_id, &msg);
383 }
384 }
385
/** Read bytes from the channel into the receive buffer, then interpret as
 * many complete messages as possible.
 *
 * The receive buffer acts ring-like: a message being continued may wrap
 * around to the buffer's front once recv_start is far enough along. If
 * interpreting a message produces protocol data for the program, the state
 * machine switches to CM_WAITING_FOR_PROGRAM before handling further
 * messages. Returns 0 on success or a negative ERR_* code. */
static int advance_chanmsg_chanread(struct chan_msg_state *cmsg,
		struct cross_state *cxs, int chanfd, bool display_side,
		struct globals *g)
{
	/* Setup read operation to be able to read a minimum number of bytes,
	 * wrapping around as early as overlap conditions permit */
	if (cmsg->recv_unhandled_messages == 0) {
		struct iovec vec[2];
		memset(vec, 0, sizeof(vec));
		int nvec;
		if (cmsg->recv_start == cmsg->recv_end) {
			/* A fresh packet: restart at the buffer front and
			 * read into (up to) the first half */
			cmsg->recv_start = 0;
			cmsg->recv_end = 0;
			nvec = 1;
			vec[0].iov_base = cmsg->recv_buffer;
			vec[0].iov_len = (size_t)(cmsg->recv_size / 2);
		} else if (cmsg->recv_end <
				cmsg->recv_start + sizeof(uint32_t)) {
			/* Didn't quite finish reading the header; grow the
			 * buffer if needed and read a goal-sized chunk past
			 * the current end */
			int recvsz = (int)cmsg->recv_size;
			if (buf_ensure_size((int)cmsg->recv_end +
							    RECV_GOAL_READ_SIZE,
					    1, &recvsz,
					    (void **)&cmsg->recv_buffer) ==
					-1) {
				wp_error("Allocation failure, resizing receive buffer failed");
				return ERR_NOMEM;
			}
			cmsg->recv_size = (size_t)recvsz;

			nvec = 1;
			vec[0].iov_base = cmsg->recv_buffer + cmsg->recv_end;
			vec[0].iov_len = RECV_GOAL_READ_SIZE;
		} else {
			/* Continuing an old packet; space made available last
			 * time. The header is complete, so the total message
			 * size is known */
			uint32_t *header = (uint32_t *)&cmsg->recv_buffer
							   [cmsg->recv_start];
			size_t sz = alignz(transfer_size(*header), 4);

			size_t read_end = cmsg->recv_start + sz;
			/* Once recv_start is far enough in, overflow reads
			 * can wrap around to the front of the buffer */
			bool wraparound =
					cmsg->recv_start >= RECV_GOAL_READ_SIZE;
			if (!wraparound) {
				read_end = maxu(read_end,
						cmsg->recv_end +
								RECV_GOAL_READ_SIZE);
			}
			int recvsz = (int)cmsg->recv_size;
			if (buf_ensure_size((int)read_end, 1, &recvsz,
					    (void **)&cmsg->recv_buffer) ==
					-1) {
				wp_error("Allocation failure, resizing receive buffer failed");
				return ERR_NOMEM;
			}
			cmsg->recv_size = (size_t)recvsz;

			nvec = 1;
			vec[0].iov_base = cmsg->recv_buffer + cmsg->recv_end;
			vec[0].iov_len = read_end - cmsg->recv_end;
			if (wraparound) {
				nvec = 2;
				vec[1].iov_base = cmsg->recv_buffer;
				vec[1].iov_len = cmsg->recv_start;
			}
		}

		ssize_t r = readv(chanfd, vec, nvec);
		if (r == -1 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
			wp_debug("Read would block");
			return 0;
		} else if (r == 0 || (r == -1 && errno == ECONNRESET)) {
			wp_debug("Channel connection closed");
			return ERR_DISCONN;
		} else if (r == -1) {
			wp_error("chanfd read failure: %s", strerror(errno));
			return ERR_FATAL;
		} else {
			if (nvec == 2 && (size_t)r >= vec[0].iov_len) {
				/* The read spilled into the second (wrapped)
				 * iovec, so the current message is complete:
				 * parse it, then treat the wrapped bytes as
				 * the new buffer contents */
				int cm_ret = interpret_chanmsg(cmsg, cxs, g,
						display_side,
						cmsg->recv_buffer +
								cmsg->recv_start);
				if (cm_ret < 0) {
					return cm_ret;
				}

				cmsg->recv_start = 0;
				cmsg->recv_end = (size_t)r - vec[0].iov_len;

				if (cmsg->proto_write.zone_start <
						cmsg->proto_write.zone_end) {
					goto next_stage;
				}
			} else {
				cmsg->recv_end += (size_t)r;
			}
		}
	}

	/* Recount unhandled messages */
	cmsg->recv_unhandled_messages = 0;
	size_t i = cmsg->recv_start;
	while (i + sizeof(uint32_t) <= cmsg->recv_end) {
		uint32_t *header = (uint32_t *)&cmsg->recv_buffer[i];
		size_t sz = alignz(transfer_size(*header), 4);
		if (sz == 0) {
			wp_error("Encountered malformed zero size packet");
			return ERR_FATAL;
		}
		i += sz;
		if (i > cmsg->recv_end) {
			/* Last message is incomplete */
			break;
		}
		cmsg->recv_unhandled_messages++;
	}

	/* Interpret complete messages in order, stopping early if protocol
	 * data needs to be flushed to the program first */
	while (cmsg->recv_unhandled_messages > 0) {
		char *packet_start = &cmsg->recv_buffer[cmsg->recv_start];
		uint32_t *header = (uint32_t *)packet_start;
		size_t sz = transfer_size(*header);
		int cm_ret = interpret_chanmsg(
				cmsg, cxs, g, display_side, packet_start);
		if (cm_ret < 0) {
			return cm_ret;
		}
		cmsg->recv_start += alignz(sz, 4);
		cmsg->recv_unhandled_messages--;

		if (cmsg->proto_write.zone_start < cmsg->proto_write.zone_end) {
			goto next_stage;
		}
	}
	return 0;
next_stage:
	/* When protocol data was sent, switch to trying to write the protocol
	 * data to its socket, before trying to parse any other message */
	cmsg->state = CM_WAITING_FOR_PROGRAM;
	DTRACE_PROBE(waypipe, chanmsg_program_wait);
	return 0;
}
/** Write pending protocol data (and any queued fds) to the program socket.
 *
 * Loops until the write buffer is drained or the write would block; fds
 * that were successfully transferred have their shadow refcounts dropped
 * and are removed from the queue. When fully drained, the state machine
 * returns to CM_WAITING_FOR_CHANNEL. Returns 0, ERR_STOP if the program
 * closed its end, or ERR_FATAL on other write failures. */
static int advance_chanmsg_progwrite(struct chan_msg_state *cmsg, int progfd,
		bool display_side, struct globals *g)
{
	const char *progdesc = display_side ? "compositor" : "application";
	// Write as much as possible
	while (cmsg->proto_write.zone_start < cmsg->proto_write.zone_end) {
		ssize_t wc = iovec_write(progfd,
				cmsg->proto_write.data +
						cmsg->proto_write.zone_start,
				(size_t)(cmsg->proto_write.zone_end -
						cmsg->proto_write.zone_start),
				cmsg->transf_fds.data,
				cmsg->transf_fds.zone_end,
				&cmsg->transf_fds.zone_start);
		if (wc == -1 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
			wp_debug("Write to the %s would block", progdesc);
			return 0;
		} else if (wc == -1 &&
				(errno == EPIPE || errno == ECONNRESET)) {
			wp_error("%s has closed", progdesc);
			/* The program has closed its end of the connection,
			 * so waypipe can also cease to process all messages and
			 * data updates that would be directed to it */
			cmsg->state = CM_TERMINAL;
			return ERR_STOP;
		} else if (wc == -1) {
			wp_error("%s write failure %zd: %s", progdesc, wc,
					strerror(errno));
			return ERR_FATAL;
		} else {
			cmsg->proto_write.zone_start += (int)wc;
			wp_debug("Wrote to %s, %d/%d bytes in chunk %zd, %d/%d fds",
					progdesc, cmsg->proto_write.zone_start,
					cmsg->proto_write.zone_end, wc,
					cmsg->transf_fds.zone_start,
					cmsg->transf_fds.zone_end);

			/* Drop refs for fds that were actually transferred,
			 * and compact the remaining fds to the queue front */
			if (cmsg->transf_fds.zone_start > 0) {
				decref_transferred_fds(&g->map,
						cmsg->transf_fds.zone_start,
						cmsg->transf_fds.data);
				memmove(cmsg->transf_fds.data,
						cmsg->transf_fds.data +
								cmsg->transf_fds.zone_start,
						(size_t)(cmsg->transf_fds.zone_end -
								cmsg->transf_fds.zone_start) *
								sizeof(int));
				cmsg->transf_fds.zone_end -=
						cmsg->transf_fds.zone_start;
			}
		}
	}
	if (cmsg->proto_write.zone_start == cmsg->proto_write.zone_end) {
		wp_debug("Write to the %s succeeded", progdesc);
		cmsg->state = CM_WAITING_FOR_CHANNEL;
		DTRACE_PROBE(waypipe, chanmsg_channel_wait);
	}
	return 0;
}
advance_chanmsg_transfer(struct globals * g,struct chan_msg_state * cmsg,struct cross_state * cxs,bool display_side,int chanfd,int progfd,bool any_changes)588 static int advance_chanmsg_transfer(struct globals *g,
589 struct chan_msg_state *cmsg, struct cross_state *cxs,
590 bool display_side, int chanfd, int progfd, bool any_changes)
591 {
592 if (!any_changes) {
593 return 0;
594 }
595 if (cmsg->state == CM_WAITING_FOR_CHANNEL) {
596 return advance_chanmsg_chanread(
597 cmsg, cxs, chanfd, display_side, g);
598 } else if (cmsg->state == CM_WAITING_FOR_PROGRAM) {
599 return advance_chanmsg_progwrite(cmsg, progfd, display_side, g);
600 }
601 return 0;
602 }
603
clear_old_transfers(struct transfer_queue * td,uint32_t inclusive_cutoff)604 static void clear_old_transfers(
605 struct transfer_queue *td, uint32_t inclusive_cutoff)
606 {
607 for (int i = 0; i < td->end; i++) {
608 if (td->vecs[i].iov_len == 0) {
609 wp_error("Unexpected zero sized item %d [%d,%d)", i,
610 td->start, td->end);
611 }
612 }
613 int k = 0;
614 for (int i = 0; i < td->start; i++) {
615 if (!msgno_gt(inclusive_cutoff, td->meta[i].msgno)) {
616 break;
617 }
618 if (!td->meta[i].static_alloc) {
619 free(td->vecs[i].iov_base);
620 }
621 td->vecs[i].iov_base = NULL;
622 td->vecs[i].iov_len = 0;
623 k = i + 1;
624 }
625 if (k > 0) {
626 size_t nshift = (size_t)(td->end - k);
627 memmove(td->meta, td->meta + k, nshift * sizeof(td->meta[0]));
628 memmove(td->vecs, td->vecs + k, nshift * sizeof(td->vecs[0]));
629 td->start -= k;
630 td->end -= k;
631 }
632 }
633
/** Write as much of the transfer queue as possible to the channel.
 *
 * Resumes mid-block if the previous call ended partway through an entry
 * (tracked by td->partial_write_amt), writes up to max_iov blocks with a
 * single writev, adds the bytes written to *total_written, and advances
 * td->start past fully-written blocks. Returns 0 if successful (including
 * when the write would block), ERR_FATAL on a fatal write error, and
 * ERR_DISCONN if the channel connection closed. */
static int partial_write_transfer(int chanfd, struct transfer_queue *td,
		int *total_written, int max_iov)
{
	// Waiting for channel write to complete
	if (td->start < td->end) {
		/* Advance the current element by amount actually written;
		 * the iovec is temporarily patched for the writev call and
		 * restored afterwards */
		char *orig_base = td->vecs[td->start].iov_base;
		size_t orig_len = td->vecs[td->start].iov_len;
		td->vecs[td->start].iov_base =
				orig_base + td->partial_write_amt;
		td->vecs[td->start].iov_len = orig_len - td->partial_write_amt;
		int count = min(max_iov, td->end - td->start);
		ssize_t wr = writev(chanfd, &td->vecs[td->start], count);
		td->vecs[td->start].iov_base = orig_base;
		td->vecs[td->start].iov_len = orig_len;

		if (wr == -1 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
			return 0;
		} else if (wr == -1 &&
				(errno == ECONNRESET || errno == EPIPE)) {
			wp_debug("Channel connection closed");
			return ERR_DISCONN;
		} else if (wr == -1) {
			wp_error("chanfd write failure: %s", strerror(errno));
			return ERR_FATAL;
		}

		/* Consume the written byte count block by block, recording a
		 * partial offset if the last touched block was incomplete */
		size_t uwr = (size_t)wr;
		*total_written += (int)wr;
		while (uwr > 0 && td->start < td->end) {
			/* Skip past zero-length blocks */
			if (td->vecs[td->start].iov_len == 0) {
				td->start++;
				continue;
			}
			size_t left = td->vecs[td->start].iov_len -
				      td->partial_write_amt;
			if (left > uwr) {
				/* Block partially completed */
				td->partial_write_amt += uwr;
				uwr = 0;
			} else {
				/* Block completed */
				td->partial_write_amt = 0;
				uwr -= left;
				td->start++;
			}
		}
	}
	return 0;
}
686
/** Drive the program->channel half: splice in any pending acknowledgement
 * message, write queued transfers to the channel, help run worker-thread
 * tasks on the main thread, and — once all work and writes are done —
 * finalize shadow-fd updates and return to WM_WAITING_FOR_PROGRAM.
 * Returns 0 on success or a negative ERR_* code from the write path. */
static int advance_waymsg_chanwrite(struct way_msg_state *wmsg,
		struct cross_state *cxs, struct globals *g, int chanfd,
		bool display_side)
{
	const char *progdesc = display_side ? "compositor" : "application";

	/* Copy the data in the transfer queue to the write queue. */
	(void)transfer_load_async(&wmsg->transfers);

	// First, clear out any transfers that are no longer needed
	clear_old_transfers(&wmsg->transfers, cxs->last_confirmed_msgno);

	/* Acknowledge the other side's transfers as soon as possible */
	if (cxs->last_acked_msgno != cxs->last_received_msgno) {
		if (transfer_ensure_size(&wmsg->transfers,
				    wmsg->transfers.end + 1) == -1) {
			wp_error("Failed to allocate space for ack message transfer");
			goto ackmsg_fail;
		}
		/* To avoid infinite regress, receive acknowledgement
		 * messages do not themselves increase the message counters. */
		uint32_t ack_msgno;
		if (wmsg->transfers.start == wmsg->transfers.end) {
			ack_msgno = wmsg->transfers.last_msgno;
		} else {
			ack_msgno = wmsg->transfers.meta[wmsg->transfers.start]
							.msgno;
		}

		/* This is the next point where messages can be changed:
		 * a partially-written entry must not be modified */
		int next_slot = (wmsg->transfers.partial_write_amt > 0)
						? wmsg->transfers.start + 1
						: wmsg->transfers.start;
		/* Locate which of the two static ack messages is safe to
		 * reuse (not mid-write) and whether one is already queued
		 * at the next editable slot.
		 * NOTE(review): vecs[next_slot] may be read when
		 * next_slot == transfers.end; the slot is allocated (via
		 * transfer_ensure_size above) but possibly uninitialized --
		 * confirm this cannot spuriously match an ack pointer. */
		struct wmsg_ack *not_in_prog_msg = NULL;
		struct wmsg_ack *queued_msg = NULL;
		for (size_t i = 0; i < 2; i++) {
			if (wmsg->transfers.partial_write_amt > 0 &&
					wmsg->transfers.vecs[wmsg->transfers.start]
									.iov_base ==
							&wmsg->ack_msgs[i]) {
				not_in_prog_msg = &wmsg->ack_msgs[1 - i];
			}
			if (wmsg->transfers.vecs[next_slot].iov_base ==
					&wmsg->ack_msgs[i]) {
				queued_msg = &wmsg->ack_msgs[i];
			}
		}

		if (!queued_msg) {
			/* Insert a message--which is not partially written--
			 * in the next available slot, pushing forward other
			 * messages */
			if (!not_in_prog_msg) {
				queued_msg = &wmsg->ack_msgs[0];
			} else {
				queued_msg = not_in_prog_msg;
			}

			if (next_slot < wmsg->transfers.end) {
				size_t nmoved = (size_t)(wmsg->transfers.end -
							 next_slot);
				memmove(wmsg->transfers.vecs + next_slot + 1,
						wmsg->transfers.vecs +
								next_slot,
						sizeof(*wmsg->transfers.vecs) *
								nmoved);
				memmove(wmsg->transfers.meta + next_slot + 1,
						wmsg->transfers.meta +
								next_slot,
						sizeof(*wmsg->transfers.meta) *
								nmoved);
			}
			wmsg->transfers.vecs[next_slot].iov_len =
					sizeof(struct wmsg_ack);
			wmsg->transfers.vecs[next_slot].iov_base = queued_msg;
			wmsg->transfers.meta[next_slot].msgno = ack_msgno;
			wmsg->transfers.meta[next_slot].static_alloc = true;
			wmsg->transfers.end++;
		}

		/* Modify the message which is now next up in the transfer
		 * queue */
		queued_msg->size_and_type = transfer_header(
				sizeof(struct wmsg_ack), WMSG_ACK_NBLOCKS);
		queued_msg->messages_received = cxs->last_received_msgno;
		cxs->last_acked_msgno = cxs->last_received_msgno;
	ackmsg_fail:;
	}

	int ret = partial_write_transfer(chanfd, &wmsg->transfers,
			&wmsg->total_written, wmsg->max_iov);
	if (ret < 0) {
		return ret;
	}

	bool is_done = false;
	struct task_data task;
	bool has_task = request_work_task(&g->threads, &task, &is_done);

	/* Run a task ourselves, making use of the main thread */
	if (has_task) {
		run_task(&task, &g->threads.threads[0]);

		pthread_mutex_lock(&g->threads.work_mutex);
		g->threads.tasks_in_progress--;
		pthread_mutex_unlock(&g->threads.work_mutex);
		/* To skip the next poll */
		uint8_t triv = 0;
		if (write(g->threads.selfpipe_w, &triv, 1) == -1) {
			wp_error("Failed to write to self-pipe");
		}
	}

	if (is_done) {
		/* It's possible for the last task to complete between
		 * `transfer_load_async` and `request_work_task` in this
		 * function, so copy out any remaining messages.`*/
		(void)transfer_load_async(&wmsg->transfers);
	}

	/* Once all compute tasks finished, append the trailing transfers
	 * (e.g. fd-injection and protocol messages) to the queue */
	if (is_done && wmsg->ntrailing > 0) {
		for (int i = 0; i < wmsg->ntrailing; i++) {
			transfer_add(&wmsg->transfers,
					wmsg->trailing[i].iov_len,
					wmsg->trailing[i].iov_base);
		}

		wmsg->ntrailing = 0;
		memset(wmsg->trailing, 0, sizeof(wmsg->trailing));
	}

	if (wmsg->transfers.start == wmsg->transfers.end && is_done) {
		/* Everything written and all work complete: finalize shadow
		 * fd updates and garbage-collect unreferenced shadows */
		for (struct shadow_fd_link *lcur = g->map.link.l_next,
					   *lnxt = lcur->l_next;
				lcur != &g->map.link;
				lcur = lnxt, lnxt = lcur->l_next) {
			/* Note: finish_update() may delete `cur` */
			struct shadow_fd *cur = (struct shadow_fd *)lcur;
			finish_update(cur);
			destroy_shadow_if_unreferenced(cur);
		}

		/* Reset work queue */
		pthread_mutex_lock(&g->threads.work_mutex);
		if (g->threads.stack_count > 0 ||
				g->threads.tasks_in_progress > 0) {
			wp_error("Multithreading state failure");
		}
		g->threads.do_work = false;
		g->threads.stack_count = 0;
		g->threads.tasks_in_progress = 0;
		pthread_mutex_unlock(&g->threads.work_mutex);

		DTRACE_PROBE(waypipe, channel_write_end);
		size_t unacked_bytes = 0;
		for (int i = 0; i < wmsg->transfers.end; i++) {
			unacked_bytes += wmsg->transfers.vecs[i].iov_len;
		}

		wp_debug("Sent %d-byte message from %s to channel; %zu-bytes in flight",
				wmsg->total_written, progdesc, unacked_bytes);

		/* do not delete the used transfers yet; we need a remote
		 * acknowledgement */
		wmsg->total_written = 0;
		wmsg->state = WM_WAITING_FOR_PROGRAM;
	}
	return 0;
}
advance_waymsg_progread(struct way_msg_state * wmsg,struct globals * g,int progfd,bool display_side,bool progsock_readable)856 static int advance_waymsg_progread(struct way_msg_state *wmsg,
857 struct globals *g, int progfd, bool display_side,
858 bool progsock_readable)
859 {
860 const char *progdesc = display_side ? "compositor" : "application";
861 // We have data to read from programs/pipes
862 bool new_proto_data = false;
863 int old_fbuffer_end = wmsg->fds.zone_end;
864 if (progsock_readable) {
865 // Read /once/
866 ssize_t rc = iovec_read(progfd,
867 wmsg->proto_read.data +
868 wmsg->proto_read.zone_end,
869 (size_t)(wmsg->proto_read.size -
870 wmsg->proto_read.zone_end),
871 &wmsg->fds);
872 if (rc == -1 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
873 // do nothing
874 } else if (rc == 0 || (rc == -1 && errno == ECONNRESET)) {
875 wp_error("%s has closed", progdesc);
876 // state transitions handled in main loop
877 return ERR_STOP;
878 } else if (rc == -1) {
879 wp_error("%s read failure: %s", progdesc,
880 strerror(errno));
881 return ERR_FATAL;
882 } else {
883 // We have successfully read some data.
884 wmsg->proto_read.zone_end += (int)rc;
885 new_proto_data = true;
886 }
887 }
888
889 if (new_proto_data) {
890 wp_debug("Read %d new file descriptors, have %d total now",
891 wmsg->fds.zone_end - old_fbuffer_end,
892 wmsg->fds.zone_end);
893
894 if (buf_ensure_size(wmsg->proto_read.size + 1024, 1,
895 &wmsg->proto_write.size,
896 (void **)&wmsg->proto_write.data) == -1) {
897 wp_error("Allocation failure for message workspace");
898 return ERR_NOMEM;
899 }
900
901 wmsg->proto_write.zone_start = 0;
902 wmsg->proto_write.zone_end = 0;
903 parse_and_prune_messages(g, display_side, !display_side,
904 &wmsg->proto_read, &wmsg->proto_write,
905 &wmsg->fds);
906
907 /* Recycle partial message bytes */
908 if (wmsg->proto_read.zone_start > 0) {
909 if (wmsg->proto_read.zone_end >
910 wmsg->proto_read.zone_start) {
911 memmove(wmsg->proto_read.data,
912 wmsg->proto_read.data +
913 wmsg->proto_read.zone_start,
914 (size_t)(wmsg->proto_read.zone_end -
915 wmsg->proto_read.zone_start));
916 }
917 wmsg->proto_read.zone_end -=
918 wmsg->proto_read.zone_start;
919 wmsg->proto_read.zone_start = 0;
920 }
921 }
922
923 read_readable_pipes(&g->map);
924
925 for (struct shadow_fd_link *lcur = g->map.link.l_next,
926 *lnxt = lcur->l_next;
927 lcur != &g->map.link;
928 lcur = lnxt, lnxt = lcur->l_next) {
929 /* Note: finish_update() may delete `cur` */
930 struct shadow_fd *cur = (struct shadow_fd *)lcur;
931 collect_update(&g->threads, cur, &wmsg->transfers,
932 g->config->old_video_mode);
933 /* collecting updates can reset `pipe.remote_can_X` state, so
934 * garbage collect the sfd immediately after */
935 destroy_shadow_if_unreferenced(cur);
936 }
937
938 int num_mt_tasks = start_parallel_work(
939 &g->threads, &wmsg->transfers.async_recv_queue);
940
941 if (new_proto_data) {
942 /* Send all file descriptors which have been used by the
943 * protocol parser, translating them if this has not already
944 * been done */
945 if (wmsg->fds.zone_start > 0) {
946 size_t act_size = (size_t)wmsg->fds.zone_start *
947 sizeof(int32_t) +
948 sizeof(uint32_t);
949 uint32_t *msg = malloc(act_size);
950 if (!msg) {
951 // TODO: use a ring buffer for allocations,
952 // and figure out how to block until it is clear
953 wp_error("Failed to allocate file desc tx msg");
954 return ERR_NOMEM;
955 }
956 msg[0] = transfer_header(act_size, WMSG_INJECT_RIDS);
957 int32_t *rbuffer = (int32_t *)(msg + 1);
958
959 /* Translate and adjust refcounts */
960 if (translate_fds(&g->map, &g->render,
961 wmsg->fds.zone_start,
962 wmsg->fds.data, rbuffer) == -1) {
963 free(msg);
964 return ERR_FATAL;
965 }
966 decref_transferred_rids(
967 &g->map, wmsg->fds.zone_start, rbuffer);
968 memmove(wmsg->fds.data,
969 wmsg->fds.data + wmsg->fds.zone_start,
970 sizeof(int) * (size_t)(wmsg->fds.zone_end -
971 wmsg->fds.zone_start));
972 wmsg->fds.zone_end -= wmsg->fds.zone_start;
973 wmsg->fds.zone_start = 0;
974
975 /* Add message to trailing queue */
976 wmsg->trailing[wmsg->ntrailing].iov_len = act_size;
977 wmsg->trailing[wmsg->ntrailing].iov_base = msg;
978 wmsg->ntrailing++;
979 }
980 if (wmsg->proto_write.zone_end > 0) {
981 wp_debug("We are transferring a data buffer with %d bytes",
982 wmsg->proto_write.zone_end);
983 size_t act_size = (size_t)wmsg->proto_write.zone_end +
984 sizeof(uint32_t);
985 uint32_t protoh = transfer_header(
986 act_size, WMSG_PROTOCOL);
987
988 uint8_t *copy_proto = malloc(alignz(act_size, 4));
989 if (!copy_proto) {
990 wp_error("Failed to allocate protocol tx msg");
991 return ERR_NOMEM;
992 }
993 memcpy(copy_proto, &protoh, sizeof(uint32_t));
994 memcpy(copy_proto + sizeof(uint32_t),
995 wmsg->proto_write.data,
996 (size_t)wmsg->proto_write.zone_end);
997 memset(copy_proto + sizeof(uint32_t) +
998 wmsg->proto_write
999 .zone_end,
1000 0, alignz(act_size, 4) - act_size);
1001
1002 wmsg->trailing[wmsg->ntrailing].iov_len =
1003 alignz(act_size, 4);
1004 wmsg->trailing[wmsg->ntrailing].iov_base = copy_proto;
1005 wmsg->ntrailing++;
1006 }
1007 }
1008
1009 int n_transfers = wmsg->transfers.end - wmsg->transfers.start;
1010 size_t net_bytes = 0;
1011 for (int i = wmsg->transfers.start; i < wmsg->transfers.end; i++) {
1012 net_bytes += wmsg->transfers.vecs[i].iov_len;
1013 }
1014
1015 if (n_transfers > 0 || num_mt_tasks > 0 || wmsg->ntrailing > 0) {
1016 wp_debug("Channel message start (%d blobs, %d bytes, %d trailing, %d tasks)",
1017 n_transfers, net_bytes, wmsg->ntrailing,
1018 num_mt_tasks);
1019 wmsg->state = WM_WAITING_FOR_CHANNEL;
1020 DTRACE_PROBE(waypipe, channel_write_start);
1021 }
1022 return 0;
1023 }
advance_waymsg_transfer(struct globals * g,struct way_msg_state * wmsg,struct cross_state * cxs,bool display_side,int chanfd,int progfd,bool progsock_readable)1024 static int advance_waymsg_transfer(struct globals *g,
1025 struct way_msg_state *wmsg, struct cross_state *cxs,
1026 bool display_side, int chanfd, int progfd,
1027 bool progsock_readable)
1028 {
1029 if (wmsg->state == WM_WAITING_FOR_CHANNEL) {
1030 return advance_waymsg_chanwrite(
1031 wmsg, cxs, g, chanfd, display_side);
1032 } else if (wmsg->state == WM_WAITING_FOR_PROGRAM) {
1033 return advance_waymsg_progread(wmsg, g, progfd, display_side,
1034 progsock_readable);
1035 }
1036 return 0;
1037 }
1038
read_new_chanfd(int linkfd,struct int_window * recon_fds)1039 static int read_new_chanfd(int linkfd, struct int_window *recon_fds)
1040 {
1041 uint8_t tmp = 0;
1042 ssize_t rd = iovec_read(linkfd, (char *)&tmp, 1, recon_fds);
1043 if (rd == -1 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
1044 // do nothing
1045 return -1;
1046 } else if (rd == 0 || (rd == -1 && errno == ECONNRESET)) {
1047 wp_error("link has closed");
1048 // sentinel value, to indicate that linkfd should be closed
1049 return -2;
1050 } else if (rd == -1) {
1051 wp_error("link read failure: %s", strerror(errno));
1052 return -1;
1053 }
1054 for (int i = 0; i < recon_fds->zone_end - 1; i++) {
1055 checked_close(recon_fds->data[i]);
1056 }
1057 int ret_fd = -1;
1058 if (recon_fds->zone_end > 0) {
1059 ret_fd = recon_fds->data[recon_fds->zone_end - 1];
1060 }
1061 recon_fds->zone_end = 0;
1062 return ret_fd;
1063 }
1064
reconnect_loop(int linkfd,int progfd,struct int_window * recon_fds)1065 static int reconnect_loop(int linkfd, int progfd, struct int_window *recon_fds)
1066 {
1067 while (!shutdown_flag) {
1068 struct pollfd rcfs[2];
1069 rcfs[0].fd = linkfd;
1070 rcfs[0].events = POLLIN;
1071 rcfs[0].revents = 0;
1072 rcfs[1].fd = progfd;
1073 rcfs[1].events = 0;
1074 rcfs[1].revents = 0;
1075 int r = poll(rcfs, 2, -1);
1076 if (r == -1) {
1077 if (errno == EINTR) {
1078 continue;
1079 } else {
1080 break;
1081 }
1082 }
1083 if (rcfs[0].revents & POLLIN) {
1084 int nfd = read_new_chanfd(linkfd, recon_fds);
1085 if (nfd != -1) {
1086 return nfd;
1087 }
1088 }
1089 if (rcfs[0].revents & POLLHUP || rcfs[1].revents & POLLHUP) {
1090 return -1;
1091 }
1092 }
1093 return -1;
1094 }
1095
reset_connection(struct cross_state * cxs,struct chan_msg_state * cmsg,struct way_msg_state * wmsg,int chanfd)1096 static void reset_connection(struct cross_state *cxs,
1097 struct chan_msg_state *cmsg, struct way_msg_state *wmsg,
1098 int chanfd)
1099 {
1100 /* Discard partial read transfer, throwing away complete but unread
1101 * messages, and trailing remnants */
1102 cmsg->recv_end = 0;
1103 cmsg->recv_start = 0;
1104 cmsg->recv_unhandled_messages = 0;
1105
1106 clear_old_transfers(&wmsg->transfers, cxs->last_confirmed_msgno);
1107 wp_debug("Resetting connection: %d blocks unacknowledged",
1108 wmsg->transfers.end);
1109 if (wmsg->transfers.end > 0) {
1110 /* If there was any data in flight, restart. If there wasn't
1111 * anything in flight, then the remote side shouldn't notice the
1112 * difference */
1113 struct wmsg_restart restart;
1114 restart.size_and_type =
1115 transfer_header(sizeof(restart), WMSG_RESTART);
1116 restart.last_ack_received = cxs->last_confirmed_msgno;
1117 wmsg->transfers.start = 0;
1118 wmsg->transfers.partial_write_amt = 0;
1119 wp_debug("Sending restart message: last ack=%d",
1120 restart.last_ack_received);
1121 if (write(chanfd, &restart, sizeof(restart)) !=
1122 sizeof(restart)) {
1123 wp_error("Failed to write restart message");
1124 }
1125 }
1126
1127 if (set_nonblocking(chanfd) == -1) {
1128 wp_error("Error making new channel connection nonblocking: %s",
1129 strerror(errno));
1130 }
1131
1132 (void)cxs;
1133 }
1134
/* Put all connection sockets into nonblocking mode, stopping and
 * reporting at the first failure. linkfd may be -1 (no reconnection
 * link). Returns 0 on success, -1 on failure. */
static int set_connections_nonblocking(
		int chanfd, int progfd, int linkfd, bool display_side)
{
	const char *progdesc = display_side ? "compositor" : "application";

	int rc = set_nonblocking(chanfd);
	if (rc == -1) {
		wp_error("Error making channel connection nonblocking: %s",
				strerror(errno));
		return -1;
	}
	rc = set_nonblocking(progfd);
	if (rc == -1) {
		wp_error("Error making %s connection nonblocking: %s", progdesc,
				strerror(errno));
		return -1;
	}
	if (linkfd != -1) {
		rc = set_nonblocking(linkfd);
		if (rc == -1) {
			wp_error("Error making link connection nonblocking: %s",
					strerror(errno));
			return -1;
		}
	}
	return 0;
}
1156
/* Run the main proxy loop for one connection, shuttling data between
 * the channel socket (chanfd), the Wayland program socket (progfd),
 * and, if not -1, the reconnection link (linkfd). Takes ownership of
 * all three fds and closes them before returning.
 *
 * Bug fixes relative to the previous version: the `init_failure_cleanup`
 * label used to sit *before* the close-message section, and normal
 * control flow fell through it into an unconditional
 * `exit_code = ERR_FATAL;`, so every shutdown — including clean ones —
 * reported a fatal error to both the program and the remote end. The
 * gotos also jumped over the initialization of `exit_code`, making its
 * later reads undefined on the failure path. Now `exit_code` is
 * initialized before the first goto, each goto site records its own
 * error code, and the label directly precedes the shared cleanup so
 * init failures skip the close-message protocol entirely. */
int main_interface_loop(int chanfd, int progfd, int linkfd,
		const struct main_config *config, bool display_side)
{
	if (set_connections_nonblocking(chanfd, progfd, linkfd, display_side) ==
			-1) {
		if (linkfd != -1) {
			checked_close(linkfd);
		}
		checked_close(chanfd);
		checked_close(progfd);
		return EXIT_FAILURE;
	}
	const char *progdesc = display_side ? "compositor" : "application";
	wp_debug("Running main loop on %s side", progdesc);

	struct way_msg_state way_msg;
	memset(&way_msg, 0, sizeof(way_msg));
	struct chan_msg_state chan_msg;
	memset(&chan_msg, 0, sizeof(chan_msg));
	struct cross_state cross_data;
	memset(&cross_data, 0, sizeof(cross_data));
	struct globals g;
	memset(&g, 0, sizeof(g));

	/* Declared (and initialized) before the first
	 * `goto init_failure_cleanup`, so no goto jumps over its
	 * initialization */
	int exit_code = 0;

	way_msg.state = WM_WAITING_FOR_PROGRAM;
	/* AFAIK, there is no documented upper bound for the size of a
	 * Wayland protocol message, but libwayland (in wl_buffer_put)
	 * effectively limits message sizes to 4096 bytes. We must
	 * therefore adopt a limit as least as large. */
	const int max_read_size = 4096;
	way_msg.proto_read.size = max_read_size;
	way_msg.proto_read.data = malloc((size_t)way_msg.proto_read.size);
	way_msg.fds.size = 128;
	way_msg.fds.data = malloc((size_t)way_msg.fds.size * sizeof(int));
	way_msg.proto_write.size = 2 * max_read_size;
	way_msg.proto_write.data = malloc((size_t)way_msg.proto_write.size);
	way_msg.max_iov = get_iov_max();
	int mut_ret = pthread_mutex_init(
			&way_msg.transfers.async_recv_queue.lock, NULL);
	if (mut_ret) {
		wp_error("Mutex creation failed: %s", strerror(mut_ret));
		exit_code = ERR_FATAL;
		goto init_failure_cleanup;
	}

	chan_msg.state = CM_WAITING_FOR_CHANNEL;
	chan_msg.recv_size = 2 * RECV_GOAL_READ_SIZE;
	chan_msg.recv_buffer = malloc((size_t)chan_msg.recv_size);
	chan_msg.proto_write.size = max_read_size * 2;
	chan_msg.proto_write.data = malloc((size_t)chan_msg.proto_write.size);
	if (!chan_msg.proto_write.data || !chan_msg.recv_buffer ||
			!way_msg.proto_write.data || !way_msg.fds.data ||
			!way_msg.proto_read.data) {
		wp_error("Failed to allocate a message scratch buffer");
		exit_code = ERR_NOMEM;
		goto init_failure_cleanup;
	}

	/* The first packet received will be #1 */
	way_msg.transfers.last_msgno = 1;

	g.config = config;
	g.render = (struct render_data){
			.drm_node_path = config->drm_node,
			.drm_fd = -1,
			.dev = NULL,
			.disabled = config->no_gpu,
			.av_disabled = config->no_gpu ||
				       !config->prefer_hwvideo,
			.av_bpf = config->video_bpf,
			.av_video_fmt = (int)config->video_fmt,
			.av_hwdevice_ref = NULL,
			.av_drmdevice_ref = NULL,
			.av_vadisplay = NULL,
			.av_copy_config = 0,
	};
	if (setup_thread_pool(&g.threads, config->compression,
			    config->compression_level,
			    config->n_worker_threads) == -1) {
		exit_code = ERR_FATAL;
		goto init_failure_cleanup;
	}
	setup_translation_map(&g.map, display_side);
	if (init_message_tracker(&g.tracker) == -1) {
		exit_code = ERR_FATAL;
		goto init_failure_cleanup;
	}

	struct int_window recon_fds = {
			.data = NULL,
			.size = 0,
			.zone_start = 0,
			.zone_end = 0,
	};

	bool needs_new_channel = false;
	struct pollfd *pfds = NULL;
	int pfds_size = 0;
	while (!shutdown_flag && exit_code == 0 &&
			!(way_msg.state == WM_TERMINAL &&
					chan_msg.state == CM_TERMINAL)) {
		/* 4 fixed fds (chan, prog, link, selfpipe) + one per pipe */
		int psize = 4 + count_npipes(&g.map);
		if (buf_ensure_size(psize, sizeof(struct pollfd), &pfds_size,
				    (void **)&pfds) == -1) {
			wp_error("Allocation failure, not enough space for pollfds");
			exit_code = ERR_NOMEM;
			break;
		}
		pfds[0].fd = chanfd;
		pfds[1].fd = progfd;
		pfds[2].fd = linkfd;
		pfds[3].fd = g.threads.selfpipe_r;
		pfds[0].events = 0;
		pfds[1].events = 0;
		pfds[2].events = POLLIN;
		pfds[3].events = POLLIN;
		if (way_msg.state == WM_WAITING_FOR_CHANNEL) {
			pfds[0].events |= POLLOUT;
		} else if (way_msg.state == WM_WAITING_FOR_PROGRAM) {
			pfds[1].events |= POLLIN;
		}
		if (chan_msg.state == CM_WAITING_FOR_CHANNEL) {
			pfds[0].events |= POLLIN;
		} else if (chan_msg.state == CM_WAITING_FOR_PROGRAM) {
			pfds[1].events |= POLLOUT;
		}
		bool check_read = way_msg.state == WM_WAITING_FOR_PROGRAM;
		int npoll = 4 + fill_with_pipes(&g.map, pfds + 4, check_read);

		bool own_msg_pending =
				(cross_data.last_acked_msgno !=
						cross_data.last_received_msgno) &&
				way_msg.state == WM_WAITING_FOR_PROGRAM;
		bool unread_chan_msgs =
				chan_msg.state == CM_WAITING_FOR_CHANNEL &&
				chan_msg.recv_unhandled_messages > 0;

		int poll_delay = -1;
		if (unread_chan_msgs) {
			/* There is work to do, so continue */
			poll_delay = 0;
		}
		if (own_msg_pending) {
			/* To coalesce acknowledgements, we wait for a minimum
			 * amount */
			poll_delay = 20;
		}
		int r = poll(pfds, (nfds_t)npoll, poll_delay);
		if (r == -1) {
			if (errno == EINTR) {
				wp_error("poll interrupted: shutdown=%c",
						shutdown_flag ? 'Y' : 'n');
				continue;
			} else {
				wp_error("poll failed due to, stopping: %s",
						strerror(errno));
				exit_code = ERR_FATAL;
				break;
			}
		}
		if (pfds[3].revents & POLLIN) {
			/* After the self pipe has been used to wake up the
			 * connection, drain it */
			char tmp[64];
			(void)read(g.threads.selfpipe_r, tmp, sizeof(tmp));
		}

		mark_pipe_object_statuses(&g.map, npoll - 4, pfds + 4);
		/* POLLHUP sometimes implies POLLIN, but not on all systems.
		 * Checking POLLHUP|POLLIN means that we can detect EOF when
		 * we actually do try to read from the sockets, but also, if
		 * there was data in the pipe just before the hang up, then we
		 * can read and handle that data. */
		bool progsock_readable = pfds[1].revents & (POLLIN | POLLHUP);
		bool chanmsg_active = (pfds[0].revents & (POLLIN | POLLHUP)) ||
				      (pfds[1].revents & POLLOUT) ||
				      unread_chan_msgs;

		bool maybe_new_channel = (pfds[2].revents & (POLLIN | POLLHUP));
		if (maybe_new_channel) {
			int new_fd = read_new_chanfd(linkfd, &recon_fds);
			if (new_fd >= 0) {
				if (chanfd != -1) {
					checked_close(chanfd);
				}
				chanfd = new_fd;
				reset_connection(&cross_data, &chan_msg,
						&way_msg, chanfd);
				needs_new_channel = false;
			} else if (new_fd == -2) {
				wp_error("Link to root process hang-up detected");
				checked_close(linkfd);
				linkfd = -1;
			}
		}
		if (needs_new_channel && linkfd != -1) {
			wp_error("Channel hang up detected, waiting for reconnection");
			int new_fd = reconnect_loop(linkfd, progfd, &recon_fds);
			if (new_fd < 0) {
				// -1 is read failure or misc error, -2 is HUP
				exit_code = ERR_FATAL;
				break;
			} else {
				/* Actually handle the reconnection/reset state
				 */
				if (chanfd != -1) {
					checked_close(chanfd);
				}
				chanfd = new_fd;
				reset_connection(&cross_data, &chan_msg,
						&way_msg, chanfd);
				needs_new_channel = false;
			}
		} else if (needs_new_channel) {
			wp_error("Channel hang up detected, no reconnection link, fatal");
			exit_code = ERR_FATAL;
			break;
		}

		// Q: randomize the order of these?, to highlight
		// accidental dependencies?
		for (int m = 0; m < 2; m++) {
			int tr;
			if (m == 0) {
				tr = advance_chanmsg_transfer(&g, &chan_msg,
						&cross_data, display_side,
						chanfd, progfd, chanmsg_active);
			} else {
				tr = advance_waymsg_transfer(&g, &way_msg,
						&cross_data, display_side,
						chanfd, progfd,
						progsock_readable);
			}

			if (tr >= 0) {
				/* do nothing */
			} else if (tr == ERR_DISCONN) {
				/* Channel connection has at least
				 * partially been shut down, so close it
				 * fully. */
				checked_close(chanfd);
				chanfd = -1;
				if (linkfd == -1) {
					wp_error("Channel hang up detected, no reconnection link, fatal");
					exit_code = ERR_FATAL;
					break;
				}
				needs_new_channel = true;
			} else if (tr == ERR_STOP) {
				if (m == 0) {
					/* Stop returned while writing: Wayland
					 * connection has at least partially
					 * shut down, so close it fully. */
					checked_close(progfd);
					progfd = -1;
				} else {
					/* Stop returned while reading */
					checked_close(progfd);
					progfd = -1;
					if (way_msg.state ==
							WM_WAITING_FOR_PROGRAM) {
						way_msg.state = WM_TERMINAL;
					}
					if (chan_msg.state == CM_WAITING_FOR_PROGRAM ||
							chan_msg.recv_start ==
									chan_msg.recv_end) {
						chan_msg.state = CM_TERMINAL;
					}
				}
			} else {
				/* Fatal error, close and flush */
				exit_code = tr;
				break;
			}

			/* If the program connection has closed, and
			 * there waypipe is not currently transferring
			 * any message to the channel, then shutdown the
			 * program->channel transfers. (The reverse
			 * situation with the chnanel connection is not
			 * a cause for permanent closure, thanks to
			 * reconnection support */
			if (progfd == -1) {
				if (way_msg.state == WM_WAITING_FOR_PROGRAM) {
					way_msg.state = WM_TERMINAL;
				}
				if (chan_msg.state == CM_WAITING_FOR_PROGRAM ||
						chan_msg.recv_start ==
								chan_msg.recv_end) {
					chan_msg.state = CM_TERMINAL;
				}
			}
		}

		// Periodic maintenance. It doesn't matter who does this
		flush_writable_pipes(&g.map);
	}
	free(pfds);
	free(recon_fds.data);
	wp_debug("Exiting main loop (%d, %d, %d), attempting close message",
			exit_code, way_msg.state, chan_msg.state);

	/* It's possible, but very very unlikely, that waypipe gets closed
	 * while Wayland protocol messages are being written to the program
	 * and the most recent message was only partially written. */
	if (chan_msg.proto_write.zone_start != chan_msg.proto_write.zone_end) {
		wp_debug("Final write to %s was incomplete, %d/%d", progdesc,
				chan_msg.proto_write.zone_start,
				chan_msg.proto_write.zone_end);
	}

	if (!display_side && progfd != -1) {
		/* Let the application know why the connection is ending */
		char error[128];
		if (exit_code == ERR_FATAL) {
			size_t len = print_display_error(error, sizeof(error),
					3, "waypipe internal error");
			if (write(progfd, error, len) == -1) {
				wp_error("Failed to send waypipe error notification: %s",
						strerror(errno));
			}
		} else if (exit_code == ERR_NOMEM) {
			size_t len = print_display_error(
					error, sizeof(error), 2, "no memory");
			if (write(progfd, error, len) == -1) {
				wp_error("Failed to send OOM notification: %s",
						strerror(errno));
			}
		}
	}

	/* Attempt to notify remote end that the application has closed,
	 * waiting at most for a very short amount of time */
	if (way_msg.transfers.start != way_msg.transfers.end) {
		wp_error("Final write to channel was incomplete, %d+%zu/%d",
				way_msg.transfers.start,
				way_msg.transfers.partial_write_amt,
				way_msg.transfers.end);
	}

	if (chanfd != -1) {
		struct pollfd close_poll;
		close_poll.fd = chanfd;
		close_poll.events = POLLOUT;
		int close_ret = poll(&close_poll, 1, 200);
		if (close_ret == 0) {
			wp_debug("Exit poll timed out");
		}
		uint32_t close_msg[2];
		close_msg[0] = transfer_header(sizeof(close_msg), WMSG_CLOSE);
		close_msg[1] = exit_code == ERR_STOP ? 0 : (uint32_t)exit_code;
		wp_debug("Sending close message, modecode=%d", close_msg[1]);
		if (write(chanfd, &close_msg, sizeof(close_msg)) == -1) {
			wp_error("Failed to send close notification: %s",
					strerror(errno));
		}
	} else {
		wp_debug("Channel closed, hence no close notification");
	}

init_failure_cleanup:
	/* Shared cleanup path; on initialization failure the main loop was
	 * never entered, and the close-message section above is skipped */
	cleanup_thread_pool(&g.threads);
	cleanup_message_tracker(&g.tracker);
	cleanup_translation_map(&g.map);
	cleanup_render_data(&g.render);
	cleanup_hwcontext(&g.render);
	free(way_msg.proto_read.data);
	free(way_msg.proto_write.data);
	free(way_msg.fds.data);
	cleanup_transfer_queue(&way_msg.transfers);
	for (int i = 0; i < way_msg.ntrailing; i++) {
		free(way_msg.trailing[i].iov_base);
	}
	free(chan_msg.transf_fds.data);
	free(chan_msg.proto_fds.data);
	free(chan_msg.recv_buffer);
	free(chan_msg.proto_write.data);

	if (chanfd != -1) {
		checked_close(chanfd);
	}
	if (progfd != -1) {
		checked_close(progfd);
	}
	if (linkfd != -1) {
		checked_close(linkfd);
	}
	return EXIT_SUCCESS;
}
1543