1 /* PipeWire
2  *
3  * Copyright © 2018 Wim Taymans
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice (including the next
13  * paragraph) shall be included in all copies or substantial portions of the
14  * Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
19  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include <stdint.h>
26 #include <stddef.h>
27 #include <stdio.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <unistd.h>
31 #include <fcntl.h>
32 #include <sys/socket.h>
33 
34 #include <spa/utils/result.h>
35 #include <spa/pod/builder.h>
36 
37 #include <pipewire/pipewire.h>
38 
39 PW_LOG_TOPIC_EXTERN(mod_topic);
40 #define PW_LOG_TOPIC_DEFAULT mod_topic
41 PW_LOG_TOPIC_EXTERN(mod_topic_connection);
42 
43 #undef spa_debug
44 #define spa_debug(...) pw_logt_debug(mod_topic_connection, __VA_ARGS__)
45 #include <spa/debug/pod.h>
46 
47 #include "connection.h"
48 
49 #define MAX_BUFFER_SIZE (1024 * 32)
50 #define MAX_FDS 1024u
51 #define MAX_FDS_MSG 28
52 
53 #define HDR_SIZE_V0	8
54 #define HDR_SIZE	16
55 
56 struct buffer {
57 	uint8_t *buffer_data;
58 	size_t buffer_size;
59 	size_t buffer_maxsize;
60 	int fds[MAX_FDS];
61 	uint32_t n_fds;
62 
63 	uint32_t seq;
64 	size_t offset;
65 	size_t fds_offset;
66 	struct pw_protocol_native_message msg;
67 };
68 
69 struct reenter_item {
70 	void *old_buffer_data;
71 	struct pw_protocol_native_message return_msg;
72 	struct spa_list link;
73 };
74 
75 struct impl {
76 	struct pw_protocol_native_connection this;
77 	struct pw_context *context;
78 
79 	struct buffer in, out;
80 	struct spa_pod_builder builder;
81 
82 	struct spa_list reenter_stack;
83 	uint32_t pending_reentering;
84 
85 	uint32_t version;
86 	size_t hdr_size;
87 };
88 
89 /** \endcond */
90 
91 /** Get an fd from a connection
92  *
93  * \param conn the connection
94  * \param index the index of the fd to get
95  * \return the fd at \a index or -ENOENT when no such fd exists
96  *
97  * \memberof pw_protocol_native_connection
98  */
pw_protocol_native_connection_get_fd(struct pw_protocol_native_connection * conn,uint32_t index)99 int pw_protocol_native_connection_get_fd(struct pw_protocol_native_connection *conn, uint32_t index)
100 {
101 	struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
102 	struct buffer *buf = &impl->in;
103 
104 	if (index == SPA_ID_INVALID)
105 		return -1;
106 
107 	if (index >= buf->msg.n_fds)
108 		return -ENOENT;
109 
110 	return buf->msg.fds[index];
111 }
112 
113 /** Add an fd to a connection
114  *
115  * \param conn the connection
116  * \param fd the fd to add
117  * \return the index of the fd or SPA_IDX_INVALID when an error occurred
118  *
119  * \memberof pw_protocol_native_connection
120  */
pw_protocol_native_connection_add_fd(struct pw_protocol_native_connection * conn,int fd)121 uint32_t pw_protocol_native_connection_add_fd(struct pw_protocol_native_connection *conn, int fd)
122 {
123 	struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
124 	struct buffer *buf = &impl->out;
125 	uint32_t index, i;
126 
127 	if (fd < 0)
128 		return SPA_IDX_INVALID;
129 
130 	for (i = 0; i < buf->msg.n_fds; i++) {
131 		if (buf->msg.fds[i] == fd)
132 			return i;
133 	}
134 
135 	index = buf->msg.n_fds;
136 	if (index + buf->n_fds >= MAX_FDS) {
137 		pw_log_error("connection %p: too many fds (%d)", conn, MAX_FDS);
138 		return SPA_IDX_INVALID;
139 	}
140 
141 	buf->msg.fds[index] = fcntl(fd, F_DUPFD_CLOEXEC, 0);
142 	buf->msg.n_fds++;
143 	pw_log_debug("connection %p: add fd %d at index %d", conn, fd, index);
144 
145 	return index;
146 }
147 
connection_ensure_size(struct pw_protocol_native_connection * conn,struct buffer * buf,size_t size)148 static void *connection_ensure_size(struct pw_protocol_native_connection *conn, struct buffer *buf, size_t size)
149 {
150 	int res;
151 
152 	if (buf->buffer_size + size > buf->buffer_maxsize) {
153 		buf->buffer_maxsize = SPA_ROUND_UP_N(buf->buffer_size + size, MAX_BUFFER_SIZE);
154 		buf->buffer_data = realloc(buf->buffer_data, buf->buffer_maxsize);
155 		if (buf->buffer_data == NULL) {
156 			res = -errno;
157 			buf->buffer_maxsize = 0;
158 			spa_hook_list_call(&conn->listener_list,
159 					struct pw_protocol_native_connection_events,
160 					error, 0, -res);
161 			errno = -res;
162 			return NULL;
163 		}
164 		pw_log_debug("connection %p: resize buffer to %zd %zd %zd",
165 			    conn, buf->buffer_size, size, buf->buffer_maxsize);
166 	}
167 	return (uint8_t *) buf->buffer_data + buf->buffer_size;
168 }
169 
handle_connection_error(struct pw_protocol_native_connection * conn,int res)170 static void handle_connection_error(struct pw_protocol_native_connection *conn, int res)
171 {
172 	if (res == EPIPE || res == ECONNRESET)
173 		pw_log_info("connection %p: could not recvmsg on fd:%d: %s", conn, conn->fd, strerror(res));
174 	else
175 		pw_log_error("connection %p: could not recvmsg on fd:%d: %s", conn, conn->fd, strerror(res));
176 }
177 
refill_buffer(struct pw_protocol_native_connection * conn,struct buffer * buf)178 static int refill_buffer(struct pw_protocol_native_connection *conn, struct buffer *buf)
179 {
180 	ssize_t len;
181 	struct cmsghdr *cmsg;
182 	struct msghdr msg = { 0 };
183 	struct iovec iov[1];
184 	char cmsgbuf[CMSG_SPACE(MAX_FDS_MSG * sizeof(int))];
185 	int n_fds = 0;
186 	size_t avail;
187 
188 	avail = buf->buffer_maxsize - buf->buffer_size;
189 
190 	iov[0].iov_base = buf->buffer_data + buf->buffer_size;
191 	iov[0].iov_len = avail;
192 	msg.msg_iov = iov;
193 	msg.msg_iovlen = 1;
194 	msg.msg_control = cmsgbuf;
195 	msg.msg_controllen = sizeof(cmsgbuf);
196 	msg.msg_flags = MSG_CMSG_CLOEXEC | MSG_DONTWAIT;
197 
198 	while (true) {
199 		len = recvmsg(conn->fd, &msg, msg.msg_flags);
200 		if (msg.msg_flags & MSG_CTRUNC)
201 			return -EPROTO;
202 		if (len == 0 && avail != 0)
203 			return -EPIPE;
204 		else if (len < 0) {
205 			if (errno == EINTR)
206 				continue;
207 			if (errno != EAGAIN && errno != EWOULDBLOCK)
208 				goto recv_error;
209 			return -EAGAIN;
210 		}
211 		break;
212 	}
213 
214 	buf->buffer_size += len;
215 
216 	/* handle control messages */
217 	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
218 		if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS)
219 			continue;
220 
221 		n_fds =
222 		    (cmsg->cmsg_len - ((char *) CMSG_DATA(cmsg) - (char *) cmsg)) / sizeof(int);
223 		if (n_fds + buf->n_fds > MAX_FDS)
224 			return -EPROTO;
225 		memcpy(&buf->fds[buf->n_fds], CMSG_DATA(cmsg), n_fds * sizeof(int));
226 		buf->n_fds += n_fds;
227 	}
228 	pw_log_trace("connection %p: %d read %zd bytes and %d fds", conn, conn->fd, len,
229 		     n_fds);
230 
231 	return 0;
232 
233 	/* ERRORS */
234 recv_error:
235 	handle_connection_error(conn, errno);
236 	return -errno;
237 }
238 
clear_buffer(struct buffer * buf,bool fds)239 static void clear_buffer(struct buffer *buf, bool fds)
240 {
241 	uint32_t i;
242 	if (fds) {
243 		for (i = 0; i < buf->n_fds; i++)
244 			close(buf->fds[i]);
245 	}
246 	buf->n_fds = 0;
247 	buf->buffer_size = 0;
248 	buf->offset = 0;
249 	buf->fds_offset = 0;
250 }
251 
252 /** Prepare connection for calling from reentered context.
253  *
254  * This ensures that message buffers returned by get_next are not invalidated by additional
255  * calls made after enter. Leave invalidates the buffers at the higher stack level.
256  *
257  * \memberof pw_protocol_native_connection
258  */
pw_protocol_native_connection_enter(struct pw_protocol_native_connection * conn)259 void pw_protocol_native_connection_enter(struct pw_protocol_native_connection *conn)
260 {
261 	struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
262 
263 	/* Postpone processing until get_next is actually called */
264 	++impl->pending_reentering;
265 }
266 
pop_reenter_stack(struct impl * impl,uint32_t count)267 static void pop_reenter_stack(struct impl *impl, uint32_t count)
268 {
269 	while (count > 0) {
270 		struct reenter_item *item;
271 
272 		item = spa_list_last(&impl->reenter_stack, struct reenter_item, link);
273 		spa_list_remove(&item->link);
274 
275 		free(item->return_msg.fds);
276 		free(item->old_buffer_data);
277 		free(item);
278 
279 		--count;
280 	}
281 }
282 
pw_protocol_native_connection_leave(struct pw_protocol_native_connection * conn)283 void pw_protocol_native_connection_leave(struct pw_protocol_native_connection *conn)
284 {
285 	struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
286 
287 	if (impl->pending_reentering > 0) {
288 		--impl->pending_reentering;
289 	} else {
290 		pw_log_trace("connection %p: reenter: pop", impl);
291 		pop_reenter_stack(impl, 1);
292 	}
293 }
294 
ensure_stack_level(struct impl * impl,struct pw_protocol_native_message ** msg)295 static int ensure_stack_level(struct impl *impl, struct pw_protocol_native_message **msg)
296 {
297 	void *data;
298 	struct buffer *buf = &impl->in;
299 	struct reenter_item *item, *new_item = NULL;
300 
301 	item = spa_list_last(&impl->reenter_stack, struct reenter_item, link);
302 
303 	if (SPA_LIKELY(impl->pending_reentering == 0)) {
304 		new_item = item;
305 	} else {
306 		uint32_t new_count;
307 
308 		pw_log_trace("connection %p: reenter: push %d levels",
309 		             impl, impl->pending_reentering);
310 
311 		/* Append empty item(s) to the reenter stack */
312 		for (new_count = 0; new_count < impl->pending_reentering; ++new_count) {
313 			new_item = calloc(1, sizeof(struct reenter_item));
314 			if (new_item == NULL) {
315 				pop_reenter_stack(impl, new_count);
316 				return -ENOMEM;
317 			}
318 			spa_list_append(&impl->reenter_stack, &new_item->link);
319 		}
320 
321 		/*
322 		 * Stack level increased: we have to switch to a new message data buffer, because
323 		 * data of returned messages is contained in the buffer and might still be in
324 		 * use on the lower stack levels.
325 		 *
326 		 * We stash the buffer for the previous stack level, and allocate a new one for
327 		 * the new stack level.  If there was a previous buffer for the previous level, we
328 		 * know its contents are no longer in use (the only active buffer at that stack
329 		 * level is buf->buffer_data), and we can recycle it as the new buffer (realloc
330 		 * instead of calloc).
331 		 *
332 		 * The current data contained in the buffer needs to be copied to the new buffer.
333 		 */
334 
335 		data = realloc(item->old_buffer_data, buf->buffer_maxsize);
336 		if (data == NULL) {
337 			pop_reenter_stack(impl, new_count);
338 			return -ENOMEM;
339 		}
340 
341 		item->old_buffer_data = buf->buffer_data;
342 
343 		memcpy(data, buf->buffer_data, buf->buffer_size);
344 		buf->buffer_data = data;
345 
346 		impl->pending_reentering = 0;
347 	}
348 	if (new_item == NULL)
349 		return -EIO;
350 
351 	/* Ensure fds buffer is allocated */
352 	if (SPA_UNLIKELY(new_item->return_msg.fds == NULL)) {
353 		data = calloc(MAX_FDS, sizeof(int));
354 		if (data == NULL)
355 			return -ENOMEM;
356 		new_item->return_msg.fds = data;
357 	}
358 
359 	*msg = &new_item->return_msg;
360 
361 	return 0;
362 }
363 
364 /** Make a new connection object for the given socket
365  *
366  * \param fd the socket
367  * \returns a newly allocated connection object
368  *
369  * \memberof pw_protocol_native_connection
370  */
pw_protocol_native_connection_new(struct pw_context * context,int fd)371 struct pw_protocol_native_connection *pw_protocol_native_connection_new(struct pw_context *context, int fd)
372 {
373 	struct impl *impl;
374 	struct pw_protocol_native_connection *this;
375 	struct reenter_item *reenter_item;
376 
377 	impl = calloc(1, sizeof(struct impl));
378 	if (impl == NULL)
379 		return NULL;
380 
381 	impl->context = context;
382 
383 	this = &impl->this;
384 
385 	pw_log_debug("connection %p: new fd:%d", this, fd);
386 
387 	this->fd = fd;
388 	spa_hook_list_init(&this->listener_list);
389 
390 	impl->hdr_size = HDR_SIZE;
391 	impl->version = 3;
392 
393 	impl->out.buffer_data = calloc(1, MAX_BUFFER_SIZE);
394 	impl->out.buffer_maxsize = MAX_BUFFER_SIZE;
395 	impl->in.buffer_data = calloc(1, MAX_BUFFER_SIZE);
396 	impl->in.buffer_maxsize = MAX_BUFFER_SIZE;
397 
398 	reenter_item = calloc(1, sizeof(struct reenter_item));
399 
400 	if (impl->out.buffer_data == NULL || impl->in.buffer_data == NULL || reenter_item == NULL)
401 		goto no_mem;
402 
403 	spa_list_init(&impl->reenter_stack);
404 	spa_list_append(&impl->reenter_stack, &reenter_item->link);
405 
406 	return this;
407 
408 no_mem:
409 	free(impl->out.buffer_data);
410 	free(impl->in.buffer_data);
411 	free(reenter_item);
412 	free(impl);
413 	return NULL;
414 }
415 
pw_protocol_native_connection_set_fd(struct pw_protocol_native_connection * conn,int fd)416 int pw_protocol_native_connection_set_fd(struct pw_protocol_native_connection *conn, int fd)
417 {
418 	pw_log_debug("connection %p: fd:%d", conn, fd);
419 	conn->fd = fd;
420 	return 0;
421 }
422 
423 /** Destroy a connection
424  *
425  * \param conn the connection to destroy
426  *
427  * \memberof pw_protocol_native_connection
428  */
pw_protocol_native_connection_destroy(struct pw_protocol_native_connection * conn)429 void pw_protocol_native_connection_destroy(struct pw_protocol_native_connection *conn)
430 {
431 	struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
432 
433 	pw_log_debug("connection %p: destroy", conn);
434 
435 	spa_hook_list_call(&conn->listener_list, struct pw_protocol_native_connection_events, destroy, 0);
436 
437 	spa_hook_list_clean(&conn->listener_list);
438 
439 	clear_buffer(&impl->out, true);
440 	clear_buffer(&impl->in, true);
441 	free(impl->out.buffer_data);
442 	free(impl->in.buffer_data);
443 
444 	while (!spa_list_is_empty(&impl->reenter_stack))
445 		pop_reenter_stack(impl, 1);
446 
447 	free(impl);
448 }
449 
prepare_packet(struct pw_protocol_native_connection * conn,struct buffer * buf)450 static int prepare_packet(struct pw_protocol_native_connection *conn, struct buffer *buf)
451 {
452 	struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
453 	uint8_t *data;
454 	size_t size, len;
455 	uint32_t *p;
456 
457 	data = buf->buffer_data + buf->offset;
458 	size = buf->buffer_size - buf->offset;
459 
460 	if (size < impl->hdr_size)
461 		return impl->hdr_size;
462 
463 	p = (uint32_t *) data;
464 
465 	buf->msg.id = p[0];
466 	buf->msg.opcode = p[1] >> 24;
467 	len = p[1] & 0xffffff;
468 
469 	if (buf->msg.id == 0 && buf->msg.opcode == 1) {
470 		if (p[3] >= 4) {
471 			pw_log_warn("old version detected");
472 			impl->version = 0;
473 			impl->hdr_size = HDR_SIZE_V0;
474 		} else {
475 			impl->version = 3;
476 			impl->hdr_size = HDR_SIZE;
477 		}
478 		spa_hook_list_call(&conn->listener_list,
479 				struct pw_protocol_native_connection_events,
480 				start, 0, impl->version);
481 	}
482 	if (impl->version >= 3) {
483 		buf->msg.seq = p[2];
484 		buf->msg.n_fds = p[3];
485 	} else {
486 		buf->msg.seq = 0;
487 		buf->msg.n_fds = 0;
488 	}
489 
490 	data += impl->hdr_size;
491 	size -= impl->hdr_size;
492 	buf->msg.fds = &buf->fds[buf->fds_offset];
493 
494 	if (buf->msg.n_fds + buf->fds_offset > MAX_FDS)
495 		return -EPROTO;
496 
497 	if (size < len)
498 		return len;
499 
500 	buf->msg.size = len;
501 	buf->msg.data = data;
502 
503 	buf->offset += impl->hdr_size + len;
504 	buf->fds_offset += buf->msg.n_fds;
505 
506 	if (buf->offset >= buf->buffer_size)
507 		clear_buffer(buf, false);
508 
509 	return 0;
510 }
511 
512 /** Move to the next packet in the connection
513  *
514  * \param conn the connection
515  * \param opcode address of result opcode
516  * \param dest_id address of result destination id
517  * \param dt pointer to packet data
518  * \param sz size of packet data
519  * \return true on success
520  *
521  * Get the next packet in \a conn and store the opcode and destination
522  * id as well as the packet data and size.
523  *
524  * \memberof pw_protocol_native_connection
525  */
526 int
pw_protocol_native_connection_get_next(struct pw_protocol_native_connection * conn,const struct pw_protocol_native_message ** msg)527 pw_protocol_native_connection_get_next(struct pw_protocol_native_connection *conn,
528 		const struct pw_protocol_native_message **msg)
529 {
530 	struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
531 	int len, res;
532 	struct buffer *buf;
533 	struct pw_protocol_native_message *return_msg;
534 	int *fds;
535 
536 	if ((res = ensure_stack_level(impl, &return_msg)) < 0)
537 		return res;
538 
539 	buf = &impl->in;
540 
541 	while (1) {
542 		len = prepare_packet(conn, buf);
543 		if (len < 0)
544 			return len;
545 		if (len == 0)
546 			break;
547 
548 		if (connection_ensure_size(conn, buf, len) == NULL)
549 			return -errno;
550 		if ((res = refill_buffer(conn, buf)) < 0)
551 			return res;
552 	}
553 
554 	/* Returned msg struct should be safe vs. reentering */
555 	fds = return_msg->fds;
556 	*return_msg = buf->msg;
557 	if (buf->msg.n_fds > 0) {
558 		memcpy(fds, buf->msg.fds, buf->msg.n_fds * sizeof(int));
559 	}
560 	return_msg->fds = fds;
561 
562 	*msg = return_msg;
563 
564 	return 1;
565 }
566 
begin_write(struct pw_protocol_native_connection * conn,uint32_t size)567 static inline void *begin_write(struct pw_protocol_native_connection *conn, uint32_t size)
568 {
569 	struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
570 	uint32_t *p;
571 	struct buffer *buf = &impl->out;
572 	/* header and size for payload */
573 	if ((p = connection_ensure_size(conn, buf, impl->hdr_size + size)) == NULL)
574 		return NULL;
575 
576 	return SPA_PTROFF(p, impl->hdr_size, void);
577 }
578 
builder_overflow(void * data,uint32_t size)579 static int builder_overflow(void *data, uint32_t size)
580 {
581 	struct impl *impl = data;
582 	struct spa_pod_builder *b = &impl->builder;
583 
584 	b->size = SPA_ROUND_UP_N(size, 4096);
585 	if ((b->data = begin_write(&impl->this, b->size)) == NULL)
586 		return -errno;
587         return 0;
588 }
589 
590 static const struct spa_pod_builder_callbacks builder_callbacks = {
591 	SPA_VERSION_POD_BUILDER_CALLBACKS,
592 	.overflow = builder_overflow
593 };
594 
595 struct spa_pod_builder *
pw_protocol_native_connection_begin(struct pw_protocol_native_connection * conn,uint32_t id,uint8_t opcode,struct pw_protocol_native_message ** msg)596 pw_protocol_native_connection_begin(struct pw_protocol_native_connection *conn,
597 			uint32_t id, uint8_t opcode,
598 			struct pw_protocol_native_message **msg)
599 {
600 	struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
601 	struct buffer *buf = &impl->out;
602 
603 	buf->msg.id = id;
604 	buf->msg.opcode = opcode;
605 	impl->builder = SPA_POD_BUILDER_INIT(NULL, 0);
606 	spa_pod_builder_set_callbacks(&impl->builder, &builder_callbacks, impl);
607 	if (impl->version >= 3) {
608 		buf->msg.n_fds = 0;
609 		buf->msg.fds = &buf->fds[buf->n_fds];
610 	} else {
611 		buf->msg.n_fds = buf->n_fds;
612 		buf->msg.fds = &buf->fds[0];
613 	}
614 
615 	buf->msg.seq = buf->seq;
616 	if (msg)
617 		*msg = &buf->msg;
618 	return &impl->builder;
619 }
620 
621 int
pw_protocol_native_connection_end(struct pw_protocol_native_connection * conn,struct spa_pod_builder * builder)622 pw_protocol_native_connection_end(struct pw_protocol_native_connection *conn,
623 				  struct spa_pod_builder *builder)
624 {
625 	struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
626 	uint32_t *p, size = builder->state.offset;
627 	struct buffer *buf = &impl->out;
628 	int res;
629 
630 	if ((p = connection_ensure_size(conn, buf, impl->hdr_size + size)) == NULL)
631 		return -errno;
632 
633 	p[0] = buf->msg.id;
634 	p[1] = (buf->msg.opcode << 24) | (size & 0xffffff);
635 	if (impl->version >= 3) {
636 		p[2] = buf->msg.seq;
637 		p[3] = buf->msg.n_fds;
638 	}
639 
640 	buf->buffer_size += impl->hdr_size + size;
641 	if (impl->version >= 3)
642 		buf->n_fds += buf->msg.n_fds;
643 	else
644 		buf->n_fds = buf->msg.n_fds;
645 
646 	if (mod_topic_connection->level >= SPA_LOG_LEVEL_DEBUG) {
647 		pw_log_debug(">>>>>>>>> out: id:%d op:%d size:%d seq:%d",
648 				buf->msg.id, buf->msg.opcode, size, buf->msg.seq);
649 	        spa_debug_pod(0, NULL, SPA_PTROFF(p, impl->hdr_size, struct spa_pod));
650 	}
651 
652 	buf->seq = (buf->seq + 1) & SPA_ASYNC_SEQ_MASK;
653 	res = SPA_RESULT_RETURN_ASYNC(buf->msg.seq);
654 
655 	spa_hook_list_call(&conn->listener_list,
656 			struct pw_protocol_native_connection_events, need_flush, 0);
657 
658 	return res;
659 }
660 
661 /** Flush the connection object
662  *
663  * \param conn the connection object
664  * \return 0 on success < 0 error code on error
665  *
666  * Write the queued messages on the connection to the socket
667  *
668  * \memberof pw_protocol_native_connection
669  */
pw_protocol_native_connection_flush(struct pw_protocol_native_connection * conn)670 int pw_protocol_native_connection_flush(struct pw_protocol_native_connection *conn)
671 {
672 	struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
673 	ssize_t sent, outsize;
674 	struct msghdr msg = { 0 };
675 	struct iovec iov[1];
676 	struct cmsghdr *cmsg;
677 	char cmsgbuf[CMSG_SPACE(MAX_FDS_MSG * sizeof(int))];
678 	int res = 0, *fds;
679 	uint32_t fds_len, to_close, n_fds, outfds, i;
680 	struct buffer *buf;
681 	void *data;
682 	size_t size;
683 
684 	buf = &impl->out;
685 	data = buf->buffer_data;
686 	size = buf->buffer_size;
687 	fds = buf->fds;
688 	n_fds = buf->n_fds;
689 	to_close = 0;
690 
691 	while (size > 0) {
692 		if (n_fds > MAX_FDS_MSG) {
693 			outfds = MAX_FDS_MSG;
694 			outsize = SPA_MIN(sizeof(uint32_t), size);
695 		} else {
696 			outfds = n_fds;
697 			outsize = size;
698 		}
699 
700 		fds_len = outfds * sizeof(int);
701 
702 		iov[0].iov_base = data;
703 		iov[0].iov_len = outsize;
704 		msg.msg_iov = iov;
705 		msg.msg_iovlen = 1;
706 
707 		if (outfds > 0) {
708 			msg.msg_control = cmsgbuf;
709 			msg.msg_controllen = CMSG_SPACE(fds_len);
710 			cmsg = CMSG_FIRSTHDR(&msg);
711 			cmsg->cmsg_level = SOL_SOCKET;
712 			cmsg->cmsg_type = SCM_RIGHTS;
713 			cmsg->cmsg_len = CMSG_LEN(fds_len);
714 			memcpy(CMSG_DATA(cmsg), fds, fds_len);
715 			msg.msg_controllen = cmsg->cmsg_len;
716 		} else {
717 			msg.msg_control = NULL;
718 			msg.msg_controllen = 0;
719 		}
720 
721 		while (true) {
722 			sent = sendmsg(conn->fd, &msg, MSG_NOSIGNAL | MSG_DONTWAIT);
723 			if (sent < 0) {
724 				if (errno == EINTR)
725 					continue;
726 				else {
727 					res = -errno;
728 					goto exit;
729 				}
730 			}
731 			break;
732 		}
733 		pw_log_trace("connection %p: %d written %zd bytes and %u fds", conn, conn->fd, sent,
734 			     outfds);
735 
736 		size -= sent;
737 		data = SPA_PTROFF(data, sent, void);
738 		n_fds -= outfds;
739 		fds += outfds;
740 		to_close += outfds;
741 	}
742 
743 	res = 0;
744 
745 exit:
746 	if (size > 0)
747 		memmove(buf->buffer_data, data, size);
748 	buf->buffer_size = size;
749 	for (i = 0; i < to_close; i++)
750 		close(buf->fds[i]);
751 	if (n_fds > 0)
752 		memmove(buf->fds, fds, n_fds * sizeof(int));
753 	buf->n_fds = n_fds;
754 	return res;
755 }
756 
757 /** Clear the connection object
758  *
759  * \param conn the connection object
760  * \return 0 on success
761  *
762  * Remove all queued messages from \a conn
763  *
764  * \memberof pw_protocol_native_connection
765  */
pw_protocol_native_connection_clear(struct pw_protocol_native_connection * conn)766 int pw_protocol_native_connection_clear(struct pw_protocol_native_connection *conn)
767 {
768 	struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
769 
770 	clear_buffer(&impl->out, true);
771 	clear_buffer(&impl->in, true);
772 
773 	return 0;
774 }
775