1 /* PipeWire
2 *
3 * Copyright © 2018 Wim Taymans
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice (including the next
13 * paragraph) shall be included in all copies or substantial portions of the
14 * Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
19 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
23 */
24
25 #include <stdint.h>
26 #include <stddef.h>
27 #include <stdio.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <unistd.h>
31 #include <fcntl.h>
32 #include <sys/socket.h>
33
34 #include <spa/utils/result.h>
35 #include <spa/pod/builder.h>
36
37 #include <pipewire/pipewire.h>
38
39 PW_LOG_TOPIC_EXTERN(mod_topic);
40 #define PW_LOG_TOPIC_DEFAULT mod_topic
41 PW_LOG_TOPIC_EXTERN(mod_topic_connection);
42
43 #undef spa_debug
44 #define spa_debug(...) pw_logt_debug(mod_topic_connection, __VA_ARGS__)
45 #include <spa/debug/pod.h>
46
47 #include "connection.h"
48
49 #define MAX_BUFFER_SIZE (1024 * 32)
50 #define MAX_FDS 1024u
51 #define MAX_FDS_MSG 28
52
53 #define HDR_SIZE_V0 8
54 #define HDR_SIZE 16
55
56 struct buffer {
57 uint8_t *buffer_data;
58 size_t buffer_size;
59 size_t buffer_maxsize;
60 int fds[MAX_FDS];
61 uint32_t n_fds;
62
63 uint32_t seq;
64 size_t offset;
65 size_t fds_offset;
66 struct pw_protocol_native_message msg;
67 };
68
69 struct reenter_item {
70 void *old_buffer_data;
71 struct pw_protocol_native_message return_msg;
72 struct spa_list link;
73 };
74
75 struct impl {
76 struct pw_protocol_native_connection this;
77 struct pw_context *context;
78
79 struct buffer in, out;
80 struct spa_pod_builder builder;
81
82 struct spa_list reenter_stack;
83 uint32_t pending_reentering;
84
85 uint32_t version;
86 size_t hdr_size;
87 };
88
89 /** \endcond */
90
91 /** Get an fd from a connection
92 *
93 * \param conn the connection
94 * \param index the index of the fd to get
95 * \return the fd at \a index or -ENOENT when no such fd exists
96 *
97 * \memberof pw_protocol_native_connection
98 */
pw_protocol_native_connection_get_fd(struct pw_protocol_native_connection * conn,uint32_t index)99 int pw_protocol_native_connection_get_fd(struct pw_protocol_native_connection *conn, uint32_t index)
100 {
101 struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
102 struct buffer *buf = &impl->in;
103
104 if (index == SPA_ID_INVALID)
105 return -1;
106
107 if (index >= buf->msg.n_fds)
108 return -ENOENT;
109
110 return buf->msg.fds[index];
111 }
112
113 /** Add an fd to a connection
114 *
115 * \param conn the connection
116 * \param fd the fd to add
117 * \return the index of the fd or SPA_IDX_INVALID when an error occurred
118 *
119 * \memberof pw_protocol_native_connection
120 */
pw_protocol_native_connection_add_fd(struct pw_protocol_native_connection * conn,int fd)121 uint32_t pw_protocol_native_connection_add_fd(struct pw_protocol_native_connection *conn, int fd)
122 {
123 struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
124 struct buffer *buf = &impl->out;
125 uint32_t index, i;
126
127 if (fd < 0)
128 return SPA_IDX_INVALID;
129
130 for (i = 0; i < buf->msg.n_fds; i++) {
131 if (buf->msg.fds[i] == fd)
132 return i;
133 }
134
135 index = buf->msg.n_fds;
136 if (index + buf->n_fds >= MAX_FDS) {
137 pw_log_error("connection %p: too many fds (%d)", conn, MAX_FDS);
138 return SPA_IDX_INVALID;
139 }
140
141 buf->msg.fds[index] = fcntl(fd, F_DUPFD_CLOEXEC, 0);
142 buf->msg.n_fds++;
143 pw_log_debug("connection %p: add fd %d at index %d", conn, fd, index);
144
145 return index;
146 }
147
connection_ensure_size(struct pw_protocol_native_connection * conn,struct buffer * buf,size_t size)148 static void *connection_ensure_size(struct pw_protocol_native_connection *conn, struct buffer *buf, size_t size)
149 {
150 int res;
151
152 if (buf->buffer_size + size > buf->buffer_maxsize) {
153 buf->buffer_maxsize = SPA_ROUND_UP_N(buf->buffer_size + size, MAX_BUFFER_SIZE);
154 buf->buffer_data = realloc(buf->buffer_data, buf->buffer_maxsize);
155 if (buf->buffer_data == NULL) {
156 res = -errno;
157 buf->buffer_maxsize = 0;
158 spa_hook_list_call(&conn->listener_list,
159 struct pw_protocol_native_connection_events,
160 error, 0, -res);
161 errno = -res;
162 return NULL;
163 }
164 pw_log_debug("connection %p: resize buffer to %zd %zd %zd",
165 conn, buf->buffer_size, size, buf->buffer_maxsize);
166 }
167 return (uint8_t *) buf->buffer_data + buf->buffer_size;
168 }
169
handle_connection_error(struct pw_protocol_native_connection * conn,int res)170 static void handle_connection_error(struct pw_protocol_native_connection *conn, int res)
171 {
172 if (res == EPIPE || res == ECONNRESET)
173 pw_log_info("connection %p: could not recvmsg on fd:%d: %s", conn, conn->fd, strerror(res));
174 else
175 pw_log_error("connection %p: could not recvmsg on fd:%d: %s", conn, conn->fd, strerror(res));
176 }
177
refill_buffer(struct pw_protocol_native_connection * conn,struct buffer * buf)178 static int refill_buffer(struct pw_protocol_native_connection *conn, struct buffer *buf)
179 {
180 ssize_t len;
181 struct cmsghdr *cmsg;
182 struct msghdr msg = { 0 };
183 struct iovec iov[1];
184 char cmsgbuf[CMSG_SPACE(MAX_FDS_MSG * sizeof(int))];
185 int n_fds = 0;
186 size_t avail;
187
188 avail = buf->buffer_maxsize - buf->buffer_size;
189
190 iov[0].iov_base = buf->buffer_data + buf->buffer_size;
191 iov[0].iov_len = avail;
192 msg.msg_iov = iov;
193 msg.msg_iovlen = 1;
194 msg.msg_control = cmsgbuf;
195 msg.msg_controllen = sizeof(cmsgbuf);
196 msg.msg_flags = MSG_CMSG_CLOEXEC | MSG_DONTWAIT;
197
198 while (true) {
199 len = recvmsg(conn->fd, &msg, msg.msg_flags);
200 if (msg.msg_flags & MSG_CTRUNC)
201 return -EPROTO;
202 if (len == 0 && avail != 0)
203 return -EPIPE;
204 else if (len < 0) {
205 if (errno == EINTR)
206 continue;
207 if (errno != EAGAIN && errno != EWOULDBLOCK)
208 goto recv_error;
209 return -EAGAIN;
210 }
211 break;
212 }
213
214 buf->buffer_size += len;
215
216 /* handle control messages */
217 for (cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL; cmsg = CMSG_NXTHDR(&msg, cmsg)) {
218 if (cmsg->cmsg_level != SOL_SOCKET || cmsg->cmsg_type != SCM_RIGHTS)
219 continue;
220
221 n_fds =
222 (cmsg->cmsg_len - ((char *) CMSG_DATA(cmsg) - (char *) cmsg)) / sizeof(int);
223 if (n_fds + buf->n_fds > MAX_FDS)
224 return -EPROTO;
225 memcpy(&buf->fds[buf->n_fds], CMSG_DATA(cmsg), n_fds * sizeof(int));
226 buf->n_fds += n_fds;
227 }
228 pw_log_trace("connection %p: %d read %zd bytes and %d fds", conn, conn->fd, len,
229 n_fds);
230
231 return 0;
232
233 /* ERRORS */
234 recv_error:
235 handle_connection_error(conn, errno);
236 return -errno;
237 }
238
clear_buffer(struct buffer * buf,bool fds)239 static void clear_buffer(struct buffer *buf, bool fds)
240 {
241 uint32_t i;
242 if (fds) {
243 for (i = 0; i < buf->n_fds; i++)
244 close(buf->fds[i]);
245 }
246 buf->n_fds = 0;
247 buf->buffer_size = 0;
248 buf->offset = 0;
249 buf->fds_offset = 0;
250 }
251
252 /** Prepare connection for calling from reentered context.
253 *
254 * This ensures that message buffers returned by get_next are not invalidated by additional
255 * calls made after enter. Leave invalidates the buffers at the higher stack level.
256 *
257 * \memberof pw_protocol_native_connection
258 */
pw_protocol_native_connection_enter(struct pw_protocol_native_connection * conn)259 void pw_protocol_native_connection_enter(struct pw_protocol_native_connection *conn)
260 {
261 struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
262
263 /* Postpone processing until get_next is actually called */
264 ++impl->pending_reentering;
265 }
266
pop_reenter_stack(struct impl * impl,uint32_t count)267 static void pop_reenter_stack(struct impl *impl, uint32_t count)
268 {
269 while (count > 0) {
270 struct reenter_item *item;
271
272 item = spa_list_last(&impl->reenter_stack, struct reenter_item, link);
273 spa_list_remove(&item->link);
274
275 free(item->return_msg.fds);
276 free(item->old_buffer_data);
277 free(item);
278
279 --count;
280 }
281 }
282
pw_protocol_native_connection_leave(struct pw_protocol_native_connection * conn)283 void pw_protocol_native_connection_leave(struct pw_protocol_native_connection *conn)
284 {
285 struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
286
287 if (impl->pending_reentering > 0) {
288 --impl->pending_reentering;
289 } else {
290 pw_log_trace("connection %p: reenter: pop", impl);
291 pop_reenter_stack(impl, 1);
292 }
293 }
294
ensure_stack_level(struct impl * impl,struct pw_protocol_native_message ** msg)295 static int ensure_stack_level(struct impl *impl, struct pw_protocol_native_message **msg)
296 {
297 void *data;
298 struct buffer *buf = &impl->in;
299 struct reenter_item *item, *new_item = NULL;
300
301 item = spa_list_last(&impl->reenter_stack, struct reenter_item, link);
302
303 if (SPA_LIKELY(impl->pending_reentering == 0)) {
304 new_item = item;
305 } else {
306 uint32_t new_count;
307
308 pw_log_trace("connection %p: reenter: push %d levels",
309 impl, impl->pending_reentering);
310
311 /* Append empty item(s) to the reenter stack */
312 for (new_count = 0; new_count < impl->pending_reentering; ++new_count) {
313 new_item = calloc(1, sizeof(struct reenter_item));
314 if (new_item == NULL) {
315 pop_reenter_stack(impl, new_count);
316 return -ENOMEM;
317 }
318 spa_list_append(&impl->reenter_stack, &new_item->link);
319 }
320
321 /*
322 * Stack level increased: we have to switch to a new message data buffer, because
323 * data of returned messages is contained in the buffer and might still be in
324 * use on the lower stack levels.
325 *
326 * We stash the buffer for the previous stack level, and allocate a new one for
327 * the new stack level. If there was a previous buffer for the previous level, we
328 * know its contents are no longer in use (the only active buffer at that stack
329 * level is buf->buffer_data), and we can recycle it as the new buffer (realloc
330 * instead of calloc).
331 *
332 * The current data contained in the buffer needs to be copied to the new buffer.
333 */
334
335 data = realloc(item->old_buffer_data, buf->buffer_maxsize);
336 if (data == NULL) {
337 pop_reenter_stack(impl, new_count);
338 return -ENOMEM;
339 }
340
341 item->old_buffer_data = buf->buffer_data;
342
343 memcpy(data, buf->buffer_data, buf->buffer_size);
344 buf->buffer_data = data;
345
346 impl->pending_reentering = 0;
347 }
348 if (new_item == NULL)
349 return -EIO;
350
351 /* Ensure fds buffer is allocated */
352 if (SPA_UNLIKELY(new_item->return_msg.fds == NULL)) {
353 data = calloc(MAX_FDS, sizeof(int));
354 if (data == NULL)
355 return -ENOMEM;
356 new_item->return_msg.fds = data;
357 }
358
359 *msg = &new_item->return_msg;
360
361 return 0;
362 }
363
364 /** Make a new connection object for the given socket
365 *
366 * \param fd the socket
367 * \returns a newly allocated connection object
368 *
369 * \memberof pw_protocol_native_connection
370 */
pw_protocol_native_connection_new(struct pw_context * context,int fd)371 struct pw_protocol_native_connection *pw_protocol_native_connection_new(struct pw_context *context, int fd)
372 {
373 struct impl *impl;
374 struct pw_protocol_native_connection *this;
375 struct reenter_item *reenter_item;
376
377 impl = calloc(1, sizeof(struct impl));
378 if (impl == NULL)
379 return NULL;
380
381 impl->context = context;
382
383 this = &impl->this;
384
385 pw_log_debug("connection %p: new fd:%d", this, fd);
386
387 this->fd = fd;
388 spa_hook_list_init(&this->listener_list);
389
390 impl->hdr_size = HDR_SIZE;
391 impl->version = 3;
392
393 impl->out.buffer_data = calloc(1, MAX_BUFFER_SIZE);
394 impl->out.buffer_maxsize = MAX_BUFFER_SIZE;
395 impl->in.buffer_data = calloc(1, MAX_BUFFER_SIZE);
396 impl->in.buffer_maxsize = MAX_BUFFER_SIZE;
397
398 reenter_item = calloc(1, sizeof(struct reenter_item));
399
400 if (impl->out.buffer_data == NULL || impl->in.buffer_data == NULL || reenter_item == NULL)
401 goto no_mem;
402
403 spa_list_init(&impl->reenter_stack);
404 spa_list_append(&impl->reenter_stack, &reenter_item->link);
405
406 return this;
407
408 no_mem:
409 free(impl->out.buffer_data);
410 free(impl->in.buffer_data);
411 free(reenter_item);
412 free(impl);
413 return NULL;
414 }
415
pw_protocol_native_connection_set_fd(struct pw_protocol_native_connection * conn,int fd)416 int pw_protocol_native_connection_set_fd(struct pw_protocol_native_connection *conn, int fd)
417 {
418 pw_log_debug("connection %p: fd:%d", conn, fd);
419 conn->fd = fd;
420 return 0;
421 }
422
423 /** Destroy a connection
424 *
425 * \param conn the connection to destroy
426 *
427 * \memberof pw_protocol_native_connection
428 */
pw_protocol_native_connection_destroy(struct pw_protocol_native_connection * conn)429 void pw_protocol_native_connection_destroy(struct pw_protocol_native_connection *conn)
430 {
431 struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
432
433 pw_log_debug("connection %p: destroy", conn);
434
435 spa_hook_list_call(&conn->listener_list, struct pw_protocol_native_connection_events, destroy, 0);
436
437 spa_hook_list_clean(&conn->listener_list);
438
439 clear_buffer(&impl->out, true);
440 clear_buffer(&impl->in, true);
441 free(impl->out.buffer_data);
442 free(impl->in.buffer_data);
443
444 while (!spa_list_is_empty(&impl->reenter_stack))
445 pop_reenter_stack(impl, 1);
446
447 free(impl);
448 }
449
prepare_packet(struct pw_protocol_native_connection * conn,struct buffer * buf)450 static int prepare_packet(struct pw_protocol_native_connection *conn, struct buffer *buf)
451 {
452 struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
453 uint8_t *data;
454 size_t size, len;
455 uint32_t *p;
456
457 data = buf->buffer_data + buf->offset;
458 size = buf->buffer_size - buf->offset;
459
460 if (size < impl->hdr_size)
461 return impl->hdr_size;
462
463 p = (uint32_t *) data;
464
465 buf->msg.id = p[0];
466 buf->msg.opcode = p[1] >> 24;
467 len = p[1] & 0xffffff;
468
469 if (buf->msg.id == 0 && buf->msg.opcode == 1) {
470 if (p[3] >= 4) {
471 pw_log_warn("old version detected");
472 impl->version = 0;
473 impl->hdr_size = HDR_SIZE_V0;
474 } else {
475 impl->version = 3;
476 impl->hdr_size = HDR_SIZE;
477 }
478 spa_hook_list_call(&conn->listener_list,
479 struct pw_protocol_native_connection_events,
480 start, 0, impl->version);
481 }
482 if (impl->version >= 3) {
483 buf->msg.seq = p[2];
484 buf->msg.n_fds = p[3];
485 } else {
486 buf->msg.seq = 0;
487 buf->msg.n_fds = 0;
488 }
489
490 data += impl->hdr_size;
491 size -= impl->hdr_size;
492 buf->msg.fds = &buf->fds[buf->fds_offset];
493
494 if (buf->msg.n_fds + buf->fds_offset > MAX_FDS)
495 return -EPROTO;
496
497 if (size < len)
498 return len;
499
500 buf->msg.size = len;
501 buf->msg.data = data;
502
503 buf->offset += impl->hdr_size + len;
504 buf->fds_offset += buf->msg.n_fds;
505
506 if (buf->offset >= buf->buffer_size)
507 clear_buffer(buf, false);
508
509 return 0;
510 }
511
512 /** Move to the next packet in the connection
513 *
514 * \param conn the connection
515 * \param opcode address of result opcode
516 * \param dest_id address of result destination id
517 * \param dt pointer to packet data
518 * \param sz size of packet data
519 * \return true on success
520 *
521 * Get the next packet in \a conn and store the opcode and destination
522 * id as well as the packet data and size.
523 *
524 * \memberof pw_protocol_native_connection
525 */
526 int
pw_protocol_native_connection_get_next(struct pw_protocol_native_connection * conn,const struct pw_protocol_native_message ** msg)527 pw_protocol_native_connection_get_next(struct pw_protocol_native_connection *conn,
528 const struct pw_protocol_native_message **msg)
529 {
530 struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
531 int len, res;
532 struct buffer *buf;
533 struct pw_protocol_native_message *return_msg;
534 int *fds;
535
536 if ((res = ensure_stack_level(impl, &return_msg)) < 0)
537 return res;
538
539 buf = &impl->in;
540
541 while (1) {
542 len = prepare_packet(conn, buf);
543 if (len < 0)
544 return len;
545 if (len == 0)
546 break;
547
548 if (connection_ensure_size(conn, buf, len) == NULL)
549 return -errno;
550 if ((res = refill_buffer(conn, buf)) < 0)
551 return res;
552 }
553
554 /* Returned msg struct should be safe vs. reentering */
555 fds = return_msg->fds;
556 *return_msg = buf->msg;
557 if (buf->msg.n_fds > 0) {
558 memcpy(fds, buf->msg.fds, buf->msg.n_fds * sizeof(int));
559 }
560 return_msg->fds = fds;
561
562 *msg = return_msg;
563
564 return 1;
565 }
566
begin_write(struct pw_protocol_native_connection * conn,uint32_t size)567 static inline void *begin_write(struct pw_protocol_native_connection *conn, uint32_t size)
568 {
569 struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
570 uint32_t *p;
571 struct buffer *buf = &impl->out;
572 /* header and size for payload */
573 if ((p = connection_ensure_size(conn, buf, impl->hdr_size + size)) == NULL)
574 return NULL;
575
576 return SPA_PTROFF(p, impl->hdr_size, void);
577 }
578
builder_overflow(void * data,uint32_t size)579 static int builder_overflow(void *data, uint32_t size)
580 {
581 struct impl *impl = data;
582 struct spa_pod_builder *b = &impl->builder;
583
584 b->size = SPA_ROUND_UP_N(size, 4096);
585 if ((b->data = begin_write(&impl->this, b->size)) == NULL)
586 return -errno;
587 return 0;
588 }
589
590 static const struct spa_pod_builder_callbacks builder_callbacks = {
591 SPA_VERSION_POD_BUILDER_CALLBACKS,
592 .overflow = builder_overflow
593 };
594
595 struct spa_pod_builder *
pw_protocol_native_connection_begin(struct pw_protocol_native_connection * conn,uint32_t id,uint8_t opcode,struct pw_protocol_native_message ** msg)596 pw_protocol_native_connection_begin(struct pw_protocol_native_connection *conn,
597 uint32_t id, uint8_t opcode,
598 struct pw_protocol_native_message **msg)
599 {
600 struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
601 struct buffer *buf = &impl->out;
602
603 buf->msg.id = id;
604 buf->msg.opcode = opcode;
605 impl->builder = SPA_POD_BUILDER_INIT(NULL, 0);
606 spa_pod_builder_set_callbacks(&impl->builder, &builder_callbacks, impl);
607 if (impl->version >= 3) {
608 buf->msg.n_fds = 0;
609 buf->msg.fds = &buf->fds[buf->n_fds];
610 } else {
611 buf->msg.n_fds = buf->n_fds;
612 buf->msg.fds = &buf->fds[0];
613 }
614
615 buf->msg.seq = buf->seq;
616 if (msg)
617 *msg = &buf->msg;
618 return &impl->builder;
619 }
620
621 int
pw_protocol_native_connection_end(struct pw_protocol_native_connection * conn,struct spa_pod_builder * builder)622 pw_protocol_native_connection_end(struct pw_protocol_native_connection *conn,
623 struct spa_pod_builder *builder)
624 {
625 struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
626 uint32_t *p, size = builder->state.offset;
627 struct buffer *buf = &impl->out;
628 int res;
629
630 if ((p = connection_ensure_size(conn, buf, impl->hdr_size + size)) == NULL)
631 return -errno;
632
633 p[0] = buf->msg.id;
634 p[1] = (buf->msg.opcode << 24) | (size & 0xffffff);
635 if (impl->version >= 3) {
636 p[2] = buf->msg.seq;
637 p[3] = buf->msg.n_fds;
638 }
639
640 buf->buffer_size += impl->hdr_size + size;
641 if (impl->version >= 3)
642 buf->n_fds += buf->msg.n_fds;
643 else
644 buf->n_fds = buf->msg.n_fds;
645
646 if (mod_topic_connection->level >= SPA_LOG_LEVEL_DEBUG) {
647 pw_log_debug(">>>>>>>>> out: id:%d op:%d size:%d seq:%d",
648 buf->msg.id, buf->msg.opcode, size, buf->msg.seq);
649 spa_debug_pod(0, NULL, SPA_PTROFF(p, impl->hdr_size, struct spa_pod));
650 }
651
652 buf->seq = (buf->seq + 1) & SPA_ASYNC_SEQ_MASK;
653 res = SPA_RESULT_RETURN_ASYNC(buf->msg.seq);
654
655 spa_hook_list_call(&conn->listener_list,
656 struct pw_protocol_native_connection_events, need_flush, 0);
657
658 return res;
659 }
660
661 /** Flush the connection object
662 *
663 * \param conn the connection object
664 * \return 0 on success < 0 error code on error
665 *
666 * Write the queued messages on the connection to the socket
667 *
668 * \memberof pw_protocol_native_connection
669 */
pw_protocol_native_connection_flush(struct pw_protocol_native_connection * conn)670 int pw_protocol_native_connection_flush(struct pw_protocol_native_connection *conn)
671 {
672 struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
673 ssize_t sent, outsize;
674 struct msghdr msg = { 0 };
675 struct iovec iov[1];
676 struct cmsghdr *cmsg;
677 char cmsgbuf[CMSG_SPACE(MAX_FDS_MSG * sizeof(int))];
678 int res = 0, *fds;
679 uint32_t fds_len, to_close, n_fds, outfds, i;
680 struct buffer *buf;
681 void *data;
682 size_t size;
683
684 buf = &impl->out;
685 data = buf->buffer_data;
686 size = buf->buffer_size;
687 fds = buf->fds;
688 n_fds = buf->n_fds;
689 to_close = 0;
690
691 while (size > 0) {
692 if (n_fds > MAX_FDS_MSG) {
693 outfds = MAX_FDS_MSG;
694 outsize = SPA_MIN(sizeof(uint32_t), size);
695 } else {
696 outfds = n_fds;
697 outsize = size;
698 }
699
700 fds_len = outfds * sizeof(int);
701
702 iov[0].iov_base = data;
703 iov[0].iov_len = outsize;
704 msg.msg_iov = iov;
705 msg.msg_iovlen = 1;
706
707 if (outfds > 0) {
708 msg.msg_control = cmsgbuf;
709 msg.msg_controllen = CMSG_SPACE(fds_len);
710 cmsg = CMSG_FIRSTHDR(&msg);
711 cmsg->cmsg_level = SOL_SOCKET;
712 cmsg->cmsg_type = SCM_RIGHTS;
713 cmsg->cmsg_len = CMSG_LEN(fds_len);
714 memcpy(CMSG_DATA(cmsg), fds, fds_len);
715 msg.msg_controllen = cmsg->cmsg_len;
716 } else {
717 msg.msg_control = NULL;
718 msg.msg_controllen = 0;
719 }
720
721 while (true) {
722 sent = sendmsg(conn->fd, &msg, MSG_NOSIGNAL | MSG_DONTWAIT);
723 if (sent < 0) {
724 if (errno == EINTR)
725 continue;
726 else {
727 res = -errno;
728 goto exit;
729 }
730 }
731 break;
732 }
733 pw_log_trace("connection %p: %d written %zd bytes and %u fds", conn, conn->fd, sent,
734 outfds);
735
736 size -= sent;
737 data = SPA_PTROFF(data, sent, void);
738 n_fds -= outfds;
739 fds += outfds;
740 to_close += outfds;
741 }
742
743 res = 0;
744
745 exit:
746 if (size > 0)
747 memmove(buf->buffer_data, data, size);
748 buf->buffer_size = size;
749 for (i = 0; i < to_close; i++)
750 close(buf->fds[i]);
751 if (n_fds > 0)
752 memmove(buf->fds, fds, n_fds * sizeof(int));
753 buf->n_fds = n_fds;
754 return res;
755 }
756
757 /** Clear the connection object
758 *
759 * \param conn the connection object
760 * \return 0 on success
761 *
762 * Remove all queued messages from \a conn
763 *
764 * \memberof pw_protocol_native_connection
765 */
pw_protocol_native_connection_clear(struct pw_protocol_native_connection * conn)766 int pw_protocol_native_connection_clear(struct pw_protocol_native_connection *conn)
767 {
768 struct impl *impl = SPA_CONTAINER_OF(conn, struct impl, this);
769
770 clear_buffer(&impl->out, true);
771 clear_buffer(&impl->in, true);
772
773 return 0;
774 }
775