1 /*
2  * Copyright © 2019 Manuel Stoeckl
3  *
4  * Permission is hereby granted, free of charge, to any person obtaining
5  * a copy of this software and associated documentation files (the
6  * "Software"), to deal in the Software without restriction, including
7  * without limitation the rights to use, copy, modify, merge, publish,
8  * distribute, sublicense, and/or sell copies of the Software, and to
9  * permit persons to whom the Software is furnished to do so, subject to
10  * the following conditions:
11  *
12  * The above copyright notice and this permission notice (including the
13  * next paragraph) shall be included in all copies or substantial
14  * portions of the Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
17  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
18  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
19  * NONINFRINGEMENT.  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
20  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
21  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
22  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23  * SOFTWARE.
24  */
25 
26 #include "main.h"
27 
28 #include <errno.h>
29 #include <fcntl.h>
30 #include <inttypes.h>
31 #include <limits.h>
32 #include <poll.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <sys/socket.h>
36 #include <sys/uio.h>
37 #include <unistd.h>
38 
39 // The maximum number of fds libwayland can recvmsg at once
40 #define MAX_LIBWAY_FDS 28
iovec_read(int conn,char * buf,size_t buflen,struct int_window * fds)41 static ssize_t iovec_read(
42 		int conn, char *buf, size_t buflen, struct int_window *fds)
43 {
44 	char cmsgdata[(CMSG_LEN(MAX_LIBWAY_FDS * sizeof(int32_t)))];
45 	struct iovec the_iovec;
46 	the_iovec.iov_len = buflen;
47 	the_iovec.iov_base = buf;
48 	struct msghdr msg;
49 	msg.msg_name = NULL;
50 	msg.msg_namelen = 0;
51 	msg.msg_iov = &the_iovec;
52 	msg.msg_iovlen = 1;
53 	msg.msg_control = &cmsgdata;
54 	msg.msg_controllen = sizeof(cmsgdata);
55 	msg.msg_flags = 0;
56 	ssize_t ret = recvmsg(conn, &msg, 0);
57 
58 	// Read cmsg
59 	struct cmsghdr *header = CMSG_FIRSTHDR(&msg);
60 	while (header) {
61 		struct cmsghdr *nxt_hdr = CMSG_NXTHDR(&msg, header);
62 		if (header->cmsg_level != SOL_SOCKET ||
63 				header->cmsg_type != SCM_RIGHTS) {
64 			header = nxt_hdr;
65 			continue;
66 		}
67 
68 		int *data = (int *)CMSG_DATA(header);
69 		int nf = (int)((header->cmsg_len - CMSG_LEN(0)) / sizeof(int));
70 
71 		if (buf_ensure_size(fds->zone_end + nf, sizeof(int), &fds->size,
72 				    (void **)&fds->data) == -1) {
73 			wp_error("Failed to allocate space for new fds");
74 			errno = ENOMEM;
75 			ret = -1;
76 		} else {
77 			for (int i = 0; i < nf; i++) {
78 				fds->data[fds->zone_end++] = data[i];
79 			}
80 		}
81 
82 		header = nxt_hdr;
83 	}
84 	return ret;
85 }
86 
iovec_write(int conn,const char * buf,size_t buflen,const int * fds,int numfds,int * nfds_written)87 static ssize_t iovec_write(int conn, const char *buf, size_t buflen,
88 		const int *fds, int numfds, int *nfds_written)
89 {
90 	bool overflow = numfds > MAX_LIBWAY_FDS;
91 
92 	struct iovec the_iovec;
93 	the_iovec.iov_len = overflow ? 1 : buflen;
94 	the_iovec.iov_base = (char *)buf;
95 	struct msghdr msg;
96 	msg.msg_name = NULL;
97 	msg.msg_namelen = 0;
98 	msg.msg_iov = &the_iovec;
99 	msg.msg_iovlen = 1;
100 	msg.msg_control = NULL;
101 	msg.msg_controllen = 0;
102 	msg.msg_flags = 0;
103 
104 	union {
105 		char buf[CMSG_SPACE(sizeof(int) * MAX_LIBWAY_FDS)];
106 		struct cmsghdr align;
107 	} uc;
108 	memset(uc.buf, 0, sizeof(uc.buf));
109 
110 	if (numfds > 0) {
111 		msg.msg_control = uc.buf;
112 		msg.msg_controllen = sizeof(uc.buf);
113 		struct cmsghdr *frst = CMSG_FIRSTHDR(&msg);
114 		frst->cmsg_level = SOL_SOCKET;
115 		frst->cmsg_type = SCM_RIGHTS;
116 		*nfds_written = min(numfds, MAX_LIBWAY_FDS);
117 		size_t nwritten = (size_t)(*nfds_written);
118 		memcpy(CMSG_DATA(frst), fds, nwritten * sizeof(int));
119 		for (int i = 0; i < numfds; i++) {
120 			int flags = fcntl(fds[i], F_GETFL, 0);
121 			if (flags == -1 && errno == EBADF) {
122 				wp_error("Writing invalid fd %d", fds[i]);
123 			}
124 		}
125 
126 		frst->cmsg_len = CMSG_LEN(nwritten * sizeof(int));
127 		msg.msg_controllen = CMSG_SPACE(nwritten * sizeof(int));
128 		wp_debug("Writing %d fds to cmsg data", *nfds_written);
129 	} else {
130 		*nfds_written = 0;
131 	}
132 
133 	ssize_t ret = sendmsg(conn, &msg, 0);
134 	return ret;
135 }
136 
translate_fds(struct fd_translation_map * map,struct render_data * render,int nfds,const int fds[],int ids[])137 static int translate_fds(struct fd_translation_map *map,
138 		struct render_data *render, int nfds, const int fds[],
139 		int ids[])
140 {
141 	for (int i = 0; i < nfds; i++) {
142 		struct shadow_fd *sfd = get_shadow_for_local_fd(map, fds[i]);
143 		if (!sfd) {
144 			/* Autodetect type + create shadow fd */
145 			size_t fdsz = 0;
146 			enum fdcat fdtype = get_fd_type(fds[i], &fdsz);
147 			sfd = translate_fd(map, render, fds[i], fdtype, fdsz,
148 					NULL, false, false);
149 		}
150 		if (sfd) {
151 			ids[i] = sfd->remote_id;
152 		} else {
153 			return -1;
154 		}
155 	}
156 	return 0;
157 }
158 /** Given a list of global ids, and an up-to-date translation map, produce local
159  * file descriptors */
untranslate_ids(struct fd_translation_map * map,int nids,const int * ids,int * fds)160 static void untranslate_ids(struct fd_translation_map *map, int nids,
161 		const int *ids, int *fds)
162 {
163 	for (int i = 0; i < nids; i++) {
164 		struct shadow_fd *shadow = get_shadow_for_rid(map, ids[i]);
165 		if (!shadow) {
166 			wp_error("Could not untranslate remote id %d in map. Application will probably crash.",
167 					ids[i]);
168 			fds[i] = -1;
169 		} else {
170 			fds[i] = shadow->fd_local;
171 		}
172 	}
173 }
174 
enum wm_state { WM_WAITING_FOR_PROGRAM, WM_WAITING_FOR_CHANNEL, WM_TERMINAL };
/** This state corresponds to the in-progress transfer from the program
 * (compositor or application) and its pipes/buffers to the channel. */
struct way_msg_state {
	/** Current phase of the program-to-channel state machine */
	enum wm_state state;

	/** Window zone contains the message data which has been read
	 * but not yet parsed/copied to proto_write */
	struct char_window proto_read;
	/** Buffer of complete protocol messages to be written to the channel */
	struct char_window proto_write;

	/** Queue of fds to be used by protocol parser */
	struct int_window fds;

	/** Individual messages, to be sent out via writev and deleted on
	 * acknowledgement */
	struct transfer_queue transfers;
	/** bytes written in this cycle, for debug */
	int total_written;
	/** Maximum number of iovec entries to hand to writev at once */
	int max_iov;

	/** Transfers to send after the compute queue is empty */
	int ntrailing;
	struct iovec trailing[3];

	/** Statically allocated message acknowledgement messages; due
	 * to the way they are updated out of order, at most two are needed */
	struct wmsg_ack ack_msgs[2];
};
206 
enum cm_state { CM_WAITING_FOR_PROGRAM, CM_WAITING_FOR_CHANNEL, CM_TERMINAL };
/** This state corresponds to the in-progress transfer from the channel
 * to the program and the buffers/pipes on which will be written. */
struct chan_msg_state {
	/** Current phase of the channel-to-program state machine */
	enum cm_state state;

	/** Edited protocol data which is being written to the program */
	struct char_window proto_write;

	/** FDs that should immediately be transferred to the program */
	struct int_window transf_fds;
	/** FD queue for the protocol parser */
	struct int_window proto_fds;

/** Minimum number of bytes to try to read from the channel per readv call */
#define RECV_GOAL_READ_SIZE 131072
	char *recv_buffer; // ring-like buffer for message data
	size_t recv_size;  // allocated capacity of recv_buffer
	size_t recv_start; // (recv_buffer+recv_start) should be a message header
	size_t recv_end;   // last byte read from channel, always >=recv_start
	int recv_unhandled_messages; // number of messages to parse
};
228 
/** State used by both forward and reverse messages; tracks the message
 * counters that implement the acknowledgement/replay protocol. */
struct cross_state {
	/* Which was the last message received from the other
	 * application, for which acknowledgement was sent? */
	uint32_t last_acked_msgno;
	/* Which was the last message number received from the other
	 * application? */
	uint32_t last_received_msgno;
	/* What was the highest number message received from the other
	 * application? (matches last_received, unless we needed a restart) */
	uint32_t newest_received_msgno;
	/* Which was the last message number sent to the other application which
	 * was acknowledged by that side? */
	uint32_t last_confirmed_msgno;
};
244 
interpret_chanmsg(struct chan_msg_state * cmsg,struct cross_state * cxs,struct globals * g,bool display_side,char * packet)245 static int interpret_chanmsg(struct chan_msg_state *cmsg,
246 		struct cross_state *cxs, struct globals *g, bool display_side,
247 		char *packet)
248 {
249 	uint32_t size_and_type = *(uint32_t *)packet;
250 	size_t unpadded_size = transfer_size(size_and_type);
251 	enum wmsg_type type = transfer_type(size_and_type);
252 	if (type == WMSG_CLOSE) {
253 		/* No new messages from the channel to the program will be
254 		 * allowed after this */
255 		cmsg->state = CM_TERMINAL;
256 
257 		wp_debug("Other side has closed");
258 		if (unpadded_size < 8) {
259 			return ERR_FATAL;
260 		}
261 		int32_t code = ((int32_t *)packet)[1];
262 		if (code == ERR_FATAL) {
263 			return ERR_FATAL;
264 		} else if (code == ERR_NOMEM) {
265 			return ERR_NOMEM;
266 		} else {
267 			return ERR_STOP;
268 		}
269 	} else if (type == WMSG_RESTART) {
270 		struct wmsg_restart *ackm = (struct wmsg_restart *)packet;
271 		wp_debug("Received restart message: remote last saw ack %d (we last recvd %d, acked %d)",
272 				ackm->last_ack_received,
273 				cxs->last_received_msgno,
274 				cxs->last_acked_msgno);
275 		cxs->last_received_msgno = ackm->last_ack_received;
276 		return 0;
277 	} else if (type == WMSG_ACK_NBLOCKS) {
278 		struct wmsg_ack *ackm = (struct wmsg_ack *)packet;
279 		if (msgno_gt(ackm->messages_received,
280 				    cxs->last_received_msgno)) {
281 			cxs->last_confirmed_msgno = ackm->messages_received;
282 		}
283 		return 0;
284 	} else {
285 		cxs->last_received_msgno++;
286 		if (msgno_gt(cxs->newest_received_msgno,
287 				    cxs->last_received_msgno)) {
288 			/* Skip packet, as we already received it */
289 			wp_debug("Ignoring replayed message %d (newest=%d)",
290 					cxs->last_received_msgno,
291 					cxs->newest_received_msgno);
292 			return 0;
293 		}
294 		cxs->newest_received_msgno = cxs->last_received_msgno;
295 	}
296 
297 	if (type == WMSG_INJECT_RIDS) {
298 		const int32_t *fds = &((const int32_t *)packet)[1];
299 		int nfds = (int)((unpadded_size - sizeof(uint32_t)) /
300 				 sizeof(int32_t));
301 
302 		if (buf_ensure_size(nfds, sizeof(int), &cmsg->transf_fds.size,
303 				    (void **)&cmsg->transf_fds.data) == -1) {
304 			wp_error("Allocation failure for fd transfer queue, expect a crash");
305 			return ERR_NOMEM;
306 		}
307 		/* Reset transfer buffer; all fds in here were already sent */
308 		cmsg->transf_fds.zone_start = 0;
309 		cmsg->transf_fds.zone_end = nfds;
310 		untranslate_ids(&g->map, nfds, fds, cmsg->transf_fds.data);
311 		if (nfds > 0) {
312 			if (buf_ensure_size(cmsg->proto_fds.zone_end + nfds,
313 					    sizeof(int), &cmsg->proto_fds.size,
314 					    (void **)&cmsg->proto_fds.data) ==
315 					-1) {
316 				wp_error("Allocation failure for fd protocol queue");
317 				return ERR_NOMEM;
318 			}
319 
320 			// Append the new file descriptors to the parsing queue
321 			memcpy(cmsg->proto_fds.data + cmsg->proto_fds.zone_end,
322 					cmsg->transf_fds.data,
323 					sizeof(int) * (size_t)nfds);
324 			cmsg->proto_fds.zone_end += nfds;
325 		}
326 		return 0;
327 	} else if (type == WMSG_PROTOCOL) {
328 		/* While by construction, the provided message buffer should be
329 		 * aligned with individual message boundaries, it is not
330 		 * guaranteed that all file descriptors provided will be used by
331 		 * the messages. This makes fd handling more complicated. */
332 		int protosize = (int)(unpadded_size - sizeof(uint32_t));
333 		// TODO: have message editing routines ensure size, so
334 		// that this limit can be tighter
335 		if (buf_ensure_size(protosize + 1024, 1,
336 				    &cmsg->proto_write.size,
337 				    (void **)&cmsg->proto_write.data) == -1) {
338 			wp_error("Allocation failure for message workspace");
339 			return ERR_NOMEM;
340 		}
341 		cmsg->proto_write.zone_end = 0;
342 		cmsg->proto_write.zone_start = 0;
343 
344 		struct char_window src;
345 		src.data = packet + sizeof(uint32_t);
346 		src.zone_start = 0;
347 		src.zone_end = protosize;
348 		src.size = protosize;
349 		parse_and_prune_messages(g, display_side, display_side, &src,
350 				&cmsg->proto_write, &cmsg->proto_fds);
351 		if (src.zone_start != src.zone_end) {
352 			wp_error("did not expect partial messages over channel, only parsed %d/%d bytes",
353 					src.zone_start, src.zone_end);
354 			return ERR_FATAL;
355 		}
356 		/* Update file descriptor queue */
357 		if (cmsg->proto_fds.zone_end > cmsg->proto_fds.zone_start) {
358 			memmove(cmsg->proto_fds.data,
359 					cmsg->proto_fds.data +
360 							cmsg->proto_fds.zone_start,
361 					sizeof(int) * (size_t)(cmsg->proto_fds.zone_end >
362 								      cmsg->proto_fds.zone_start));
363 			cmsg->proto_fds.zone_end -= cmsg->proto_fds.zone_start;
364 		}
365 		return 0;
366 	} else {
367 		if (unpadded_size < sizeof(struct wmsg_basic)) {
368 			wp_error("Message is too small to contain header+RID, %d bytes",
369 					unpadded_size);
370 			return ERR_FATAL;
371 		}
372 		const struct wmsg_basic *op_header =
373 				(const struct wmsg_basic *)packet;
374 		struct bytebuf msg = {
375 				.data = packet,
376 				.size = unpadded_size,
377 		};
378 		wp_debug("Received %s for RID=%d (len %d)",
379 				wmsg_type_to_str(type), op_header->remote_id,
380 				unpadded_size);
381 		return apply_update(&g->map, &g->threads, &g->render, type,
382 				op_header->remote_id, &msg);
383 	}
384 }
385 
/** Read message data from the channel into the ring-like receive buffer,
 * then interpret as many complete messages as possible. If interpreting a
 * message produced protocol data for the program, switch the state machine
 * to CM_WAITING_FOR_PROGRAM so that data is flushed before parsing more.
 *
 * Returns 0 on success (including a would-block read), or a negative
 * ERR_* code on failure/disconnect. */
static int advance_chanmsg_chanread(struct chan_msg_state *cmsg,
		struct cross_state *cxs, int chanfd, bool display_side,
		struct globals *g)
{
	/* Setup read operation to be able to read a minimum number of bytes,
	 * wrapping around as early as overlap conditions permit */
	if (cmsg->recv_unhandled_messages == 0) {
		struct iovec vec[2];
		memset(vec, 0, sizeof(vec));
		int nvec;
		if (cmsg->recv_start == cmsg->recv_end) {
			/* A fresh packet: restart from the front of the buffer */
			cmsg->recv_start = 0;
			cmsg->recv_end = 0;
			nvec = 1;
			vec[0].iov_base = cmsg->recv_buffer;
			vec[0].iov_len = (size_t)(cmsg->recv_size / 2);
		} else if (cmsg->recv_end <
				cmsg->recv_start + sizeof(uint32_t)) {
			/* Didn't quite finish reading the header; grow the
			 * buffer if needed and read more after recv_end */
			int recvsz = (int)cmsg->recv_size;
			if (buf_ensure_size((int)cmsg->recv_end +
							    RECV_GOAL_READ_SIZE,
					    1, &recvsz,
					    (void **)&cmsg->recv_buffer) ==
					-1) {
				wp_error("Allocation failure, resizing receive buffer failed");
				return ERR_NOMEM;
			}
			cmsg->recv_size = (size_t)recvsz;

			nvec = 1;
			vec[0].iov_base = cmsg->recv_buffer + cmsg->recv_end;
			vec[0].iov_len = RECV_GOAL_READ_SIZE;
		} else {
			/* Continuing an old packet; space made available last
			 * time. The header at recv_start gives the total
			 * message size to read up to */
			uint32_t *header = (uint32_t *)&cmsg->recv_buffer
							   [cmsg->recv_start];
			size_t sz = alignz(transfer_size(*header), 4);

			size_t read_end = cmsg->recv_start + sz;
			/* Once recv_start is far enough in, let the second
			 * iovec wrap extra data to the front of the buffer */
			bool wraparound =
					cmsg->recv_start >= RECV_GOAL_READ_SIZE;
			if (!wraparound) {
				read_end = maxu(read_end,
						cmsg->recv_end +
								RECV_GOAL_READ_SIZE);
			}
			int recvsz = (int)cmsg->recv_size;
			if (buf_ensure_size((int)read_end, 1, &recvsz,
					    (void **)&cmsg->recv_buffer) ==
					-1) {
				wp_error("Allocation failure, resizing receive buffer failed");
				return ERR_NOMEM;
			}
			cmsg->recv_size = (size_t)recvsz;

			nvec = 1;
			vec[0].iov_base = cmsg->recv_buffer + cmsg->recv_end;
			vec[0].iov_len = read_end - cmsg->recv_end;
			if (wraparound) {
				nvec = 2;
				vec[1].iov_base = cmsg->recv_buffer;
				vec[1].iov_len = cmsg->recv_start;
			}
		}

		ssize_t r = readv(chanfd, vec, nvec);
		if (r == -1 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
			wp_debug("Read would block");
			return 0;
		} else if (r == 0 || (r == -1 && errno == ECONNRESET)) {
			wp_debug("Channel connection closed");
			return ERR_DISCONN;
		} else if (r == -1) {
			wp_error("chanfd read failure: %s", strerror(errno));
			return ERR_FATAL;
		} else {
			if (nvec == 2 && (size_t)r >= vec[0].iov_len) {
				/* The read filled the first iovec, so the
				 * message starting at recv_start is complete:
				 * parse it now, then wrap the surplus bytes
				 * (written via vec[1]) to the buffer front */
				int cm_ret = interpret_chanmsg(cmsg, cxs, g,
						display_side,
						cmsg->recv_buffer +
								cmsg->recv_start);
				if (cm_ret < 0) {
					return cm_ret;
				}

				cmsg->recv_start = 0;
				cmsg->recv_end = (size_t)r - vec[0].iov_len;

				if (cmsg->proto_write.zone_start <
						cmsg->proto_write.zone_end) {
					goto next_stage;
				}
			} else {
				cmsg->recv_end += (size_t)r;
			}
		}
	}

	/* Recount unhandled messages: count complete messages between
	 * recv_start and recv_end */
	cmsg->recv_unhandled_messages = 0;
	size_t i = cmsg->recv_start;
	while (i + sizeof(uint32_t) <= cmsg->recv_end) {
		uint32_t *header = (uint32_t *)&cmsg->recv_buffer[i];
		size_t sz = alignz(transfer_size(*header), 4);
		if (sz == 0) {
			wp_error("Encountered malformed zero size packet");
			return ERR_FATAL;
		}
		i += sz;
		if (i > cmsg->recv_end) {
			/* Last message is incomplete; stop counting */
			break;
		}
		cmsg->recv_unhandled_messages++;
	}

	/* Interpret complete messages in order, stopping early if protocol
	 * data must first be flushed to the program */
	while (cmsg->recv_unhandled_messages > 0) {
		char *packet_start = &cmsg->recv_buffer[cmsg->recv_start];
		uint32_t *header = (uint32_t *)packet_start;
		size_t sz = transfer_size(*header);
		int cm_ret = interpret_chanmsg(
				cmsg, cxs, g, display_side, packet_start);
		if (cm_ret < 0) {
			return cm_ret;
		}
		cmsg->recv_start += alignz(sz, 4);
		cmsg->recv_unhandled_messages--;

		if (cmsg->proto_write.zone_start < cmsg->proto_write.zone_end) {
			goto next_stage;
		}
	}
	return 0;
next_stage:
	/* When protocol data was sent, switch to trying to write the protocol
	 * data to its socket, before trying to parse any other message */
	cmsg->state = CM_WAITING_FOR_PROGRAM;
	DTRACE_PROBE(waypipe, chanmsg_program_wait);
	return 0;
}
/** Write pending edited protocol data (and the associated fds) to the
 * program socket `progfd`. When the write buffer drains completely, the
 * state machine switches back to CM_WAITING_FOR_CHANNEL.
 *
 * Returns 0 on success or would-block, ERR_STOP if the program closed its
 * end, or ERR_FATAL on other write errors. */
static int advance_chanmsg_progwrite(struct chan_msg_state *cmsg, int progfd,
		bool display_side, struct globals *g)
{
	const char *progdesc = display_side ? "compositor" : "application";
	// Write as much as possible
	while (cmsg->proto_write.zone_start < cmsg->proto_write.zone_end) {
		/* iovec_write stores the number of fds actually attached
		 * into transf_fds.zone_start */
		ssize_t wc = iovec_write(progfd,
				cmsg->proto_write.data +
						cmsg->proto_write.zone_start,
				(size_t)(cmsg->proto_write.zone_end -
						cmsg->proto_write.zone_start),
				cmsg->transf_fds.data,
				cmsg->transf_fds.zone_end,
				&cmsg->transf_fds.zone_start);
		if (wc == -1 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
			wp_debug("Write to the %s would block", progdesc);
			return 0;
		} else if (wc == -1 &&
				(errno == EPIPE || errno == ECONNRESET)) {
			wp_error("%s has closed", progdesc);
			/* The program has closed its end of the connection,
			 * so waypipe can also cease to process all messages and
			 * data updates that would be directed to it */
			cmsg->state = CM_TERMINAL;
			return ERR_STOP;
		} else if (wc == -1) {
			wp_error("%s write failure %zd: %s", progdesc, wc,
					strerror(errno));
			return ERR_FATAL;
		} else {
			cmsg->proto_write.zone_start += (int)wc;
			wp_debug("Wrote to %s, %d/%d bytes in chunk %zd, %d/%d fds",
					progdesc, cmsg->proto_write.zone_start,
					cmsg->proto_write.zone_end, wc,
					cmsg->transf_fds.zone_start,
					cmsg->transf_fds.zone_end);

			/* Drop the fds that were just transferred and compact
			 * the remaining ones to the front of the queue */
			if (cmsg->transf_fds.zone_start > 0) {
				decref_transferred_fds(&g->map,
						cmsg->transf_fds.zone_start,
						cmsg->transf_fds.data);
				memmove(cmsg->transf_fds.data,
						cmsg->transf_fds.data +
								cmsg->transf_fds.zone_start,
						(size_t)(cmsg->transf_fds.zone_end -
								cmsg->transf_fds.zone_start) *
								sizeof(int));
				cmsg->transf_fds.zone_end -=
						cmsg->transf_fds.zone_start;
			}
		}
	}
	if (cmsg->proto_write.zone_start == cmsg->proto_write.zone_end) {
		wp_debug("Write to the %s succeeded", progdesc);
		cmsg->state = CM_WAITING_FOR_CHANNEL;
		DTRACE_PROBE(waypipe, chanmsg_channel_wait);
	}
	return 0;
}
advance_chanmsg_transfer(struct globals * g,struct chan_msg_state * cmsg,struct cross_state * cxs,bool display_side,int chanfd,int progfd,bool any_changes)588 static int advance_chanmsg_transfer(struct globals *g,
589 		struct chan_msg_state *cmsg, struct cross_state *cxs,
590 		bool display_side, int chanfd, int progfd, bool any_changes)
591 {
592 	if (!any_changes) {
593 		return 0;
594 	}
595 	if (cmsg->state == CM_WAITING_FOR_CHANNEL) {
596 		return advance_chanmsg_chanread(
597 				cmsg, cxs, chanfd, display_side, g);
598 	} else if (cmsg->state == CM_WAITING_FOR_PROGRAM) {
599 		return advance_chanmsg_progwrite(cmsg, progfd, display_side, g);
600 	}
601 	return 0;
602 }
603 
clear_old_transfers(struct transfer_queue * td,uint32_t inclusive_cutoff)604 static void clear_old_transfers(
605 		struct transfer_queue *td, uint32_t inclusive_cutoff)
606 {
607 	for (int i = 0; i < td->end; i++) {
608 		if (td->vecs[i].iov_len == 0) {
609 			wp_error("Unexpected zero sized item %d [%d,%d)", i,
610 					td->start, td->end);
611 		}
612 	}
613 	int k = 0;
614 	for (int i = 0; i < td->start; i++) {
615 		if (!msgno_gt(inclusive_cutoff, td->meta[i].msgno)) {
616 			break;
617 		}
618 		if (!td->meta[i].static_alloc) {
619 			free(td->vecs[i].iov_base);
620 		}
621 		td->vecs[i].iov_base = NULL;
622 		td->vecs[i].iov_len = 0;
623 		k = i + 1;
624 	}
625 	if (k > 0) {
626 		size_t nshift = (size_t)(td->end - k);
627 		memmove(td->meta, td->meta + k, nshift * sizeof(td->meta[0]));
628 		memmove(td->vecs, td->vecs + k, nshift * sizeof(td->vecs[0]));
629 		td->start -= k;
630 		td->end -= k;
631 	}
632 }
633 
/* Write as much of the transfer queue to the channel as the socket accepts,
 * tracking partial progress in td->partial_write_amt. Returns 0 if
 * successful (including short/blocked writes), ERR_FATAL on a fatal error,
 * and ERR_DISCONN if the channel connection closed. */
static int partial_write_transfer(int chanfd, struct transfer_queue *td,
		int *total_written, int max_iov)
{
	// Waiting for channel write to complete
	if (td->start < td->end) {
		/* Advance the current element by amount actually written */
		char *orig_base = td->vecs[td->start].iov_base;
		size_t orig_len = td->vecs[td->start].iov_len;
		td->vecs[td->start].iov_base =
				orig_base + td->partial_write_amt;
		td->vecs[td->start].iov_len = orig_len - td->partial_write_amt;
		int count = min(max_iov, td->end - td->start);
		ssize_t wr = writev(chanfd, &td->vecs[td->start], count);
		/* Restore the unmodified entry so the bookkeeping below (and
		 * any later retry) sees the original base/length */
		td->vecs[td->start].iov_base = orig_base;
		td->vecs[td->start].iov_len = orig_len;

		if (wr == -1 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
			return 0;
		} else if (wr == -1 &&
				(errno == ECONNRESET || errno == EPIPE)) {
			wp_debug("Channel connection closed");
			return ERR_DISCONN;
		} else if (wr == -1) {
			wp_error("chanfd write failure: %s", strerror(errno));
			return ERR_FATAL;
		}

		/* Consume `wr` bytes from the queue, advancing td->start past
		 * fully written blocks and recording partial progress */
		size_t uwr = (size_t)wr;
		*total_written += (int)wr;
		while (uwr > 0 && td->start < td->end) {
			/* Skip past zero-length blocks */
			if (td->vecs[td->start].iov_len == 0) {
				td->start++;
				continue;
			}
			size_t left = td->vecs[td->start].iov_len -
				      td->partial_write_amt;
			if (left > uwr) {
				/* Block partially completed */
				td->partial_write_amt += uwr;
				uwr = 0;
			} else {
				/* Block completed */
				td->partial_write_amt = 0;
				uwr -= left;
				td->start++;
			}
		}
	}
	return 0;
}
686 
/** Advance the program-to-channel direction: collect finished async
 * transfers, insert/refresh an acknowledgement message, write queued data to
 * the channel, run one worker task on the main thread, and — once all
 * transfers have been written and all tasks are done — finalize shadow fd
 * updates and return the state machine to WM_WAITING_FOR_PROGRAM.
 *
 * Returns 0 on success, or a negative ERR_* code from the channel write. */
static int advance_waymsg_chanwrite(struct way_msg_state *wmsg,
		struct cross_state *cxs, struct globals *g, int chanfd,
		bool display_side)
{
	const char *progdesc = display_side ? "compositor" : "application";

	/* Copy the data in the transfer queue to the write queue. */
	(void)transfer_load_async(&wmsg->transfers);

	// First, clear out any transfers that are no longer needed
	clear_old_transfers(&wmsg->transfers, cxs->last_confirmed_msgno);

	/* Acknowledge the other side's transfers as soon as possible */
	if (cxs->last_acked_msgno != cxs->last_received_msgno) {
		if (transfer_ensure_size(&wmsg->transfers,
				    wmsg->transfers.end + 1) == -1) {
			wp_error("Failed to allocate space for ack message transfer");
			goto ackmsg_fail;
		}
		/* To avoid infinite regress, receive acknowledgement
		 * messages do not themselves increase the message counters. */
		uint32_t ack_msgno;
		if (wmsg->transfers.start == wmsg->transfers.end) {
			ack_msgno = wmsg->transfers.last_msgno;
		} else {
			ack_msgno = wmsg->transfers.meta[wmsg->transfers.start]
						    .msgno;
		}

		/* This is the next point where messages can be changed:
		 * if the head entry is partially written it must not be
		 * touched, so the slot after it is the earliest editable one */
		int next_slot = (wmsg->transfers.partial_write_amt > 0)
						? wmsg->transfers.start + 1
						: wmsg->transfers.start;
		/* The two statically allocated ack messages alternate:
		 * identify which (if either) is in-flight and which is
		 * already queued at the editable slot */
		struct wmsg_ack *not_in_prog_msg = NULL;
		struct wmsg_ack *queued_msg = NULL;
		for (size_t i = 0; i < 2; i++) {
			if (wmsg->transfers.partial_write_amt > 0 &&
					wmsg->transfers.vecs[wmsg->transfers.start]
									.iov_base ==
							&wmsg->ack_msgs[i]) {
				not_in_prog_msg = &wmsg->ack_msgs[1 - i];
			}
			if (wmsg->transfers.vecs[next_slot].iov_base ==
					&wmsg->ack_msgs[i]) {
				queued_msg = &wmsg->ack_msgs[i];
			}
		}

		if (!queued_msg) {
			/* Insert a message--which is not partially written--
			 * in the next available slot, pushing forward other
			 * messages */
			if (!not_in_prog_msg) {
				queued_msg = &wmsg->ack_msgs[0];
			} else {
				queued_msg = not_in_prog_msg;
			}

			if (next_slot < wmsg->transfers.end) {
				size_t nmoved = (size_t)(wmsg->transfers.end -
							 next_slot);
				memmove(wmsg->transfers.vecs + next_slot + 1,
						wmsg->transfers.vecs +
								next_slot,
						sizeof(*wmsg->transfers.vecs) *
								nmoved);
				memmove(wmsg->transfers.meta + next_slot + 1,
						wmsg->transfers.meta +
								next_slot,
						sizeof(*wmsg->transfers.meta) *
								nmoved);
			}
			wmsg->transfers.vecs[next_slot].iov_len =
					sizeof(struct wmsg_ack);
			wmsg->transfers.vecs[next_slot].iov_base = queued_msg;
			wmsg->transfers.meta[next_slot].msgno = ack_msgno;
			wmsg->transfers.meta[next_slot].static_alloc = true;
			wmsg->transfers.end++;
		}

		/* Modify the message which is now next up in the transfer
		 * queue */
		queued_msg->size_and_type = transfer_header(
				sizeof(struct wmsg_ack), WMSG_ACK_NBLOCKS);
		queued_msg->messages_received = cxs->last_received_msgno;
		cxs->last_acked_msgno = cxs->last_received_msgno;
	ackmsg_fail:;
	}

	int ret = partial_write_transfer(chanfd, &wmsg->transfers,
			&wmsg->total_written, wmsg->max_iov);
	if (ret < 0) {
		return ret;
	}

	bool is_done = false;
	struct task_data task;
	bool has_task = request_work_task(&g->threads, &task, &is_done);

	/* Run a task ourselves, making use of the main thread */
	if (has_task) {
		run_task(&task, &g->threads.threads[0]);

		pthread_mutex_lock(&g->threads.work_mutex);
		g->threads.tasks_in_progress--;
		pthread_mutex_unlock(&g->threads.work_mutex);
		/* To skip the next poll */
		uint8_t triv = 0;
		if (write(g->threads.selfpipe_w, &triv, 1) == -1) {
			wp_error("Failed to write to self-pipe");
		}
	}

	if (is_done) {
		/* It's possible for the last task to complete between
		 * `transfer_load_async` and `request_work_task` in this
		 * function, so copy out any remaining messages.`*/
		(void)transfer_load_async(&wmsg->transfers);
	}

	if (is_done && wmsg->ntrailing > 0) {
		/* Trailing transfers are only sent once all compute work
		 * has finished */
		for (int i = 0; i < wmsg->ntrailing; i++) {
			transfer_add(&wmsg->transfers,
					wmsg->trailing[i].iov_len,
					wmsg->trailing[i].iov_base);
		}

		wmsg->ntrailing = 0;
		memset(wmsg->trailing, 0, sizeof(wmsg->trailing));
	}

	if (wmsg->transfers.start == wmsg->transfers.end && is_done) {
		for (struct shadow_fd_link *lcur = g->map.link.l_next,
					   *lnxt = lcur->l_next;
				lcur != &g->map.link;
				lcur = lnxt, lnxt = lcur->l_next) {
			/* Note: finish_update() may delete `cur` */
			struct shadow_fd *cur = (struct shadow_fd *)lcur;
			finish_update(cur);
			destroy_shadow_if_unreferenced(cur);
		}

		/* Reset work queue */
		pthread_mutex_lock(&g->threads.work_mutex);
		if (g->threads.stack_count > 0 ||
				g->threads.tasks_in_progress > 0) {
			wp_error("Multithreading state failure");
		}
		g->threads.do_work = false;
		g->threads.stack_count = 0;
		g->threads.tasks_in_progress = 0;
		pthread_mutex_unlock(&g->threads.work_mutex);

		DTRACE_PROBE(waypipe, channel_write_end);
		size_t unacked_bytes = 0;
		for (int i = 0; i < wmsg->transfers.end; i++) {
			unacked_bytes += wmsg->transfers.vecs[i].iov_len;
		}

		wp_debug("Sent %d-byte message from %s to channel; %zu-bytes in flight",
				wmsg->total_written, progdesc, unacked_bytes);

		/* do not delete the used transfers yet; we need a remote
		 * acknowledgement */
		wmsg->total_written = 0;
		wmsg->state = WM_WAITING_FOR_PROGRAM;
	}
	return 0;
}
/* Read new protocol data and file descriptors from the program socket,
 * parse and translate the Wayland messages, and queue up the resulting
 * transfer blocks (RID injection message, protocol payload, and any
 * shadow-fd diffs) for the channel. On success returns 0 and, if there
 * is anything to send, moves `wmsg` to WM_WAITING_FOR_CHANNEL; returns
 * ERR_STOP when the program has closed its socket, ERR_NOMEM on
 * allocation failure, or ERR_FATAL on other errors. */
static int advance_waymsg_progread(struct way_msg_state *wmsg,
		struct globals *g, int progfd, bool display_side,
		bool progsock_readable)
{
	const char *progdesc = display_side ? "compositor" : "application";
	// We have data to read from programs/pipes
	bool new_proto_data = false;
	int old_fbuffer_end = wmsg->fds.zone_end;
	if (progsock_readable) {
		// Read /once/
		ssize_t rc = iovec_read(progfd,
				wmsg->proto_read.data +
						wmsg->proto_read.zone_end,
				(size_t)(wmsg->proto_read.size -
						wmsg->proto_read.zone_end),
				&wmsg->fds);
		if (rc == -1 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
			// do nothing
		} else if (rc == 0 || (rc == -1 && errno == ECONNRESET)) {
			wp_error("%s has closed", progdesc);
			// state transitions handled in main loop
			return ERR_STOP;
		} else if (rc == -1) {
			wp_error("%s read failure: %s", progdesc,
					strerror(errno));
			return ERR_FATAL;
		} else {
			// We have successfully read some data.
			wmsg->proto_read.zone_end += (int)rc;
			new_proto_data = true;
		}
	}

	if (new_proto_data) {
		wp_debug("Read %d new file descriptors, have %d total now",
				wmsg->fds.zone_end - old_fbuffer_end,
				wmsg->fds.zone_end);

		/* The output of the protocol parser is at most slightly
		 * larger than its input; ensure the write buffer has slack */
		if (buf_ensure_size(wmsg->proto_read.size + 1024, 1,
				    &wmsg->proto_write.size,
				    (void **)&wmsg->proto_write.data) == -1) {
			wp_error("Allocation failure for message workspace");
			return ERR_NOMEM;
		}

		wmsg->proto_write.zone_start = 0;
		wmsg->proto_write.zone_end = 0;
		parse_and_prune_messages(g, display_side, !display_side,
				&wmsg->proto_read, &wmsg->proto_write,
				&wmsg->fds);

		/* Recycle partial message bytes */
		if (wmsg->proto_read.zone_start > 0) {
			if (wmsg->proto_read.zone_end >
					wmsg->proto_read.zone_start) {
				memmove(wmsg->proto_read.data,
						wmsg->proto_read.data +
								wmsg->proto_read.zone_start,
						(size_t)(wmsg->proto_read.zone_end -
								wmsg->proto_read.zone_start));
			}
			wmsg->proto_read.zone_end -=
					wmsg->proto_read.zone_start;
			wmsg->proto_read.zone_start = 0;
		}
	}

	read_readable_pipes(&g->map);

	for (struct shadow_fd_link *lcur = g->map.link.l_next,
				   *lnxt = lcur->l_next;
			lcur != &g->map.link;
			lcur = lnxt, lnxt = lcur->l_next) {
		/* Note: finish_update() may delete `cur` */
		struct shadow_fd *cur = (struct shadow_fd *)lcur;
		collect_update(&g->threads, cur, &wmsg->transfers,
				g->config->old_video_mode);
		/* collecting updates can reset `pipe.remote_can_X` state, so
		 * garbage collect the sfd immediately after */
		destroy_shadow_if_unreferenced(cur);
	}

	int num_mt_tasks = start_parallel_work(
			&g->threads, &wmsg->transfers.async_recv_queue);

	if (new_proto_data) {
		/* Send all file descriptors which have been used by the
		 * protocol parser, translating them if this has not already
		 * been done */
		if (wmsg->fds.zone_start > 0) {
			size_t act_size = (size_t)wmsg->fds.zone_start *
							  sizeof(int32_t) +
					  sizeof(uint32_t);
			uint32_t *msg = malloc(act_size);
			if (!msg) {
				// TODO: use a ring buffer for allocations,
				// and figure out how to block until it is clear
				wp_error("Failed to allocate file desc tx msg");
				return ERR_NOMEM;
			}
			msg[0] = transfer_header(act_size, WMSG_INJECT_RIDS);
			int32_t *rbuffer = (int32_t *)(msg + 1);

			/* Translate and adjust refcounts */
			if (translate_fds(&g->map, &g->render,
					    wmsg->fds.zone_start,
					    wmsg->fds.data, rbuffer) == -1) {
				free(msg);
				return ERR_FATAL;
			}
			decref_transferred_rids(
					&g->map, wmsg->fds.zone_start, rbuffer);
			memmove(wmsg->fds.data,
					wmsg->fds.data + wmsg->fds.zone_start,
					sizeof(int) * (size_t)(wmsg->fds.zone_end -
								      wmsg->fds.zone_start));
			wmsg->fds.zone_end -= wmsg->fds.zone_start;
			wmsg->fds.zone_start = 0;

			/* Add message to trailing queue */
			wmsg->trailing[wmsg->ntrailing].iov_len = act_size;
			wmsg->trailing[wmsg->ntrailing].iov_base = msg;
			wmsg->ntrailing++;
		}
		if (wmsg->proto_write.zone_end > 0) {
			wp_debug("We are transferring a data buffer with %d bytes",
					wmsg->proto_write.zone_end);
			size_t act_size = (size_t)wmsg->proto_write.zone_end +
					  sizeof(uint32_t);
			uint32_t protoh = transfer_header(
					act_size, WMSG_PROTOCOL);

			/* Pad the copied protocol data to a 4-byte boundary
			 * as required by the channel framing */
			uint8_t *copy_proto = malloc(alignz(act_size, 4));
			if (!copy_proto) {
				wp_error("Failed to allocate protocol tx msg");
				return ERR_NOMEM;
			}
			memcpy(copy_proto, &protoh, sizeof(uint32_t));
			memcpy(copy_proto + sizeof(uint32_t),
					wmsg->proto_write.data,
					(size_t)wmsg->proto_write.zone_end);
			memset(copy_proto + sizeof(uint32_t) +
							wmsg->proto_write
									.zone_end,
					0, alignz(act_size, 4) - act_size);

			wmsg->trailing[wmsg->ntrailing].iov_len =
					alignz(act_size, 4);
			wmsg->trailing[wmsg->ntrailing].iov_base = copy_proto;
			wmsg->ntrailing++;
		}
	}

	int n_transfers = wmsg->transfers.end - wmsg->transfers.start;
	size_t net_bytes = 0;
	for (int i = wmsg->transfers.start; i < wmsg->transfers.end; i++) {
		net_bytes += wmsg->transfers.vecs[i].iov_len;
	}

	if (n_transfers > 0 || num_mt_tasks > 0 || wmsg->ntrailing > 0) {
		/* net_bytes is a size_t; use %zu to avoid a printf
		 * format/argument mismatch (undefined behavior) */
		wp_debug("Channel message start (%d blobs, %zu bytes, %d trailing, %d tasks)",
				n_transfers, net_bytes, wmsg->ntrailing,
				num_mt_tasks);
		wmsg->state = WM_WAITING_FOR_CHANNEL;
		DTRACE_PROBE(waypipe, channel_write_start);
	}
	return 0;
}
advance_waymsg_transfer(struct globals * g,struct way_msg_state * wmsg,struct cross_state * cxs,bool display_side,int chanfd,int progfd,bool progsock_readable)1024 static int advance_waymsg_transfer(struct globals *g,
1025 		struct way_msg_state *wmsg, struct cross_state *cxs,
1026 		bool display_side, int chanfd, int progfd,
1027 		bool progsock_readable)
1028 {
1029 	if (wmsg->state == WM_WAITING_FOR_CHANNEL) {
1030 		return advance_waymsg_chanwrite(
1031 				wmsg, cxs, g, chanfd, display_side);
1032 	} else if (wmsg->state == WM_WAITING_FOR_PROGRAM) {
1033 		return advance_waymsg_progread(wmsg, g, progfd, display_side,
1034 				progsock_readable);
1035 	}
1036 	return 0;
1037 }
1038 
read_new_chanfd(int linkfd,struct int_window * recon_fds)1039 static int read_new_chanfd(int linkfd, struct int_window *recon_fds)
1040 {
1041 	uint8_t tmp = 0;
1042 	ssize_t rd = iovec_read(linkfd, (char *)&tmp, 1, recon_fds);
1043 	if (rd == -1 && (errno == EWOULDBLOCK || errno == EAGAIN)) {
1044 		// do nothing
1045 		return -1;
1046 	} else if (rd == 0 || (rd == -1 && errno == ECONNRESET)) {
1047 		wp_error("link has closed");
1048 		// sentinel value, to indicate that linkfd should be closed
1049 		return -2;
1050 	} else if (rd == -1) {
1051 		wp_error("link read failure: %s", strerror(errno));
1052 		return -1;
1053 	}
1054 	for (int i = 0; i < recon_fds->zone_end - 1; i++) {
1055 		checked_close(recon_fds->data[i]);
1056 	}
1057 	int ret_fd = -1;
1058 	if (recon_fds->zone_end > 0) {
1059 		ret_fd = recon_fds->data[recon_fds->zone_end - 1];
1060 	}
1061 	recon_fds->zone_end = 0;
1062 	return ret_fd;
1063 }
1064 
reconnect_loop(int linkfd,int progfd,struct int_window * recon_fds)1065 static int reconnect_loop(int linkfd, int progfd, struct int_window *recon_fds)
1066 {
1067 	while (!shutdown_flag) {
1068 		struct pollfd rcfs[2];
1069 		rcfs[0].fd = linkfd;
1070 		rcfs[0].events = POLLIN;
1071 		rcfs[0].revents = 0;
1072 		rcfs[1].fd = progfd;
1073 		rcfs[1].events = 0;
1074 		rcfs[1].revents = 0;
1075 		int r = poll(rcfs, 2, -1);
1076 		if (r == -1) {
1077 			if (errno == EINTR) {
1078 				continue;
1079 			} else {
1080 				break;
1081 			}
1082 		}
1083 		if (rcfs[0].revents & POLLIN) {
1084 			int nfd = read_new_chanfd(linkfd, recon_fds);
1085 			if (nfd != -1) {
1086 				return nfd;
1087 			}
1088 		}
1089 		if (rcfs[0].revents & POLLHUP || rcfs[1].revents & POLLHUP) {
1090 			return -1;
1091 		}
1092 	}
1093 	return -1;
1094 }
1095 
reset_connection(struct cross_state * cxs,struct chan_msg_state * cmsg,struct way_msg_state * wmsg,int chanfd)1096 static void reset_connection(struct cross_state *cxs,
1097 		struct chan_msg_state *cmsg, struct way_msg_state *wmsg,
1098 		int chanfd)
1099 {
1100 	/* Discard partial read transfer, throwing away complete but unread
1101 	 * messages, and trailing remnants */
1102 	cmsg->recv_end = 0;
1103 	cmsg->recv_start = 0;
1104 	cmsg->recv_unhandled_messages = 0;
1105 
1106 	clear_old_transfers(&wmsg->transfers, cxs->last_confirmed_msgno);
1107 	wp_debug("Resetting connection: %d blocks unacknowledged",
1108 			wmsg->transfers.end);
1109 	if (wmsg->transfers.end > 0) {
1110 		/* If there was any data in flight, restart. If there wasn't
1111 		 * anything in flight, then the remote side shouldn't notice the
1112 		 * difference */
1113 		struct wmsg_restart restart;
1114 		restart.size_and_type =
1115 				transfer_header(sizeof(restart), WMSG_RESTART);
1116 		restart.last_ack_received = cxs->last_confirmed_msgno;
1117 		wmsg->transfers.start = 0;
1118 		wmsg->transfers.partial_write_amt = 0;
1119 		wp_debug("Sending restart message: last ack=%d",
1120 				restart.last_ack_received);
1121 		if (write(chanfd, &restart, sizeof(restart)) !=
1122 				sizeof(restart)) {
1123 			wp_error("Failed to write restart message");
1124 		}
1125 	}
1126 
1127 	if (set_nonblocking(chanfd) == -1) {
1128 		wp_error("Error making new channel connection nonblocking: %s",
1129 				strerror(errno));
1130 	}
1131 
1132 	(void)cxs;
1133 }
1134 
/* Put the channel, program, and (if present) reconnection-link sockets
 * into nonblocking mode. Logs and returns -1 on the first failure,
 * 0 on success. A `linkfd` of -1 means no link socket exists. */
static int set_connections_nonblocking(
		int chanfd, int progfd, int linkfd, bool display_side)
{
	const char *progdesc = display_side ? "compositor" : "application";
	if (set_nonblocking(chanfd) == -1) {
		wp_error("Error making channel connection nonblocking: %s",
				strerror(errno));
		return -1;
	}
	if (set_nonblocking(progfd) == -1) {
		wp_error("Error making %s connection nonblocking: %s", progdesc,
				strerror(errno));
		return -1;
	}
	if (linkfd == -1) {
		return 0;
	}
	if (set_nonblocking(linkfd) == -1) {
		wp_error("Error making link connection nonblocking: %s",
				strerror(errno));
		return -1;
	}
	return 0;
}
1156 
main_interface_loop(int chanfd,int progfd,int linkfd,const struct main_config * config,bool display_side)1157 int main_interface_loop(int chanfd, int progfd, int linkfd,
1158 		const struct main_config *config, bool display_side)
1159 {
1160 	if (set_connections_nonblocking(chanfd, progfd, linkfd, display_side) ==
1161 			-1) {
1162 		if (linkfd != -1) {
1163 			checked_close(linkfd);
1164 		}
1165 		checked_close(chanfd);
1166 		checked_close(progfd);
1167 		return EXIT_FAILURE;
1168 	}
1169 	const char *progdesc = display_side ? "compositor" : "application";
1170 	wp_debug("Running main loop on %s side", progdesc);
1171 
1172 	struct way_msg_state way_msg;
1173 	memset(&way_msg, 0, sizeof(way_msg));
1174 	struct chan_msg_state chan_msg;
1175 	memset(&chan_msg, 0, sizeof(chan_msg));
1176 	struct cross_state cross_data;
1177 	memset(&cross_data, 0, sizeof(cross_data));
1178 	struct globals g;
1179 	memset(&g, 0, sizeof(g));
1180 
1181 	way_msg.state = WM_WAITING_FOR_PROGRAM;
1182 	/* AFAIK, there is no documented upper bound for the size of a
1183 	 * Wayland protocol message, but libwayland (in wl_buffer_put)
1184 	 * effectively limits message sizes to 4096 bytes. We must
1185 	 * therefore adopt a limit as least as large. */
1186 	const int max_read_size = 4096;
1187 	way_msg.proto_read.size = max_read_size;
1188 	way_msg.proto_read.data = malloc((size_t)way_msg.proto_read.size);
1189 	way_msg.fds.size = 128;
1190 	way_msg.fds.data = malloc((size_t)way_msg.fds.size * sizeof(int));
1191 	way_msg.proto_write.size = 2 * max_read_size;
1192 	way_msg.proto_write.data = malloc((size_t)way_msg.proto_write.size);
1193 	way_msg.max_iov = get_iov_max();
1194 	int mut_ret = pthread_mutex_init(
1195 			&way_msg.transfers.async_recv_queue.lock, NULL);
1196 	if (mut_ret) {
1197 		wp_error("Mutex creation failed: %s", strerror(mut_ret));
1198 		goto init_failure_cleanup;
1199 	}
1200 
1201 	chan_msg.state = CM_WAITING_FOR_CHANNEL;
1202 	chan_msg.recv_size = 2 * RECV_GOAL_READ_SIZE;
1203 	chan_msg.recv_buffer = malloc((size_t)chan_msg.recv_size);
1204 	chan_msg.proto_write.size = max_read_size * 2;
1205 	chan_msg.proto_write.data = malloc((size_t)chan_msg.proto_write.size);
1206 	if (!chan_msg.proto_write.data || !chan_msg.recv_buffer ||
1207 			!way_msg.proto_write.data || !way_msg.fds.data ||
1208 			!way_msg.proto_read.data) {
1209 		wp_error("Failed to allocate a message scratch buffer");
1210 		goto init_failure_cleanup;
1211 	}
1212 
1213 	/* The first packet received will be #1 */
1214 	way_msg.transfers.last_msgno = 1;
1215 
1216 	g.config = config;
1217 	g.render = (struct render_data){
1218 			.drm_node_path = config->drm_node,
1219 			.drm_fd = -1,
1220 			.dev = NULL,
1221 			.disabled = config->no_gpu,
1222 			.av_disabled = config->no_gpu ||
1223 				       !config->prefer_hwvideo,
1224 			.av_bpf = config->video_bpf,
1225 			.av_video_fmt = (int)config->video_fmt,
1226 			.av_hwdevice_ref = NULL,
1227 			.av_drmdevice_ref = NULL,
1228 			.av_vadisplay = NULL,
1229 			.av_copy_config = 0,
1230 	};
1231 	if (setup_thread_pool(&g.threads, config->compression,
1232 			    config->compression_level,
1233 			    config->n_worker_threads) == -1) {
1234 		goto init_failure_cleanup;
1235 	}
1236 	setup_translation_map(&g.map, display_side);
1237 	if (init_message_tracker(&g.tracker) == -1) {
1238 		goto init_failure_cleanup;
1239 	}
1240 
1241 	struct int_window recon_fds = {
1242 			.data = NULL,
1243 			.size = 0,
1244 			.zone_start = 0,
1245 			.zone_end = 0,
1246 	};
1247 
1248 	bool needs_new_channel = false;
1249 	struct pollfd *pfds = NULL;
1250 	int pfds_size = 0;
1251 	int exit_code = 0;
1252 	while (!shutdown_flag && exit_code == 0 &&
1253 			!(way_msg.state == WM_TERMINAL &&
1254 					chan_msg.state == CM_TERMINAL)) {
1255 		int psize = 4 + count_npipes(&g.map);
1256 		if (buf_ensure_size(psize, sizeof(struct pollfd), &pfds_size,
1257 				    (void **)&pfds) == -1) {
1258 			wp_error("Allocation failure, not enough space for pollfds");
1259 			exit_code = ERR_NOMEM;
1260 			break;
1261 		}
1262 		pfds[0].fd = chanfd;
1263 		pfds[1].fd = progfd;
1264 		pfds[2].fd = linkfd;
1265 		pfds[3].fd = g.threads.selfpipe_r;
1266 		pfds[0].events = 0;
1267 		pfds[1].events = 0;
1268 		pfds[2].events = POLLIN;
1269 		pfds[3].events = POLLIN;
1270 		if (way_msg.state == WM_WAITING_FOR_CHANNEL) {
1271 			pfds[0].events |= POLLOUT;
1272 		} else if (way_msg.state == WM_WAITING_FOR_PROGRAM) {
1273 			pfds[1].events |= POLLIN;
1274 		}
1275 		if (chan_msg.state == CM_WAITING_FOR_CHANNEL) {
1276 			pfds[0].events |= POLLIN;
1277 		} else if (chan_msg.state == CM_WAITING_FOR_PROGRAM) {
1278 			pfds[1].events |= POLLOUT;
1279 		}
1280 		bool check_read = way_msg.state == WM_WAITING_FOR_PROGRAM;
1281 		int npoll = 4 + fill_with_pipes(&g.map, pfds + 4, check_read);
1282 
1283 		bool own_msg_pending =
1284 				(cross_data.last_acked_msgno !=
1285 						cross_data.last_received_msgno) &&
1286 				way_msg.state == WM_WAITING_FOR_PROGRAM;
1287 		bool unread_chan_msgs =
1288 				chan_msg.state == CM_WAITING_FOR_CHANNEL &&
1289 				chan_msg.recv_unhandled_messages > 0;
1290 
1291 		int poll_delay = -1;
1292 		if (unread_chan_msgs) {
1293 			/* There is work to do, so continue */
1294 			poll_delay = 0;
1295 		}
1296 		if (own_msg_pending) {
1297 			/* To coalesce acknowledgements, we wait for a minimum
1298 			 * amount */
1299 			poll_delay = 20;
1300 		}
1301 		int r = poll(pfds, (nfds_t)npoll, poll_delay);
1302 		if (r == -1) {
1303 			if (errno == EINTR) {
1304 				wp_error("poll interrupted: shutdown=%c",
1305 						shutdown_flag ? 'Y' : 'n');
1306 				continue;
1307 			} else {
1308 				wp_error("poll failed due to, stopping: %s",
1309 						strerror(errno));
1310 				exit_code = ERR_FATAL;
1311 				break;
1312 			}
1313 		}
1314 		if (pfds[3].revents & POLLIN) {
1315 			/* After the self pipe has been used to wake up the
1316 			 * connection, drain it */
1317 			char tmp[64];
1318 			(void)read(g.threads.selfpipe_r, tmp, sizeof(tmp));
1319 		}
1320 
1321 		mark_pipe_object_statuses(&g.map, npoll - 4, pfds + 4);
1322 		/* POLLHUP sometimes implies POLLIN, but not on all systems.
1323 		 * Checking POLLHUP|POLLIN means that we can detect EOF when
1324 		 * we actually do try to read from the sockets, but also, if
1325 		 * there was data in the pipe just before the hang up, then we
1326 		 * can read and handle that data. */
1327 		bool progsock_readable = pfds[1].revents & (POLLIN | POLLHUP);
1328 		bool chanmsg_active = (pfds[0].revents & (POLLIN | POLLHUP)) ||
1329 				      (pfds[1].revents & POLLOUT) ||
1330 				      unread_chan_msgs;
1331 
1332 		bool maybe_new_channel = (pfds[2].revents & (POLLIN | POLLHUP));
1333 		if (maybe_new_channel) {
1334 			int new_fd = read_new_chanfd(linkfd, &recon_fds);
1335 			if (new_fd >= 0) {
1336 				if (chanfd != -1) {
1337 					checked_close(chanfd);
1338 				}
1339 				chanfd = new_fd;
1340 				reset_connection(&cross_data, &chan_msg,
1341 						&way_msg, chanfd);
1342 				needs_new_channel = false;
1343 			} else if (new_fd == -2) {
1344 				wp_error("Link to root process hang-up detected");
1345 				checked_close(linkfd);
1346 				linkfd = -1;
1347 			}
1348 		}
1349 		if (needs_new_channel && linkfd != -1) {
1350 			wp_error("Channel hang up detected, waiting for reconnection");
1351 			int new_fd = reconnect_loop(linkfd, progfd, &recon_fds);
1352 			if (new_fd < 0) {
1353 				// -1 is read failure or misc error, -2 is HUP
1354 				exit_code = ERR_FATAL;
1355 				break;
1356 			} else {
1357 				/* Actually handle the reconnection/reset state
1358 				 */
1359 				if (chanfd != -1) {
1360 					checked_close(chanfd);
1361 				}
1362 				chanfd = new_fd;
1363 				reset_connection(&cross_data, &chan_msg,
1364 						&way_msg, chanfd);
1365 				needs_new_channel = false;
1366 			}
1367 		} else if (needs_new_channel) {
1368 			wp_error("Channel hang up detected, no reconnection link, fatal");
1369 			exit_code = ERR_FATAL;
1370 			break;
1371 		}
1372 
1373 		// Q: randomize the order of these?, to highlight
1374 		// accidental dependencies?
1375 		for (int m = 0; m < 2; m++) {
1376 			int tr;
1377 			if (m == 0) {
1378 				tr = advance_chanmsg_transfer(&g, &chan_msg,
1379 						&cross_data, display_side,
1380 						chanfd, progfd, chanmsg_active);
1381 			} else {
1382 				tr = advance_waymsg_transfer(&g, &way_msg,
1383 						&cross_data, display_side,
1384 						chanfd, progfd,
1385 						progsock_readable);
1386 			}
1387 
1388 			if (tr >= 0) {
1389 				/* do nothing */
1390 			} else if (tr == ERR_DISCONN) {
1391 				/* Channel connection has at least
1392 				 * partially been shut down, so close it
1393 				 * fully. */
1394 				checked_close(chanfd);
1395 				chanfd = -1;
1396 				if (linkfd == -1) {
1397 					wp_error("Channel hang up detected, no reconnection link, fatal");
1398 					exit_code = ERR_FATAL;
1399 					break;
1400 				}
1401 				needs_new_channel = true;
1402 			} else if (tr == ERR_STOP) {
1403 				if (m == 0) {
1404 					/* Stop returned while writing: Wayland
1405 					 * connection has at least partially
1406 					 * shut down, so close it fully. */
1407 					checked_close(progfd);
1408 					progfd = -1;
1409 				} else {
1410 					/* Stop returned while reading */
1411 					checked_close(progfd);
1412 					progfd = -1;
1413 					if (way_msg.state ==
1414 							WM_WAITING_FOR_PROGRAM) {
1415 						way_msg.state = WM_TERMINAL;
1416 					}
1417 					if (chan_msg.state == CM_WAITING_FOR_PROGRAM ||
1418 							chan_msg.recv_start ==
1419 									chan_msg.recv_end) {
1420 						chan_msg.state = CM_TERMINAL;
1421 					}
1422 				}
1423 			} else {
1424 				/* Fatal error, close and flush */
1425 				exit_code = tr;
1426 				break;
1427 			}
1428 
1429 			/* If the program connection has closed, and
1430 			 * there waypipe is not currently transferring
1431 			 * any message to the channel, then shutdown the
1432 			 * program->channel transfers. (The reverse
1433 			 * situation with the chnanel connection is not
1434 			 * a cause for permanent closure, thanks to
1435 			 * reconnection support */
1436 			if (progfd == -1) {
1437 				if (way_msg.state == WM_WAITING_FOR_PROGRAM) {
1438 					way_msg.state = WM_TERMINAL;
1439 				}
1440 				if (chan_msg.state == CM_WAITING_FOR_PROGRAM ||
1441 						chan_msg.recv_start ==
1442 								chan_msg.recv_end) {
1443 					chan_msg.state = CM_TERMINAL;
1444 				}
1445 			}
1446 		}
1447 
1448 		// Periodic maintenance. It doesn't matter who does this
1449 		flush_writable_pipes(&g.map);
1450 	}
1451 	free(pfds);
1452 	free(recon_fds.data);
1453 	wp_debug("Exiting main loop (%d, %d, %d), attempting close message",
1454 			exit_code, way_msg.state, chan_msg.state);
1455 
1456 init_failure_cleanup:
1457 	/* It's possible, but very very unlikely, that waypipe gets closed
1458 	 * while Wayland protocol messages are being written to the program
1459 	 * and the most recent message was only partially written. */
1460 	exit_code = ERR_FATAL;
1461 	if (chan_msg.proto_write.zone_start != chan_msg.proto_write.zone_end) {
1462 		wp_debug("Final write to %s was incomplete, %d/%d", progdesc,
1463 				chan_msg.proto_write.zone_start,
1464 				chan_msg.proto_write.zone_end);
1465 	}
1466 
1467 	if (!display_side && progfd != -1) {
1468 		char error[128];
1469 		if (exit_code == ERR_FATAL) {
1470 			size_t len = print_display_error(error, sizeof(error),
1471 					3, "waypipe internal error");
1472 			if (write(progfd, error, len) == -1) {
1473 				wp_error("Failed to send waypipe error notification: %s",
1474 						strerror(errno));
1475 			}
1476 		} else if (exit_code == ERR_NOMEM) {
1477 			size_t len = print_display_error(
1478 					error, sizeof(error), 2, "no memory");
1479 			if (write(progfd, error, len) == -1) {
1480 				wp_error("Failed to send OOM notification: %s",
1481 						strerror(errno));
1482 			}
1483 		}
1484 	}
1485 
1486 	/* Attempt to notify remote end that the application has closed,
1487 	 * waiting at most for a very short amount of time */
1488 	if (way_msg.transfers.start != way_msg.transfers.end) {
1489 		wp_error("Final write to channel was incomplete, %d+%zu/%d",
1490 				way_msg.transfers.start,
1491 				way_msg.transfers.partial_write_amt,
1492 				way_msg.transfers.end);
1493 	}
1494 
1495 	if (chanfd != -1) {
1496 		struct pollfd close_poll;
1497 		close_poll.fd = chanfd;
1498 		close_poll.events = POLLOUT;
1499 		int close_ret = poll(&close_poll, 1, 200);
1500 		if (close_ret == 0) {
1501 			wp_debug("Exit poll timed out");
1502 		}
1503 		uint32_t close_msg[2];
1504 		close_msg[0] = transfer_header(sizeof(close_msg), WMSG_CLOSE);
1505 		close_msg[1] = exit_code == ERR_STOP ? 0 : (uint32_t)exit_code;
1506 		wp_debug("Sending close message, modecode=%d", close_msg[1]);
1507 		if (write(chanfd, &close_msg, sizeof(close_msg)) == -1) {
1508 			wp_error("Failed to send close notification: %s",
1509 					strerror(errno));
1510 		}
1511 	} else {
1512 		wp_debug("Channel closed, hence no close notification");
1513 	}
1514 
1515 	cleanup_thread_pool(&g.threads);
1516 	cleanup_message_tracker(&g.tracker);
1517 	cleanup_translation_map(&g.map);
1518 	cleanup_render_data(&g.render);
1519 	cleanup_hwcontext(&g.render);
1520 	free(way_msg.proto_read.data);
1521 	free(way_msg.proto_write.data);
1522 	free(way_msg.fds.data);
1523 	cleanup_transfer_queue(&way_msg.transfers);
1524 	for (int i = 0; i < way_msg.ntrailing; i++) {
1525 		free(way_msg.trailing[i].iov_base);
1526 	}
1527 	free(chan_msg.transf_fds.data);
1528 	free(chan_msg.proto_fds.data);
1529 	free(chan_msg.recv_buffer);
1530 	free(chan_msg.proto_write.data);
1531 
1532 	if (chanfd != -1) {
1533 		checked_close(chanfd);
1534 	}
1535 	if (progfd != -1) {
1536 		checked_close(progfd);
1537 	}
1538 	if (linkfd != -1) {
1539 		checked_close(linkfd);
1540 	}
1541 	return EXIT_SUCCESS;
1542 }
1543