1 /* PipeWire
2  *
3  * Copyright © 2018 Wim Taymans
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice (including the next
13  * paragraph) shall be included in all copies or substantial portions of the
14  * Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
19  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include <errno.h>
26 #include <stdio.h>
27 #include <math.h>
28 #include <sys/mman.h>
29 #include <time.h>
30 
31 #include <spa/buffer/alloc.h>
32 #include <spa/param/props.h>
33 #include <spa/node/io.h>
34 #include <spa/node/utils.h>
35 #include <spa/utils/ringbuffer.h>
36 #include <spa/pod/filter.h>
37 #include <spa/debug/format.h>
38 #include <spa/debug/types.h>
39 #include <spa/debug/pod.h>
40 
41 #include "pipewire/pipewire.h"
42 #include "pipewire/stream.h"
43 #include "pipewire/private.h"
44 
45 PW_LOG_TOPIC_EXTERN(log_stream);
46 #define PW_LOG_TOPIC_DEFAULT log_stream
47 
48 #define MAX_BUFFERS	64
49 
50 #define MASK_BUFFERS	(MAX_BUFFERS-1)
51 #define MAX_PORTS	1
52 
53 static bool mlock_warned = false;
54 
55 static uint32_t mappable_dataTypes = (1<<SPA_DATA_MemFd);
56 
57 struct buffer {
58 	struct pw_buffer this;
59 	uint32_t id;
60 #define BUFFER_FLAG_MAPPED	(1 << 0)
61 #define BUFFER_FLAG_QUEUED	(1 << 1)
62 #define BUFFER_FLAG_ADDED	(1 << 2)
63 	uint32_t flags;
64 	struct spa_meta_busy *busy;
65 };
66 
67 struct queue {
68 	uint32_t ids[MAX_BUFFERS];
69 	struct spa_ringbuffer ring;
70 	uint64_t incount;
71 	uint64_t outcount;
72 };
73 
74 struct data {
75 	struct pw_context *context;
76 	struct spa_hook stream_listener;
77 };
78 
79 struct param {
80 	uint32_t id;
81 #define PARAM_FLAG_LOCKED	(1 << 0)
82 	uint32_t flags;
83 	struct spa_list link;
84 	struct spa_pod *param;
85 };
86 
87 struct control {
88 	uint32_t id;
89 	uint32_t type;
90 	uint32_t container;
91 	struct spa_list link;
92 	struct pw_stream_control control;
93 	struct spa_pod *info;
94 	unsigned int emitted:1;
95 	float values[64];
96 };
97 
98 struct stream {
99 	struct pw_stream this;
100 
101 	const char *path;
102 
103 	struct pw_context *context;
104 	struct spa_hook context_listener;
105 
106 	enum spa_direction direction;
107 	enum pw_stream_flags flags;
108 
109 	struct pw_impl_node *node;
110 
111 	struct spa_node impl_node;
112 	struct spa_node_methods node_methods;
113 	struct spa_hook_list hooks;
114 	struct spa_callbacks callbacks;
115 
116 	struct spa_io_clock *clock;
117 	struct spa_io_position *position;
118 	struct spa_io_buffers *io;
119 	struct {
120 		struct spa_io_position *position;
121 	} rt;
122 
123 	uint32_t port_change_mask_all;
124 	struct spa_port_info port_info;
125 	struct pw_properties *port_props;
126 #define IDX_EnumFormat	0
127 #define IDX_Meta	1
128 #define IDX_IO		2
129 #define IDX_Format	3
130 #define IDX_Buffers	4
131 #define IDX_Latency	5
132 #define N_PORT_PARAMS	6
133 	struct spa_param_info port_params[N_PORT_PARAMS];
134 
135 	struct spa_list param_list;
136 
137 	uint32_t change_mask_all;
138 	struct spa_node_info info;
139 #define IDX_Props	0
140 #define N_NODE_PARAMS	1
141 	struct spa_param_info params[N_NODE_PARAMS];
142 
143 	uint32_t media_type;
144 	uint32_t media_subtype;
145 
146 	struct buffer buffers[MAX_BUFFERS];
147 	uint32_t n_buffers;
148 
149 	struct queue dequeued;
150 	struct queue queued;
151 
152 	struct data data;
153 	uintptr_t seq;
154 	struct pw_time time;
155 	uint64_t base_pos;
156 	uint32_t clock_id;
157 	struct spa_latency_info latency;
158 	uint64_t quantum;
159 
160 	struct spa_callbacks rt_callbacks;
161 
162 	unsigned int disconnecting:1;
163 	unsigned int disconnect_core:1;
164 	unsigned int draining:1;
165 	unsigned int drained:1;
166 	unsigned int allow_mlock:1;
167 	unsigned int warn_mlock:1;
168 	unsigned int process_rt:1;
169 	unsigned int driving:1;
170 	unsigned int using_trigger:1;
171 	unsigned int trigger:1;
172 };
173 
get_param_index(uint32_t id)174 static int get_param_index(uint32_t id)
175 {
176 	switch (id) {
177 	case SPA_PARAM_Props:
178 		return IDX_Props;
179 	default:
180 		return -1;
181 	}
182 }
183 
get_port_param_index(uint32_t id)184 static int get_port_param_index(uint32_t id)
185 {
186 	switch (id) {
187 	case SPA_PARAM_EnumFormat:
188 		return IDX_EnumFormat;
189 	case SPA_PARAM_Meta:
190 		return IDX_Meta;
191 	case SPA_PARAM_IO:
192 		return IDX_IO;
193 	case SPA_PARAM_Format:
194 		return IDX_Format;
195 	case SPA_PARAM_Buffers:
196 		return IDX_Buffers;
197 	case SPA_PARAM_Latency:
198 		return IDX_Latency;
199 	default:
200 		return -1;
201 	}
202 }
203 
fix_datatype(const struct spa_pod * param)204 static void fix_datatype(const struct spa_pod *param)
205 {
206 	const struct spa_pod_prop *pod_param;
207 	const struct spa_pod *vals;
208 	uint32_t dataType, n_vals, choice;
209 
210 	pod_param = spa_pod_find_prop(param, NULL, SPA_PARAM_BUFFERS_dataType);
211 	if (pod_param == NULL)
212 		return;
213 
214 	vals = spa_pod_get_values(&pod_param->value, &n_vals, &choice);
215 	if (n_vals == 0)
216 		return;
217 
218 	if (spa_pod_get_int(&vals[0], (int32_t*)&dataType) < 0)
219 		return;
220 
221 	pw_log_debug("dataType: %u", dataType);
222 	if (dataType & (1u << SPA_DATA_MemPtr)) {
223 		SPA_POD_VALUE(struct spa_pod_int, &vals[0]) =
224 			dataType | mappable_dataTypes;
225 		pw_log_debug("Change dataType: %u -> %u", dataType,
226 				SPA_POD_VALUE(struct spa_pod_int, &vals[0]));
227 	}
228 }
229 
add_param(struct stream * impl,uint32_t id,uint32_t flags,const struct spa_pod * param)230 static struct param *add_param(struct stream *impl,
231 		uint32_t id, uint32_t flags, const struct spa_pod *param)
232 {
233 	struct param *p;
234 	int idx;
235 
236 	if (param == NULL || !spa_pod_is_object(param)) {
237 		errno = EINVAL;
238 		return NULL;
239 	}
240 	if (id == SPA_ID_INVALID)
241 		id = SPA_POD_OBJECT_ID(param);
242 
243 	p = malloc(sizeof(struct param) + SPA_POD_SIZE(param));
244 	if (p == NULL)
245 		return NULL;
246 
247 	if (id == SPA_PARAM_Buffers &&
248 	    SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_MAP_BUFFERS) &&
249 	    impl->direction == SPA_DIRECTION_INPUT)
250 		fix_datatype(param);
251 
252 	p->id = id;
253 	p->flags = flags;
254 	p->param = SPA_PTROFF(p, sizeof(struct param), struct spa_pod);
255 	memcpy(p->param, param, SPA_POD_SIZE(param));
256 	SPA_POD_OBJECT_ID(p->param) = id;
257 
258 	spa_list_append(&impl->param_list, &p->link);
259 
260 	if ((idx = get_param_index(id)) != -1) {
261 		impl->info.change_mask |= SPA_NODE_CHANGE_MASK_PARAMS;
262 		impl->params[idx].flags |= SPA_PARAM_INFO_READ;
263 		impl->params[idx].user++;
264 	} else if ((idx = get_port_param_index(id)) != -1) {
265 		impl->port_info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
266 		impl->port_params[idx].flags |= SPA_PARAM_INFO_READ;
267 		impl->port_params[idx].user++;
268 	}
269 	return p;
270 }
271 
clear_params(struct stream * impl,uint32_t id)272 static void clear_params(struct stream *impl, uint32_t id)
273 {
274 	struct param *p, *t;
275 
276 	spa_list_for_each_safe(p, t, &impl->param_list, link) {
277 		if (id == SPA_ID_INVALID ||
278 		    (p->id == id && !(p->flags & PARAM_FLAG_LOCKED))) {
279 			spa_list_remove(&p->link);
280 			free(p);
281 		}
282 	}
283 }
284 
update_params(struct stream * impl,uint32_t id,const struct spa_pod ** params,uint32_t n_params)285 static int update_params(struct stream *impl, uint32_t id,
286 		const struct spa_pod **params, uint32_t n_params)
287 {
288 	uint32_t i;
289 	int res = 0;
290 
291 	if (id != SPA_ID_INVALID) {
292 		clear_params(impl, id);
293 	} else {
294 		for (i = 0; i < n_params; i++) {
295 			if (!spa_pod_is_object(params[i]))
296 				continue;
297 			clear_params(impl, SPA_POD_OBJECT_ID(params[i]));
298 		}
299 	}
300 	for (i = 0; i < n_params; i++) {
301 		if (add_param(impl, id, 0, params[i]) == NULL) {
302 			res = -errno;
303 			break;
304 		}
305 	}
306 	return res;
307 }
308 
309 
push_queue(struct stream * stream,struct queue * queue,struct buffer * buffer)310 static inline int push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer)
311 {
312 	uint32_t index;
313 
314 	if (SPA_FLAG_IS_SET(buffer->flags, BUFFER_FLAG_QUEUED))
315 		return -EINVAL;
316 
317 	SPA_FLAG_SET(buffer->flags, BUFFER_FLAG_QUEUED);
318 	queue->incount += buffer->this.size;
319 
320 	spa_ringbuffer_get_write_index(&queue->ring, &index);
321 	queue->ids[index & MASK_BUFFERS] = buffer->id;
322 	spa_ringbuffer_write_update(&queue->ring, index + 1);
323 
324 	return 0;
325 }
326 
pop_queue(struct stream * stream,struct queue * queue)327 static inline struct buffer *pop_queue(struct stream *stream, struct queue *queue)
328 {
329 	uint32_t index, id;
330 	struct buffer *buffer;
331 
332 	if (spa_ringbuffer_get_read_index(&queue->ring, &index) < 1) {
333 		errno = EPIPE;
334 		return NULL;
335 	}
336 
337 	id = queue->ids[index & MASK_BUFFERS];
338 	spa_ringbuffer_read_update(&queue->ring, index + 1);
339 
340 	buffer = &stream->buffers[id];
341 	queue->outcount += buffer->this.size;
342 	SPA_FLAG_CLEAR(buffer->flags, BUFFER_FLAG_QUEUED);
343 
344 	return buffer;
345 }
clear_queue(struct stream * stream,struct queue * queue)346 static inline void clear_queue(struct stream *stream, struct queue *queue)
347 {
348 	spa_ringbuffer_init(&queue->ring);
349 	queue->incount = queue->outcount;
350 }
351 
stream_set_state(struct pw_stream * stream,enum pw_stream_state state,const char * error)352 static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, const char *error)
353 {
354 	enum pw_stream_state old = stream->state;
355 	bool res = old != state;
356 
357 	if (res) {
358 		free(stream->error);
359 		stream->error = error ? strdup(error) : NULL;
360 
361 		pw_log_debug("%p: update state from %s -> %s (%s)", stream,
362 			     pw_stream_state_as_string(old),
363 			     pw_stream_state_as_string(state), stream->error);
364 
365 		if (state == PW_STREAM_STATE_ERROR)
366 			pw_log_error("%p: error %s", stream, error);
367 
368 		stream->state = state;
369 		pw_stream_emit_state_changed(stream, old, state, error);
370 	}
371 	return res;
372 }
373 
get_buffer(struct pw_stream * stream,uint32_t id)374 static struct buffer *get_buffer(struct pw_stream *stream, uint32_t id)
375 {
376 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
377 	if (id < impl->n_buffers)
378 		return &impl->buffers[id];
379 
380 	errno = EINVAL;
381 	return NULL;
382 }
383 
384 static int
do_call_process(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)385 do_call_process(struct spa_loop *loop,
386                  bool async, uint32_t seq, const void *data, size_t size, void *user_data)
387 {
388 	struct stream *impl = user_data;
389 	struct pw_stream *stream = &impl->this;
390 	pw_log_trace("%p: do process", stream);
391 	pw_stream_emit_process(stream);
392 	return 0;
393 }
394 
call_process(struct stream * impl)395 static void call_process(struct stream *impl)
396 {
397 	pw_log_trace("%p: call process rt:%u", impl, impl->process_rt);
398 	if (impl->process_rt)
399 		spa_callbacks_call(&impl->rt_callbacks, struct pw_stream_events, process, 0);
400 	else
401 		pw_loop_invoke(impl->context->main_loop,
402 			do_call_process, 1, NULL, 0, false, impl);
403 }
404 
405 static int
do_call_drained(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)406 do_call_drained(struct spa_loop *loop,
407                  bool async, uint32_t seq, const void *data, size_t size, void *user_data)
408 {
409 	struct stream *impl = user_data;
410 	struct pw_stream *stream = &impl->this;
411 	pw_log_trace("%p: drained", stream);
412 	pw_stream_emit_drained(stream);
413 	return 0;
414 }
415 
call_drained(struct stream * impl)416 static void call_drained(struct stream *impl)
417 {
418 	pw_loop_invoke(impl->context->main_loop,
419 		do_call_drained, 1, NULL, 0, false, impl);
420 }
421 
422 static int
do_call_trigger_done(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)423 do_call_trigger_done(struct spa_loop *loop,
424                  bool async, uint32_t seq, const void *data, size_t size, void *user_data)
425 {
426 	struct stream *impl = user_data;
427 	struct pw_stream *stream = &impl->this;
428 	pw_log_trace("%p: trigger_done", stream);
429 	pw_stream_emit_trigger_done(stream);
430 	return 0;
431 }
432 
call_trigger_done(struct stream * impl)433 static void call_trigger_done(struct stream *impl)
434 {
435 	pw_loop_invoke(impl->context->main_loop,
436 		do_call_trigger_done, 1, NULL, 0, false, impl);
437 }
438 
439 static int
do_set_position(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)440 do_set_position(struct spa_loop *loop,
441 		bool async, uint32_t seq, const void *data, size_t size, void *user_data)
442 {
443 	struct stream *impl = user_data;
444 	impl->rt.position = impl->position;
445 	return 0;
446 }
447 
impl_set_io(void * object,uint32_t id,void * data,size_t size)448 static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
449 {
450 	struct stream *impl = object;
451 	struct pw_stream *stream = &impl->this;
452 
453 	pw_log_debug("%p: set io id %d (%s) %p %zd", impl, id,
454 			spa_debug_type_find_name(spa_type_io, id), data, size);
455 
456 	switch(id) {
457 	case SPA_IO_Clock:
458 		if (data && size >= sizeof(struct spa_io_clock))
459 			impl->clock = data;
460 		else
461 			impl->clock = NULL;
462 		break;
463 	case SPA_IO_Position:
464 		if (data && size >= sizeof(struct spa_io_position))
465 			impl->position = data;
466 		else
467 			impl->position = NULL;
468 
469 		pw_loop_invoke(impl->context->data_loop,
470 				do_set_position, 1, NULL, 0, true, impl);
471 		break;
472 	default:
473 		break;
474 	}
475 	impl->driving = impl->clock && impl->position && impl->position->clock.id == impl->clock->id;
476 	pw_stream_emit_io_changed(stream, id, data, size);
477 
478 	return 0;
479 }
480 
enum_params(void * object,bool is_port,int seq,uint32_t id,uint32_t start,uint32_t num,const struct spa_pod * filter)481 static int enum_params(void *object, bool is_port, int seq, uint32_t id, uint32_t start, uint32_t num,
482 				 const struct spa_pod *filter)
483 {
484 	struct stream *d = object;
485 	struct spa_result_node_params result;
486 	uint8_t buffer[1024];
487 	struct spa_pod_builder b = { 0 };
488 	uint32_t count = 0;
489 	struct param *p;
490 	bool found = false;
491 
492 	spa_return_val_if_fail(num != 0, -EINVAL);
493 
494 	result.id = id;
495 	result.next = 0;
496 
497 	pw_log_debug("%p: param id %d (%s) start:%d num:%d", d, id,
498 			spa_debug_type_find_name(spa_type_param, id),
499 			start, num);
500 
501 	spa_list_for_each(p, &d->param_list, link) {
502 		struct spa_pod *param;
503 
504 		result.index = result.next++;
505 		if (result.index < start)
506 			continue;
507 
508 		param = p->param;
509 		if (param == NULL || p->id != id)
510 			continue;
511 
512 		found = true;
513 
514 		spa_pod_builder_init(&b, buffer, sizeof(buffer));
515 		if (spa_pod_filter(&b, &result.param, param, filter) != 0)
516 			continue;
517 
518 		spa_node_emit_result(&d->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
519 
520 		if (++count == num)
521 			break;
522 	}
523 	return found ? 0 : -ENOENT;
524 }
525 
impl_enum_params(void * object,int seq,uint32_t id,uint32_t start,uint32_t num,const struct spa_pod * filter)526 static int impl_enum_params(void *object, int seq, uint32_t id, uint32_t start, uint32_t num,
527 				 const struct spa_pod *filter)
528 {
529 	return enum_params(object, false, seq, id, start, num, filter);
530 }
531 
impl_set_param(void * object,uint32_t id,uint32_t flags,const struct spa_pod * param)532 static int impl_set_param(void *object, uint32_t id, uint32_t flags, const struct spa_pod *param)
533 {
534 	struct stream *impl = object;
535 	struct pw_stream *stream = &impl->this;
536 
537 	if (id != SPA_PARAM_Props)
538 		return -ENOTSUP;
539 
540 	pw_stream_emit_param_changed(stream, id, param);
541 	return 0;
542 }
543 
impl_send_command(void * object,const struct spa_command * command)544 static int impl_send_command(void *object, const struct spa_command *command)
545 {
546 	struct stream *impl = object;
547 	struct pw_stream *stream = &impl->this;
548 	uint32_t id = SPA_NODE_COMMAND_ID(command);
549 
550 	pw_log_info("%p: command %s", impl,
551 			spa_debug_type_find_name(spa_type_node_command_id, id));
552 
553 	switch (id) {
554 	case SPA_NODE_COMMAND_Suspend:
555 	case SPA_NODE_COMMAND_Flush:
556 	case SPA_NODE_COMMAND_Pause:
557 		pw_loop_invoke(impl->context->main_loop,
558 			NULL, 0, NULL, 0, false, impl);
559 		if (stream->state == PW_STREAM_STATE_STREAMING) {
560 
561 			pw_log_debug("%p: pause", stream);
562 			stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL);
563 		}
564 		break;
565 	case SPA_NODE_COMMAND_Start:
566 		if (stream->state == PW_STREAM_STATE_PAUSED) {
567 			pw_log_debug("%p: start %d", stream, impl->direction);
568 
569 			if (impl->direction == SPA_DIRECTION_INPUT)
570 				impl->io->status = SPA_STATUS_NEED_DATA;
571 			else if (!impl->process_rt && !impl->driving)
572 				call_process(impl);
573 
574 			stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL);
575 		}
576 		break;
577 	default:
578 		break;
579 	}
580 	pw_stream_emit_command(stream, command);
581 	return 0;
582 }
583 
emit_node_info(struct stream * d,bool full)584 static void emit_node_info(struct stream *d, bool full)
585 {
586 	uint32_t i;
587 	uint64_t old = full ? d->info.change_mask : 0;
588 	if (full)
589 		d->info.change_mask = d->change_mask_all;
590 	if (d->info.change_mask != 0) {
591 		if (d->info.change_mask & SPA_NODE_CHANGE_MASK_PARAMS) {
592 			for (i = 0; i < d->info.n_params; i++) {
593 				if (d->params[i].user > 0) {
594 					d->params[i].flags ^= SPA_PARAM_INFO_SERIAL;
595 					d->params[i].user = 0;
596 				}
597 			}
598 		}
599 		spa_node_emit_info(&d->hooks, &d->info);
600 	}
601 	d->info.change_mask = old;
602 }
603 
emit_port_info(struct stream * d,bool full)604 static void emit_port_info(struct stream *d, bool full)
605 {
606 	uint32_t i;
607 	uint64_t old = full ? d->port_info.change_mask : 0;
608 	if (full)
609 		d->port_info.change_mask = d->port_change_mask_all;
610 	if (d->port_info.change_mask != 0) {
611 		if (d->port_info.change_mask & SPA_PORT_CHANGE_MASK_PARAMS) {
612 			for (i = 0; i < d->port_info.n_params; i++) {
613 				if (d->port_params[i].user > 0) {
614 					d->port_params[i].flags ^= SPA_PARAM_INFO_SERIAL;
615 					d->port_params[i].user = 0;
616 				}
617 			}
618 		}
619 		spa_node_emit_port_info(&d->hooks, d->direction, 0, &d->port_info);
620 	}
621 	d->port_info.change_mask = old;
622 }
623 
impl_add_listener(void * object,struct spa_hook * listener,const struct spa_node_events * events,void * data)624 static int impl_add_listener(void *object,
625 		struct spa_hook *listener,
626 		const struct spa_node_events *events,
627 		void *data)
628 {
629 	struct stream *d = object;
630 	struct spa_hook_list save;
631 
632 	spa_hook_list_isolate(&d->hooks, &save, listener, events, data);
633 
634 	emit_node_info(d, true);
635 	emit_port_info(d, true);
636 
637 	spa_hook_list_join(&d->hooks, &save);
638 
639 	return 0;
640 }
641 
impl_set_callbacks(void * object,const struct spa_node_callbacks * callbacks,void * data)642 static int impl_set_callbacks(void *object,
643 			      const struct spa_node_callbacks *callbacks, void *data)
644 {
645 	struct stream *d = object;
646 
647 	d->callbacks = SPA_CALLBACKS_INIT(callbacks, data);
648 
649 	return 0;
650 }
651 
impl_port_set_io(void * object,enum spa_direction direction,uint32_t port_id,uint32_t id,void * data,size_t size)652 static int impl_port_set_io(void *object, enum spa_direction direction, uint32_t port_id,
653 			    uint32_t id, void *data, size_t size)
654 {
655 	struct stream *impl = object;
656 	struct pw_stream *stream = &impl->this;
657 
658 	pw_log_debug("%p: id:%d (%s) %p %zd", impl, id,
659 			spa_debug_type_find_name(spa_type_io, id), data, size);
660 
661 	switch (id) {
662 	case SPA_IO_Buffers:
663 		if (data && size >= sizeof(struct spa_io_buffers))
664 			impl->io = data;
665 		else
666 			impl->io = NULL;
667 		break;
668 	}
669 	pw_stream_emit_io_changed(stream, id, data, size);
670 
671 	return 0;
672 }
673 
impl_port_enum_params(void * object,int seq,enum spa_direction direction,uint32_t port_id,uint32_t id,uint32_t start,uint32_t num,const struct spa_pod * filter)674 static int impl_port_enum_params(void *object, int seq,
675 				 enum spa_direction direction, uint32_t port_id,
676 				 uint32_t id, uint32_t start, uint32_t num,
677 				 const struct spa_pod *filter)
678 {
679 	return enum_params(object, true, seq, id, start, num, filter);
680 }
681 
map_data(struct stream * impl,struct spa_data * data,int prot)682 static int map_data(struct stream *impl, struct spa_data *data, int prot)
683 {
684 	void *ptr;
685 	struct pw_map_range range;
686 
687 	pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->context->sc_pagesize);
688 
689 	ptr = mmap(NULL, range.size, prot, MAP_SHARED, data->fd, range.offset);
690 	if (ptr == MAP_FAILED) {
691 		pw_log_error("%p: failed to mmap buffer mem: %m", impl);
692 		return -errno;
693 	}
694 
695 	data->data = SPA_PTROFF(ptr, range.start, void);
696 	pw_log_debug("%p: fd %"PRIi64" mapped %d %d %p", impl, data->fd,
697 			range.offset, range.size, data->data);
698 
699 	if (impl->allow_mlock && mlock(data->data, data->maxsize) < 0) {
700 		if (errno != ENOMEM || !mlock_warned) {
701 			pw_log(impl->warn_mlock ? SPA_LOG_LEVEL_WARN : SPA_LOG_LEVEL_DEBUG,
702 					"%p: Failed to mlock memory %p %u: %s", impl,
703 					data->data, data->maxsize,
704 					errno == ENOMEM ?
705 					"This is not a problem but for best performance, "
706 					"consider increasing RLIMIT_MEMLOCK" : strerror(errno));
707 			mlock_warned |= errno == ENOMEM;
708 		}
709 	}
710 	return 0;
711 }
712 
unmap_data(struct stream * impl,struct spa_data * data)713 static int unmap_data(struct stream *impl, struct spa_data *data)
714 {
715 	struct pw_map_range range;
716 
717 	pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->context->sc_pagesize);
718 
719 	if (munmap(SPA_PTROFF(data->data, -range.start, void), range.size) < 0)
720 		pw_log_warn("%p: failed to unmap: %m", impl);
721 
722 	pw_log_debug("%p: fd %"PRIi64" unmapped", impl, data->fd);
723 	return 0;
724 }
725 
clear_buffers(struct pw_stream * stream)726 static void clear_buffers(struct pw_stream *stream)
727 {
728 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
729 	uint32_t i, j;
730 
731 	pw_log_debug("%p: clear buffers %d", stream, impl->n_buffers);
732 
733 	for (i = 0; i < impl->n_buffers; i++) {
734 		struct buffer *b = &impl->buffers[i];
735 
736 		if (SPA_FLAG_IS_SET(b->flags, BUFFER_FLAG_ADDED))
737 			pw_stream_emit_remove_buffer(stream, &b->this);
738 
739 		if (SPA_FLAG_IS_SET(b->flags, BUFFER_FLAG_MAPPED)) {
740 			for (j = 0; j < b->this.buffer->n_datas; j++) {
741 				struct spa_data *d = &b->this.buffer->datas[j];
742 				pw_log_debug("%p: clear buffer %d mem",
743 						stream, b->id);
744 				unmap_data(impl, d);
745 			}
746 		}
747 	}
748 	impl->n_buffers = 0;
749 	if (impl->direction == SPA_DIRECTION_INPUT) {
750 		struct buffer *b;
751 
752 		while ((b = pop_queue(impl, &impl->dequeued))) {
753 			if (b->busy)
754 				ATOMIC_DEC(b->busy->count);
755 		}
756 	} else
757 		clear_queue(impl, &impl->dequeued);
758 	clear_queue(impl, &impl->queued);
759 }
760 
parse_latency(struct pw_stream * stream,const struct spa_pod * param)761 static int parse_latency(struct pw_stream *stream, const struct spa_pod *param)
762 {
763 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
764 	struct spa_latency_info info;
765 	int res;
766 
767 	if (param == NULL)
768 		return 0;
769 
770 	if ((res = spa_latency_parse(param, &info)) < 0)
771 		return res;
772 
773 	pw_log_info("stream %p: set %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, stream,
774 			info.direction == SPA_DIRECTION_INPUT ? "input" : "output",
775 			info.min_quantum, info.max_quantum,
776 			info.min_rate, info.max_rate,
777 			info.min_ns, info.max_ns);
778 
779 	if (info.direction == impl->direction)
780 		return 0;
781 
782 	impl->latency = info;
783 	return 0;
784 }
785 
impl_port_set_param(void * object,enum spa_direction direction,uint32_t port_id,uint32_t id,uint32_t flags,const struct spa_pod * param)786 static int impl_port_set_param(void *object,
787 			       enum spa_direction direction, uint32_t port_id,
788 			       uint32_t id, uint32_t flags,
789 			       const struct spa_pod *param)
790 {
791 	struct stream *impl = object;
792 	struct pw_stream *stream = &impl->this;
793 	int res;
794 
795 	pw_log_debug("%p: port:%d.%d id:%d (%s) param:%p disconnecting:%d", impl,
796 			direction, port_id, id,
797 			spa_debug_type_find_name(spa_type_param, id), param,
798 			impl->disconnecting);
799 
800 	if (impl->disconnecting && param != NULL)
801 		return -EIO;
802 
803 	if (param)
804 		pw_log_pod(SPA_LOG_LEVEL_DEBUG, param);
805 
806 	if ((res = update_params(impl, id, &param, param ? 1 : 0)) < 0)
807 		return res;
808 
809 	switch (id) {
810 	case SPA_PARAM_Format:
811 		clear_buffers(stream);
812 		break;
813 	case SPA_PARAM_Latency:
814 		parse_latency(stream, param);
815 		break;
816 	default:
817 		break;
818 	}
819 
820 	pw_stream_emit_param_changed(stream, id, param);
821 
822 	if (stream->state == PW_STREAM_STATE_ERROR)
823 		return -EIO;
824 
825 	emit_port_info(impl, false);
826 
827 	return 0;
828 }
829 
impl_port_use_buffers(void * object,enum spa_direction direction,uint32_t port_id,uint32_t flags,struct spa_buffer ** buffers,uint32_t n_buffers)830 static int impl_port_use_buffers(void *object,
831 		enum spa_direction direction, uint32_t port_id,
832 		uint32_t flags,
833 		struct spa_buffer **buffers, uint32_t n_buffers)
834 {
835 	struct stream *impl = object;
836 	struct pw_stream *stream = &impl->this;
837 	uint32_t i, j, impl_flags = impl->flags;
838 	int prot, res;
839 	int size = 0;
840 
841 	pw_log_debug("%p: port:%d.%d buffers:%u disconnecting:%d", impl,
842 			direction, port_id, n_buffers, impl->disconnecting);
843 
844 	if (impl->disconnecting && n_buffers > 0)
845 		return -EIO;
846 
847 	prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0);
848 
849 	clear_buffers(stream);
850 
851 	for (i = 0; i < n_buffers; i++) {
852 		int buf_size = 0;
853 		struct buffer *b = &impl->buffers[i];
854 
855 		b->flags = 0;
856 		b->id = i;
857 
858 		if (SPA_FLAG_IS_SET(impl_flags, PW_STREAM_FLAG_MAP_BUFFERS)) {
859 			for (j = 0; j < buffers[i]->n_datas; j++) {
860 				struct spa_data *d = &buffers[i]->datas[j];
861 				if ((mappable_dataTypes & (1<<d->type)) > 0) {
862 					if ((res = map_data(impl, d, prot)) < 0)
863 						return res;
864 					SPA_FLAG_SET(b->flags, BUFFER_FLAG_MAPPED);
865 				}
866 				else if (d->type == SPA_DATA_MemPtr && d->data == NULL) {
867 					pw_log_error("%p: invalid buffer mem", stream);
868 					return -EINVAL;
869 				}
870 				buf_size += d->maxsize;
871 			}
872 
873 			if (size > 0 && buf_size != size) {
874 				pw_log_error("%p: invalid buffer size %d", stream, buf_size);
875 				return -EINVAL;
876 			} else
877 				size = buf_size;
878 		}
879 		pw_log_debug("%p: got buffer id:%d datas:%d, mapped size %d", stream, i,
880 				buffers[i]->n_datas, size);
881 	}
882 
883 	for (i = 0; i < n_buffers; i++) {
884 		struct buffer *b = &impl->buffers[i];
885 
886 		b->this.buffer = buffers[i];
887 		b->busy = spa_buffer_find_meta_data(buffers[i], SPA_META_Busy, sizeof(*b->busy));
888 
889 		if (impl->direction == SPA_DIRECTION_OUTPUT) {
890 			pw_log_trace("%p: recycle buffer %d", stream, b->id);
891 			push_queue(impl, &impl->dequeued, b);
892 		}
893 
894 		SPA_FLAG_SET(b->flags, BUFFER_FLAG_ADDED);
895 
896 		pw_stream_emit_add_buffer(stream, &b->this);
897 	}
898 
899 	impl->n_buffers = n_buffers;
900 
901 	return 0;
902 }
903 
impl_port_reuse_buffer(void * object,uint32_t port_id,uint32_t buffer_id)904 static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
905 {
906 	struct stream *d = object;
907 	pw_log_trace("%p: recycle buffer %d", d, buffer_id);
908 	if (buffer_id < d->n_buffers)
909 		push_queue(d, &d->queued, &d->buffers[buffer_id]);
910 	return 0;
911 }
912 
copy_position(struct stream * impl,int64_t queued)913 static inline void copy_position(struct stream *impl, int64_t queued)
914 {
915 	struct spa_io_position *p = impl->rt.position;
916 	if (SPA_UNLIKELY(p != NULL)) {
917 		SEQ_WRITE(impl->seq);
918 		impl->time.now = p->clock.nsec;
919 		impl->time.rate = p->clock.rate;
920 		if (SPA_UNLIKELY(impl->clock_id != p->clock.id)) {
921 			impl->base_pos = p->clock.position - impl->time.ticks;
922 			impl->clock_id = p->clock.id;
923 		}
924 		impl->time.ticks = p->clock.position - impl->base_pos;
925 		impl->time.delay = 0;
926 		impl->time.queued = queued;
927 		impl->quantum = p->clock.duration;
928 		SEQ_WRITE(impl->seq);
929 	}
930 }
931 
impl_node_process_input(void * object)932 static int impl_node_process_input(void *object)
933 {
934 	struct stream *impl = object;
935 	struct pw_stream *stream = &impl->this;
936 	struct spa_io_buffers *io = impl->io;
937 	struct buffer *b;
938 
939 	pw_log_trace("%p: process in status:%d id:%d ticks:%"PRIu64" delay:%"PRIi64,
940 			stream, io->status, io->buffer_id, impl->time.ticks, impl->time.delay);
941 
942 	if (io->status == SPA_STATUS_HAVE_DATA &&
943 	    (b = get_buffer(stream, io->buffer_id)) != NULL) {
944 		/* push new buffer */
945 		if (push_queue(impl, &impl->dequeued, b) == 0) {
946 			copy_position(impl, impl->dequeued.incount);
947 			if (b->busy)
948 				ATOMIC_INC(b->busy->count);
949 			call_process(impl);
950 		}
951 	}
952 	if (io->status != SPA_STATUS_NEED_DATA) {
953 		/* pop buffer to recycle */
954 		if ((b = pop_queue(impl, &impl->queued))) {
955 			pw_log_trace("%p: recycle buffer %d", stream, b->id);
956 		} else if (io->status == -EPIPE)
957 			return io->status;
958 		io->buffer_id = b ? b->id : SPA_ID_INVALID;
959 		io->status = SPA_STATUS_NEED_DATA;
960 	}
961 	if (impl->driving && impl->using_trigger)
962 		call_trigger_done(impl);
963 
964 	return SPA_STATUS_NEED_DATA | SPA_STATUS_HAVE_DATA;
965 }
966 
impl_node_process_output(void * object)967 static int impl_node_process_output(void *object)
968 {
969 	struct stream *impl = object;
970 	struct pw_stream *stream = &impl->this;
971 	struct spa_io_buffers *io = impl->io;
972 	struct buffer *b;
973 	int res;
974 	uint32_t index;
975 
976 again:
977 	pw_log_trace("%p: process out status:%d id:%d", stream,
978 			io->status, io->buffer_id);
979 
980 	if ((res = io->status) != SPA_STATUS_HAVE_DATA) {
981 		/* recycle old buffer */
982 		if ((b = get_buffer(stream, io->buffer_id)) != NULL) {
983 			pw_log_trace("%p: recycle buffer %d", stream, b->id);
984 			push_queue(impl, &impl->dequeued, b);
985 		}
986 
987 		/* pop new buffer */
988 		if ((b = pop_queue(impl, &impl->queued)) != NULL) {
989 			impl->drained = false;
990 			io->buffer_id = b->id;
991 			res = io->status = SPA_STATUS_HAVE_DATA;
992 			pw_log_trace("%p: pop %d %p", stream, b->id, io);
993 		} else if (impl->draining || impl->drained) {
994 			impl->draining = true;
995 			impl->drained = true;
996 			io->buffer_id = SPA_ID_INVALID;
997 			res = io->status = SPA_STATUS_DRAINED;
998 			pw_log_trace("%p: draining", stream);
999 		} else {
1000 			io->buffer_id = SPA_ID_INVALID;
1001 			res = io->status = SPA_STATUS_NEED_DATA;
1002 			pw_log_trace("%p: no more buffers %p", stream, io);
1003 		}
1004 	}
1005 
1006 	copy_position(impl, impl->queued.outcount);
1007 
1008 	if (!impl->draining && !impl->driving) {
1009 		/* we're not draining, not a driver check if we need to get
1010 		 * more buffers */
1011 		if (!impl->process_rt) {
1012 			/* not realtime and we have a free buffer, trigger process so that we have
1013 			 * data in the next round. */
1014 			if (spa_ringbuffer_get_read_index(&impl->dequeued.ring, &index) > 0)
1015 				call_process(impl);
1016 		} else if (io->status == SPA_STATUS_NEED_DATA) {
1017 			/* realtime and we don't have a buffer, trigger process and try
1018 			 * again when there is something in the queue now */
1019 			call_process(impl);
1020 			if (impl->draining ||
1021 			    spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0)
1022 				goto again;
1023 		}
1024 	}
1025 
1026 	pw_log_trace("%p: res %d", stream, res);
1027 
1028 	if (impl->driving && impl->using_trigger && res != SPA_STATUS_HAVE_DATA)
1029 		call_trigger_done(impl);
1030 
1031 	return res;
1032 }
1033 
1034 static const struct spa_node_methods impl_node = {
1035 	SPA_VERSION_NODE_METHODS,
1036 	.add_listener = impl_add_listener,
1037 	.set_callbacks = impl_set_callbacks,
1038 	.enum_params = impl_enum_params,
1039 	.set_param = impl_set_param,
1040 	.set_io = impl_set_io,
1041 	.send_command = impl_send_command,
1042 	.port_set_io = impl_port_set_io,
1043 	.port_enum_params = impl_port_enum_params,
1044 	.port_set_param = impl_port_set_param,
1045 	.port_use_buffers = impl_port_use_buffers,
1046 	.port_reuse_buffer = impl_port_reuse_buffer,
1047 };
1048 
proxy_removed(void * _data)1049 static void proxy_removed(void *_data)
1050 {
1051 	struct pw_stream *stream = _data;
1052 	pw_log_debug("%p: removed", stream);
1053 	spa_hook_remove(&stream->proxy_listener);
1054 	stream->node_id = SPA_ID_INVALID;
1055 	stream_set_state(stream, PW_STREAM_STATE_UNCONNECTED, NULL);
1056 }
1057 
proxy_destroy(void * _data)1058 static void proxy_destroy(void *_data)
1059 {
1060 	struct pw_stream *stream = _data;
1061 	pw_log_debug("%p: destroy", stream);
1062 	proxy_removed(_data);
1063 }
1064 
proxy_error(void * _data,int seq,int res,const char * message)1065 static void proxy_error(void *_data, int seq, int res, const char *message)
1066 {
1067 	struct pw_stream *stream = _data;
1068 	/* we just emit the state change here to inform the application.
1069 	 * If this is supposed to be a permanent error, the app should
1070 	 * do a pw_stream_set_error() */
1071 	pw_stream_emit_state_changed(stream, stream->state,
1072 			PW_STREAM_STATE_ERROR, message);
1073 }
1074 
proxy_bound(void * data,uint32_t global_id)1075 static void proxy_bound(void *data, uint32_t global_id)
1076 {
1077 	struct pw_stream *stream = data;
1078 	stream->node_id = global_id;
1079 	stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL);
1080 }
1081 
1082 static const struct pw_proxy_events proxy_events = {
1083 	PW_VERSION_PROXY_EVENTS,
1084 	.removed = proxy_removed,
1085 	.destroy = proxy_destroy,
1086 	.error = proxy_error,
1087 	.bound = proxy_bound,
1088 };
1089 
find_control(struct pw_stream * stream,uint32_t id)1090 static struct control *find_control(struct pw_stream *stream, uint32_t id)
1091 {
1092 	struct control *c;
1093 	spa_list_for_each(c, &stream->controls, link) {
1094 		if (c->id == id)
1095 			return c;
1096 	}
1097 	return NULL;
1098 }
1099 
node_event_param(void * object,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)1100 static int node_event_param(void *object, int seq,
1101 		uint32_t id, uint32_t index, uint32_t next,
1102 		struct spa_pod *param)
1103 {
1104 	struct pw_stream *stream = object;
1105 
1106 	switch (id) {
1107 	case SPA_PARAM_PropInfo:
1108 	{
1109 		struct control *c;
1110 		const struct spa_pod *type, *pod;
1111 		uint32_t iid, choice, n_vals, container = SPA_ID_INVALID;
1112 		float *vals, bool_range[3] = { 1.0, 0.0, 1.0 };
1113 
1114 		if (spa_pod_parse_object(param,
1115 					SPA_TYPE_OBJECT_PropInfo, NULL,
1116 					SPA_PROP_INFO_id,   SPA_POD_Id(&iid)) < 0)
1117 			return -EINVAL;
1118 
1119 		c = find_control(stream, iid);
1120 		if (c != NULL)
1121 			return 0;
1122 
1123 		c = calloc(1, sizeof(*c) + SPA_POD_SIZE(param));
1124 		c->info = SPA_PTROFF(c, sizeof(*c), struct spa_pod);
1125 		memcpy(c->info, param, SPA_POD_SIZE(param));
1126 		c->control.n_values = 0;
1127 		c->control.max_values = 0;
1128 		c->control.values = c->values;
1129 
1130 		if (spa_pod_parse_object(c->info,
1131 					SPA_TYPE_OBJECT_PropInfo, NULL,
1132 					SPA_PROP_INFO_name, SPA_POD_String(&c->control.name),
1133 					SPA_PROP_INFO_type, SPA_POD_PodChoice(&type),
1134 					SPA_PROP_INFO_container, SPA_POD_OPT_Id(&container)) < 0) {
1135 			free(c);
1136 			return -EINVAL;
1137 		}
1138 
1139 		spa_list_append(&stream->controls, &c->link);
1140 
1141 		pod = spa_pod_get_values(type, &n_vals, &choice);
1142 
1143 		c->type = SPA_POD_TYPE(pod);
1144 		if (spa_pod_is_float(pod))
1145 			vals = SPA_POD_BODY(pod);
1146 		else if (spa_pod_is_bool(pod) && n_vals > 0) {
1147 			choice = SPA_CHOICE_Range;
1148 			vals = bool_range;
1149 			vals[0] = SPA_POD_VALUE(struct spa_pod_bool, pod);
1150 			n_vals = 3;
1151 		}
1152 		else
1153 			return -ENOTSUP;
1154 
1155 		c->container = container != SPA_ID_INVALID ? container : c->type;
1156 
1157 		switch (choice) {
1158 		case SPA_CHOICE_None:
1159 			if (n_vals < 1)
1160 				return -EINVAL;
1161 			c->control.n_values = 1;
1162 			c->control.max_values = 1;
1163 			c->control.values[0] = c->control.def = c->control.min = c->control.max = vals[0];
1164 			break;
1165 		case SPA_CHOICE_Range:
1166 			if (n_vals < 3)
1167 				return -EINVAL;
1168 			c->control.n_values = 1;
1169 			c->control.max_values = 1;
1170 			c->control.values[0] = vals[0];
1171 			c->control.def = vals[0];
1172 			c->control.min = vals[1];
1173 			c->control.max = vals[2];
1174 			break;
1175 		default:
1176 			return -ENOTSUP;
1177 		}
1178 
1179 		c->id = iid;
1180 		pw_log_debug("%p: add control %d (%s) container:%d (def:%f min:%f max:%f)",
1181 				stream, c->id, c->control.name, c->container,
1182 				c->control.def, c->control.min, c->control.max);
1183 		break;
1184 	}
1185 	case SPA_PARAM_Props:
1186 	{
1187 		struct spa_pod_prop *prop;
1188 		struct spa_pod_object *obj = (struct spa_pod_object *) param;
1189 		union {
1190 			float f;
1191 			bool b;
1192 		} value;
1193 		float *values;
1194 		uint32_t i, n_values;
1195 
1196 		SPA_POD_OBJECT_FOREACH(obj, prop) {
1197 			struct control *c;
1198 
1199 			c = find_control(stream, prop->key);
1200 			if (c == NULL)
1201 				continue;
1202 
1203 			switch (c->container) {
1204 			case SPA_TYPE_Float:
1205 				if (spa_pod_get_float(&prop->value, &value.f) < 0)
1206 					continue;
1207 				n_values = 1;
1208 				values = &value.f;
1209 				break;
1210 			case SPA_TYPE_Bool:
1211 				if (spa_pod_get_bool(&prop->value, &value.b) < 0)
1212 					continue;
1213 				value.f = value.b ? 1.0 : 0.0;
1214 				n_values = 1;
1215 				values = &value.f;
1216 				break;
1217 			case SPA_TYPE_Array:
1218 				if ((values = spa_pod_get_array(&prop->value, &n_values)) == NULL ||
1219 				    !spa_pod_is_float(SPA_POD_ARRAY_CHILD(&prop->value)))
1220 					continue;
1221 				break;
1222 			default:
1223 				continue;
1224 			}
1225 
1226 			if (c->emitted && c->control.n_values == n_values &&
1227 			    memcmp(c->control.values, values, sizeof(float) * n_values) == 0)
1228 				continue;
1229 
1230 			memcpy(c->control.values, values, sizeof(float) * n_values);
1231 			c->control.n_values = n_values;
1232 			c->emitted = true;
1233 
1234 			pw_log_debug("%p: control %d (%s) changed %d:", stream,
1235 					prop->key, c->control.name, n_values);
1236 			for (i = 0; i < n_values; i++)
1237 				pw_log_debug("%p:  value %d %f", stream, i, values[i]);
1238 
1239 			pw_stream_emit_control_info(stream, prop->key, &c->control);
1240 		}
1241 		break;
1242 	}
1243 	default:
1244 		break;
1245 	}
1246 	return 0;
1247 }
1248 
node_event_info(void * object,const struct pw_node_info * info)1249 static void node_event_info(void *object, const struct pw_node_info *info)
1250 {
1251 	struct pw_stream *stream = object;
1252 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1253 	uint32_t i;
1254 
1255 	if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS) {
1256 		for (i = 0; i < info->n_params; i++) {
1257 			switch (info->params[i].id) {
1258 			case SPA_PARAM_PropInfo:
1259 			case SPA_PARAM_Props:
1260 				pw_impl_node_for_each_param(impl->node,
1261 						0, info->params[i].id,
1262 						0, UINT32_MAX,
1263 						NULL,
1264 						node_event_param,
1265 						stream);
1266 				break;
1267 			default:
1268 				break;
1269 			}
1270 		}
1271 	}
1272 }
1273 
1274 static const struct pw_impl_node_events node_events = {
1275 	PW_VERSION_IMPL_NODE_EVENTS,
1276 	.info_changed = node_event_info,
1277 };
1278 
on_core_error(void * object,uint32_t id,int seq,int res,const char * message)1279 static void on_core_error(void *object, uint32_t id, int seq, int res, const char *message)
1280 {
1281 	struct pw_stream *stream = object;
1282 
1283 	pw_log_debug("%p: error id:%u seq:%d res:%d (%s): %s", stream,
1284 			id, seq, res, spa_strerror(res), message);
1285 
1286 	if (id == PW_ID_CORE && res == -EPIPE) {
1287 		stream_set_state(stream, PW_STREAM_STATE_UNCONNECTED, message);
1288 	}
1289 }
1290 
1291 static const struct pw_core_events core_events = {
1292 	PW_VERSION_CORE_EVENTS,
1293 	.error = on_core_error,
1294 };
1295 
context_drained(void * data,struct pw_impl_node * node)1296 static void context_drained(void *data, struct pw_impl_node *node)
1297 {
1298 	struct stream *impl = data;
1299 	if (impl->node != node)
1300 		return;
1301 	if (impl->draining && impl->drained) {
1302 		impl->draining = false;
1303 		impl->io->status = SPA_STATUS_NEED_DATA;
1304 		call_drained(impl);
1305 	}
1306 }
1307 
1308 static const struct pw_context_driver_events context_events = {
1309 	PW_VERSION_CONTEXT_DRIVER_EVENTS,
1310 	.drained = context_drained,
1311 };
1312 
1313 static struct stream *
stream_new(struct pw_context * context,const char * name,struct pw_properties * props,const struct pw_properties * extra)1314 stream_new(struct pw_context *context, const char *name,
1315 		struct pw_properties *props, const struct pw_properties *extra)
1316 {
1317 	struct stream *impl;
1318 	struct pw_stream *this;
1319 	const char *str;
1320 	int res;
1321 
1322 	impl = calloc(1, sizeof(struct stream));
1323 	if (impl == NULL) {
1324 		res = -errno;
1325 		goto error_cleanup;
1326 	}
1327 	impl->port_props = pw_properties_new(NULL, NULL);
1328 	if (impl->port_props == NULL) {
1329 		res = -errno;
1330 		goto error_properties;
1331 	}
1332 
1333 	this = &impl->this;
1334 	pw_log_debug("%p: new \"%s\"", impl, name);
1335 
1336 	if (props == NULL) {
1337 		props = pw_properties_new(PW_KEY_MEDIA_NAME, name, NULL);
1338 	} else if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) {
1339 		pw_properties_set(props, PW_KEY_MEDIA_NAME, name);
1340 	}
1341 	if (props == NULL) {
1342 		res = -errno;
1343 		goto error_properties;
1344 	}
1345 	if ((str = pw_context_get_conf_section(context, "stream.properties")) != NULL)
1346 		pw_properties_update_string(props, str, strlen(str));
1347 
1348 	if (pw_properties_get(props, PW_KEY_STREAM_IS_LIVE) == NULL)
1349 		pw_properties_set(props, PW_KEY_STREAM_IS_LIVE, "true");
1350 
1351 	if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL && extra) {
1352 		str = pw_properties_get(extra, PW_KEY_APP_NAME);
1353 		if (str == NULL)
1354 			str = pw_properties_get(extra, PW_KEY_APP_PROCESS_BINARY);
1355 		if (str == NULL)
1356 			str = name;
1357 		pw_properties_set(props, PW_KEY_NODE_NAME, str);
1358 	}
1359 	if ((str = getenv("PIPEWIRE_LATENCY")) != NULL)
1360 		pw_properties_set(props, PW_KEY_NODE_LATENCY, str);
1361 
1362 	spa_hook_list_init(&impl->hooks);
1363 	this->properties = props;
1364 
1365 	this->name = name ? strdup(name) : NULL;
1366 	this->node_id = SPA_ID_INVALID;
1367 
1368 	spa_ringbuffer_init(&impl->dequeued.ring);
1369 	spa_ringbuffer_init(&impl->queued.ring);
1370 	spa_list_init(&impl->param_list);
1371 
1372 	spa_hook_list_init(&this->listener_list);
1373 	spa_list_init(&this->controls);
1374 
1375 	this->state = PW_STREAM_STATE_UNCONNECTED;
1376 
1377 	impl->context = context;
1378 	impl->allow_mlock = context->settings.mem_allow_mlock;
1379 	impl->warn_mlock = context->settings.mem_warn_mlock;
1380 
1381 	spa_hook_list_append(&impl->context->driver_listener_list,
1382 			&impl->context_listener,
1383 			&context_events, impl);
1384 	return impl;
1385 
1386 error_properties:
1387 	pw_properties_free(impl->port_props);
1388 	free(impl);
1389 error_cleanup:
1390 	pw_properties_free(props);
1391 	errno = -res;
1392 	return NULL;
1393 }
1394 
1395 SPA_EXPORT
pw_stream_new(struct pw_core * core,const char * name,struct pw_properties * props)1396 struct pw_stream * pw_stream_new(struct pw_core *core, const char *name,
1397 	      struct pw_properties *props)
1398 {
1399 	struct stream *impl;
1400 	struct pw_stream *this;
1401 	struct pw_context *context = core->context;
1402 
1403 	impl = stream_new(context, name, props, core->properties);
1404 	if (impl == NULL)
1405 		return NULL;
1406 
1407 	this = &impl->this;
1408 	this->core = core;
1409 	spa_list_append(&core->stream_list, &this->link);
1410 	pw_core_add_listener(core,
1411 			&this->core_listener, &core_events, this);
1412 
1413 	return this;
1414 }
1415 
1416 SPA_EXPORT
1417 struct pw_stream *
pw_stream_new_simple(struct pw_loop * loop,const char * name,struct pw_properties * props,const struct pw_stream_events * events,void * data)1418 pw_stream_new_simple(struct pw_loop *loop,
1419 		     const char *name,
1420 		     struct pw_properties *props,
1421 		     const struct pw_stream_events *events,
1422 		     void *data)
1423 {
1424 	struct pw_stream *this;
1425 	struct stream *impl;
1426 	struct pw_context *context;
1427 	int res;
1428 
1429 	if (props == NULL)
1430 		props = pw_properties_new(NULL, NULL);
1431 	if (props == NULL)
1432 		return NULL;
1433 
1434 	context = pw_context_new(loop, NULL, 0);
1435 	if (context == NULL) {
1436 		res = -errno;
1437 		goto error_cleanup;
1438 	}
1439 
1440 	impl = stream_new(context, name, props, NULL);
1441 	if (impl == NULL) {
1442 		res = -errno;
1443 		props = NULL;
1444 		goto error_cleanup;
1445 	}
1446 
1447 	this = &impl->this;
1448 	impl->data.context = context;
1449 	pw_stream_add_listener(this, &impl->data.stream_listener, events, data);
1450 
1451 	return this;
1452 
1453 error_cleanup:
1454 	if (context)
1455 		pw_context_destroy(context);
1456 	pw_properties_free(props);
1457 	errno = -res;
1458 	return NULL;
1459 }
1460 
1461 SPA_EXPORT
pw_stream_state_as_string(enum pw_stream_state state)1462 const char *pw_stream_state_as_string(enum pw_stream_state state)
1463 {
1464 	switch (state) {
1465 	case PW_STREAM_STATE_ERROR:
1466 		return "error";
1467 	case PW_STREAM_STATE_UNCONNECTED:
1468 		return "unconnected";
1469 	case PW_STREAM_STATE_CONNECTING:
1470 		return "connecting";
1471 	case PW_STREAM_STATE_PAUSED:
1472 		return "paused";
1473 	case PW_STREAM_STATE_STREAMING:
1474 		return "streaming";
1475 	}
1476 	return "invalid-state";
1477 }
1478 
1479 SPA_EXPORT
pw_stream_destroy(struct pw_stream * stream)1480 void pw_stream_destroy(struct pw_stream *stream)
1481 {
1482 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1483 	struct control *c;
1484 
1485 	pw_log_debug("%p: destroy", stream);
1486 
1487 	pw_stream_emit_destroy(stream);
1488 
1489 	if (!impl->disconnecting)
1490 		pw_stream_disconnect(stream);
1491 
1492 	if (stream->core) {
1493 		spa_hook_remove(&stream->core_listener);
1494 		spa_list_remove(&stream->link);
1495 		stream->core = NULL;
1496 	}
1497 
1498 	clear_params(impl, SPA_ID_INVALID);
1499 
1500 	pw_log_debug("%p: free", stream);
1501 	free(stream->error);
1502 
1503 	pw_properties_free(stream->properties);
1504 
1505 	free(stream->name);
1506 
1507 	spa_list_consume(c, &stream->controls, link) {
1508 		spa_list_remove(&c->link);
1509 		free(c);
1510 	}
1511 
1512 	spa_hook_list_clean(&impl->hooks);
1513 	spa_hook_list_clean(&stream->listener_list);
1514 
1515 	spa_hook_remove(&impl->context_listener);
1516 
1517 	if (impl->data.context)
1518 		pw_context_destroy(impl->data.context);
1519 
1520 	pw_properties_free(impl->port_props);
1521 	free(impl);
1522 }
1523 
hook_removed(struct spa_hook * hook)1524 static void hook_removed(struct spa_hook *hook)
1525 {
1526 	struct stream *impl = hook->priv;
1527 	spa_zero(impl->rt_callbacks);
1528 	hook->priv = NULL;
1529 	hook->removed = NULL;
1530 }
1531 
1532 SPA_EXPORT
pw_stream_add_listener(struct pw_stream * stream,struct spa_hook * listener,const struct pw_stream_events * events,void * data)1533 void pw_stream_add_listener(struct pw_stream *stream,
1534 			    struct spa_hook *listener,
1535 			    const struct pw_stream_events *events,
1536 			    void *data)
1537 {
1538 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1539 	spa_hook_list_append(&stream->listener_list, listener, events, data);
1540 
1541 	if (events->process && impl->rt_callbacks.funcs == NULL) {
1542 		impl->rt_callbacks = SPA_CALLBACKS_INIT(events, data);
1543 		listener->removed = hook_removed;
1544 		listener->priv = impl;
1545 	}
1546 }
1547 
1548 SPA_EXPORT
pw_stream_get_state(struct pw_stream * stream,const char ** error)1549 enum pw_stream_state pw_stream_get_state(struct pw_stream *stream, const char **error)
1550 {
1551 	if (error)
1552 		*error = stream->error;
1553 	return stream->state;
1554 }
1555 
1556 SPA_EXPORT
pw_stream_get_name(struct pw_stream * stream)1557 const char *pw_stream_get_name(struct pw_stream *stream)
1558 {
1559 	return stream->name;
1560 }
1561 
1562 SPA_EXPORT
pw_stream_get_properties(struct pw_stream * stream)1563 const struct pw_properties *pw_stream_get_properties(struct pw_stream *stream)
1564 {
1565 	return stream->properties;
1566 }
1567 
1568 SPA_EXPORT
pw_stream_update_properties(struct pw_stream * stream,const struct spa_dict * dict)1569 int pw_stream_update_properties(struct pw_stream *stream, const struct spa_dict *dict)
1570 {
1571 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1572 	int changed, res = 0;
1573 
1574 	changed = pw_properties_update(stream->properties, dict);
1575 
1576 	if (!changed)
1577 		return 0;
1578 
1579 	if (impl->node)
1580 		res = pw_impl_node_update_properties(impl->node, dict);
1581 
1582 	return res;
1583 }
1584 
1585 SPA_EXPORT
pw_stream_get_core(struct pw_stream * stream)1586 struct pw_core *pw_stream_get_core(struct pw_stream *stream)
1587 {
1588 	return stream->core;
1589 }
1590 
add_params(struct stream * impl)1591 static void add_params(struct stream *impl)
1592 {
1593 	uint8_t buffer[4096];
1594 	struct spa_pod_builder b;
1595 
1596 	spa_pod_builder_init(&b, buffer, sizeof(buffer));
1597 
1598 	add_param(impl, SPA_PARAM_IO, PARAM_FLAG_LOCKED,
1599 		spa_pod_builder_add_object(&b,
1600 			SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO,
1601 			SPA_PARAM_IO_id,   SPA_POD_Id(SPA_IO_Buffers),
1602 			SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))));
1603 
1604 	add_param(impl, SPA_PARAM_Meta, PARAM_FLAG_LOCKED,
1605 		spa_pod_builder_add_object(&b,
1606 			SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta,
1607 			SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Busy),
1608 			SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_busy))));
1609 }
1610 
find_format(struct stream * impl,enum pw_direction direction,uint32_t * media_type,uint32_t * media_subtype)1611 static int find_format(struct stream *impl, enum pw_direction direction,
1612 		uint32_t *media_type, uint32_t *media_subtype)
1613 {
1614 	uint32_t state = 0;
1615 	uint8_t buffer[4096];
1616 	struct spa_pod_builder b;
1617 	int res;
1618 	struct spa_pod *format;
1619 
1620 	spa_pod_builder_init(&b, buffer, sizeof(buffer));
1621 	if (spa_node_port_enum_params_sync(&impl->impl_node,
1622 				impl->direction, 0,
1623 				SPA_PARAM_EnumFormat, &state,
1624 				NULL, &format, &b) != 1) {
1625 		pw_log_warn("%p: no format given", impl);
1626 		return 0;
1627 	}
1628 
1629 	if ((res = spa_format_parse(format, media_type, media_subtype)) < 0)
1630 		return res;
1631 
1632 	pw_log_debug("%p: %s/%s", impl,
1633 			spa_debug_type_find_name(spa_type_media_type, *media_type),
1634 			spa_debug_type_find_name(spa_type_media_subtype, *media_subtype));
1635 	return 0;
1636 }
1637 
get_media_class(struct stream * impl)1638 static const char *get_media_class(struct stream *impl)
1639 {
1640 	switch (impl->media_type) {
1641 	case SPA_MEDIA_TYPE_audio:
1642 		return "Audio";
1643 	case SPA_MEDIA_TYPE_video:
1644 		return "Video";
1645 	case SPA_MEDIA_TYPE_application:
1646 		switch(impl->media_subtype) {
1647 		case SPA_MEDIA_SUBTYPE_control:
1648 			return "Midi";
1649 		}
1650 		return "Data";
1651 	case SPA_MEDIA_TYPE_stream:
1652 		switch(impl->media_subtype) {
1653 		case SPA_MEDIA_SUBTYPE_midi:
1654 			return "Midi";
1655 		}
1656 		return "Data";
1657 	default:
1658 		return "Unknown";
1659 	}
1660 }
1661 
1662 SPA_EXPORT
1663 int
pw_stream_connect(struct pw_stream * stream,enum pw_direction direction,uint32_t target_id,enum pw_stream_flags flags,const struct spa_pod ** params,uint32_t n_params)1664 pw_stream_connect(struct pw_stream *stream,
1665 		  enum pw_direction direction,
1666 		  uint32_t target_id,
1667 		  enum pw_stream_flags flags,
1668 		  const struct spa_pod **params,
1669 		  uint32_t n_params)
1670 {
1671 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1672 	struct pw_impl_factory *factory;
1673 	struct pw_properties *props = NULL;
1674 	struct pw_impl_node *follower;
1675 	const char *str;
1676 	uint32_t i;
1677 	int res;
1678 
1679 	pw_log_debug("%p: connect target:%d", stream, target_id);
1680 	impl->direction =
1681 	    direction == PW_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT;
1682 	impl->flags = flags;
1683 	impl->node_methods = impl_node;
1684 
1685 	if (impl->direction == SPA_DIRECTION_INPUT)
1686 		impl->node_methods.process = impl_node_process_input;
1687 	else
1688 		impl->node_methods.process = impl_node_process_output;
1689 
1690 	impl->process_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS);
1691 
1692 	impl->impl_node.iface = SPA_INTERFACE_INIT(
1693 			SPA_TYPE_INTERFACE_Node,
1694 			SPA_VERSION_NODE,
1695 			&impl->node_methods, impl);
1696 
1697 	impl->change_mask_all =
1698 		SPA_NODE_CHANGE_MASK_FLAGS |
1699 		SPA_NODE_CHANGE_MASK_PROPS |
1700 		SPA_NODE_CHANGE_MASK_PARAMS;
1701 
1702 	impl->info = SPA_NODE_INFO_INIT();
1703 	if (impl->direction == SPA_DIRECTION_INPUT) {
1704 		impl->info.max_input_ports = 1;
1705 		impl->info.max_output_ports = 0;
1706 	} else {
1707 		impl->info.max_input_ports = 0;
1708 		impl->info.max_output_ports = 1;
1709 	}
1710 	/* we're always RT safe, if the stream was marked RT_PROCESS,
1711 	 * the callback must be RT safe */
1712 	impl->info.flags = SPA_NODE_FLAG_RT;
1713 	/* if the callback was not marked RT_PROCESS, we will offload
1714 	 * the process callback in the main thread and we are ASYNC */
1715 	if (!impl->process_rt)
1716 		impl->info.flags |= SPA_NODE_FLAG_ASYNC;
1717 	impl->info.props = &stream->properties->dict;
1718 	impl->params[IDX_Props] = SPA_PARAM_INFO(SPA_PARAM_Props, SPA_PARAM_INFO_WRITE);
1719 	impl->info.params = impl->params;
1720 	impl->info.n_params = N_NODE_PARAMS;
1721 	impl->info.change_mask = impl->change_mask_all;
1722 
1723 	impl->port_change_mask_all =
1724 		SPA_PORT_CHANGE_MASK_FLAGS |
1725 		SPA_PORT_CHANGE_MASK_PROPS |
1726 		SPA_PORT_CHANGE_MASK_PARAMS;
1727 
1728 	impl->port_info = SPA_PORT_INFO_INIT();
1729 	impl->port_info.change_mask = impl->port_change_mask_all;
1730 	impl->port_info.flags = 0;
1731 	if (SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_ALLOC_BUFFERS))
1732 		impl->port_info.flags |= SPA_PORT_FLAG_CAN_ALLOC_BUFFERS;
1733 	impl->port_params[IDX_EnumFormat] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, 0);
1734 	impl->port_params[IDX_Meta] = SPA_PARAM_INFO(SPA_PARAM_Meta, 0);
1735 	impl->port_params[IDX_IO] = SPA_PARAM_INFO(SPA_PARAM_IO, 0);
1736 	impl->port_params[IDX_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
1737 	impl->port_params[IDX_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
1738 	impl->port_params[IDX_Latency] = SPA_PARAM_INFO(SPA_PARAM_Latency, SPA_PARAM_INFO_WRITE);
1739 	impl->port_info.props = &impl->port_props->dict;
1740 	impl->port_info.params = impl->port_params;
1741 	impl->port_info.n_params = N_PORT_PARAMS;
1742 
1743 	clear_params(impl, SPA_ID_INVALID);
1744 	for (i = 0; i < n_params; i++)
1745 		add_param(impl, SPA_ID_INVALID, 0, params[i]);
1746 
1747 	add_params(impl);
1748 
1749 	if ((res = find_format(impl, direction, &impl->media_type, &impl->media_subtype)) < 0)
1750 		return res;
1751 
1752 	impl->disconnecting = false;
1753 	stream_set_state(stream, PW_STREAM_STATE_CONNECTING, NULL);
1754 
1755 	if (target_id != PW_ID_ANY)
1756 		pw_properties_setf(stream->properties, PW_KEY_NODE_TARGET, "%d", target_id);
1757 	else if ((str = getenv("PIPEWIRE_NODE")) != NULL)
1758 		pw_properties_set(stream->properties, PW_KEY_NODE_TARGET, str);
1759 	if ((flags & PW_STREAM_FLAG_AUTOCONNECT) &&
1760 	    pw_properties_get(stream->properties, PW_KEY_NODE_AUTOCONNECT) == NULL) {
1761 		str = getenv("PIPEWIRE_AUTOCONNECT");
1762 		pw_properties_set(stream->properties, PW_KEY_NODE_AUTOCONNECT, str ? str : "true");
1763 	}
1764 	if (flags & PW_STREAM_FLAG_DRIVER)
1765 		pw_properties_set(stream->properties, PW_KEY_NODE_DRIVER, "true");
1766 	if (flags & PW_STREAM_FLAG_EXCLUSIVE)
1767 		pw_properties_set(stream->properties, PW_KEY_NODE_EXCLUSIVE, "true");
1768 	if (flags & PW_STREAM_FLAG_DONT_RECONNECT)
1769 		pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "true");
1770 	if (flags & PW_STREAM_FLAG_TRIGGER) {
1771 		pw_properties_set(stream->properties, PW_KEY_NODE_TRIGGER, "true");
1772 		impl->trigger = true;
1773 	}
1774 
1775 	if ((str = pw_properties_get(stream->properties, "mem.warn-mlock")) != NULL)
1776 		impl->warn_mlock = pw_properties_parse_bool(str);
1777 
1778 	if ((pw_properties_get(stream->properties, PW_KEY_MEDIA_CLASS) == NULL)) {
1779 		const char *media_type = pw_properties_get(stream->properties, PW_KEY_MEDIA_TYPE);
1780 		pw_properties_setf(stream->properties, PW_KEY_MEDIA_CLASS, "Stream/%s/%s",
1781 				direction == PW_DIRECTION_INPUT ? "Input" : "Output",
1782 				media_type ? media_type : get_media_class(impl));
1783 	}
1784 	if ((str = pw_properties_get(stream->properties, PW_KEY_FORMAT_DSP)) != NULL)
1785 		pw_properties_set(impl->port_props, PW_KEY_FORMAT_DSP, str);
1786 	else if (impl->media_type == SPA_MEDIA_TYPE_application &&
1787 	    impl->media_subtype == SPA_MEDIA_SUBTYPE_control)
1788 		pw_properties_set(impl->port_props, PW_KEY_FORMAT_DSP, "8 bit raw midi");
1789 
1790 	impl->port_info.props = &impl->port_props->dict;
1791 
1792 	if (stream->core == NULL) {
1793 		stream->core = pw_context_connect(impl->context,
1794 				pw_properties_copy(stream->properties), 0);
1795 		if (stream->core == NULL) {
1796 			res = -errno;
1797 			goto error_connect;
1798 		}
1799 		spa_list_append(&stream->core->stream_list, &stream->link);
1800 		pw_core_add_listener(stream->core,
1801 				&stream->core_listener, &core_events, stream);
1802 		impl->disconnect_core = true;
1803 	}
1804 
1805 	pw_log_debug("%p: creating node", stream);
1806 	props = pw_properties_copy(stream->properties);
1807 	if (props == NULL) {
1808 		res = -errno;
1809 		goto error_node;
1810 	}
1811 
1812 	if ((str = pw_properties_get(props, PW_KEY_STREAM_MONITOR)) &&
1813 	    pw_properties_parse_bool(str)) {
1814 		pw_properties_set(props, "resample.peaks", "true");
1815 		pw_properties_set(props, "channelmix.normalize", "true");
1816 	}
1817 
1818 	follower = pw_context_create_node(impl->context, pw_properties_copy(props), 0);
1819 	if (follower == NULL) {
1820 		res = -errno;
1821 		goto error_node;
1822 	}
1823 
1824 	pw_impl_node_set_implementation(follower, &impl->impl_node);
1825 
1826 	if (impl->media_type == SPA_MEDIA_TYPE_audio) {
1827 		factory = pw_context_find_factory(impl->context, "adapter");
1828 		if (factory == NULL) {
1829 			pw_log_error("%p: no adapter factory found", stream);
1830 			res = -ENOENT;
1831 			goto error_node;
1832 		}
1833 		pw_properties_setf(props, "adapt.follower.node", "pointer:%p", follower);
1834 		pw_properties_set(props, "object.register", "false");
1835 		impl->node = pw_impl_factory_create_object(factory,
1836 				NULL,
1837 				PW_TYPE_INTERFACE_Node,
1838 				PW_VERSION_NODE,
1839 				props,
1840 				0);
1841 		props = NULL;
1842 		if (impl->node == NULL) {
1843 			res = -errno;
1844 			goto error_node;
1845 		}
1846 	} else {
1847 		impl->node = follower;
1848 		pw_properties_free(props);
1849 		props = NULL;
1850 	}
1851 	pw_impl_node_set_active(impl->node,
1852 			!SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_INACTIVE));
1853 
1854 	pw_log_debug("%p: export node %p", stream, impl->node);
1855 	stream->proxy = pw_core_export(stream->core,
1856 			PW_TYPE_INTERFACE_Node, NULL, impl->node, 0);
1857 	if (stream->proxy == NULL) {
1858 		res = -errno;
1859 		goto error_proxy;
1860 	}
1861 
1862 	pw_proxy_add_listener(stream->proxy, &stream->proxy_listener, &proxy_events, stream);
1863 
1864 	pw_impl_node_add_listener(impl->node, &stream->node_listener, &node_events, stream);
1865 
1866 	return 0;
1867 
1868 error_connect:
1869 	pw_log_error("%p: can't connect: %s", stream, spa_strerror(res));
1870 	goto exit_cleanup;
1871 error_node:
1872 	pw_log_error("%p: can't make node: %s", stream, spa_strerror(res));
1873 	goto exit_cleanup;
1874 error_proxy:
1875 	pw_log_error("%p: can't make proxy: %s", stream, spa_strerror(res));
1876 	goto exit_cleanup;
1877 
1878 exit_cleanup:
1879 	pw_properties_free(props);
1880 	return res;
1881 }
1882 
1883 SPA_EXPORT
pw_stream_get_node_id(struct pw_stream * stream)1884 uint32_t pw_stream_get_node_id(struct pw_stream *stream)
1885 {
1886 	return stream->node_id;
1887 }
1888 
1889 SPA_EXPORT
pw_stream_disconnect(struct pw_stream * stream)1890 int pw_stream_disconnect(struct pw_stream *stream)
1891 {
1892 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1893 
1894 	pw_log_debug("%p: disconnect", stream);
1895 
1896 	if (impl->disconnecting)
1897 		return 0;
1898 
1899 	impl->disconnecting = true;
1900 
1901 	if (impl->node)
1902 		pw_impl_node_set_active(impl->node, false);
1903 
1904 	if (stream->proxy) {
1905 		pw_proxy_destroy(stream->proxy);
1906 		stream->proxy = NULL;
1907 	}
1908 
1909 	if (impl->node) {
1910 		pw_impl_node_destroy(impl->node);
1911 		impl->node = NULL;
1912 	}
1913 	if (impl->disconnect_core) {
1914 		impl->disconnect_core = false;
1915 		spa_hook_remove(&stream->core_listener);
1916 		spa_list_remove(&stream->link);
1917 		pw_core_disconnect(stream->core);
1918 		stream->core = NULL;
1919 	}
1920 	return 0;
1921 }
1922 
1923 SPA_EXPORT
pw_stream_set_error(struct pw_stream * stream,int res,const char * error,...)1924 int pw_stream_set_error(struct pw_stream *stream,
1925 			int res, const char *error, ...)
1926 {
1927 	if (res < 0) {
1928 		va_list args;
1929 		char *value;
1930 		int r;
1931 
1932 		va_start(args, error);
1933 		r = vasprintf(&value, error, args);
1934 		va_end(args);
1935 		if (r < 0)
1936 			return -errno;
1937 
1938 		if (stream->proxy)
1939 			pw_proxy_error(stream->proxy, res, value);
1940 		stream_set_state(stream, PW_STREAM_STATE_ERROR, value);
1941 
1942 		free(value);
1943 	}
1944 	return res;
1945 }
1946 
1947 SPA_EXPORT
pw_stream_update_params(struct pw_stream * stream,const struct spa_pod ** params,uint32_t n_params)1948 int pw_stream_update_params(struct pw_stream *stream,
1949 			const struct spa_pod **params,
1950 			uint32_t n_params)
1951 {
1952 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1953 	int res;
1954 
1955 	pw_log_debug("%p: update params", stream);
1956 	if ((res = update_params(impl, SPA_ID_INVALID, params, n_params)) < 0)
1957 		return res;
1958 
1959 	emit_node_info(impl, false);
1960 	emit_port_info(impl, false);
1961 
1962 	return res;
1963 }
1964 
1965 SPA_EXPORT
pw_stream_set_control(struct pw_stream * stream,uint32_t id,uint32_t n_values,float * values,...)1966 int pw_stream_set_control(struct pw_stream *stream, uint32_t id, uint32_t n_values, float *values, ...)
1967 {
1968 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1969         va_list varargs;
1970 	char buf[1024];
1971 	struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf));
1972 	struct spa_pod_frame f[1];
1973 	struct spa_pod *pod;
1974 	struct control *c;
1975 
1976 	va_start(varargs, values);
1977 
1978 	spa_pod_builder_push_object(&b, &f[0], SPA_TYPE_OBJECT_Props, SPA_PARAM_Props);
1979 	while (1) {
1980 		pw_log_debug("%p: set control %d %d %f", stream, id, n_values, values[0]);
1981 
1982 		if ((c = find_control(stream, id))) {
1983 			spa_pod_builder_prop(&b, id, 0);
1984 			switch (c->container) {
1985 			case SPA_TYPE_Float:
1986 				spa_pod_builder_float(&b, values[0]);
1987 				break;
1988 			case SPA_TYPE_Bool:
1989 				spa_pod_builder_bool(&b, values[0] < 0.5 ? false : true);
1990 				break;
1991 			case SPA_TYPE_Array:
1992 				spa_pod_builder_array(&b,
1993 						sizeof(float), SPA_TYPE_Float,
1994 						n_values, values);
1995 				break;
1996 			default:
1997 				spa_pod_builder_none(&b);
1998 				break;
1999 			}
2000 		} else {
2001 			pw_log_warn("%p: unknown control with id %d", stream, id);
2002 		}
2003 		if ((id = va_arg(varargs, uint32_t)) == 0)
2004 			break;
2005 		n_values = va_arg(varargs, uint32_t);
2006 		values = va_arg(varargs, float *);
2007 	}
2008 	pod = spa_pod_builder_pop(&b, &f[0]);
2009 
2010 	va_end(varargs);
2011 
2012 	pw_impl_node_set_param(impl->node, SPA_PARAM_Props, 0, pod);
2013 
2014 	return 0;
2015 }
2016 
2017 SPA_EXPORT
pw_stream_get_control(struct pw_stream * stream,uint32_t id)2018 const struct pw_stream_control *pw_stream_get_control(struct pw_stream *stream, uint32_t id)
2019 {
2020 	struct control *c;
2021 
2022 	if (id == 0)
2023 		return NULL;
2024 
2025 	if ((c = find_control(stream, id)))
2026 		return &c->control;
2027 
2028 	return NULL;
2029 }
2030 
2031 SPA_EXPORT
pw_stream_set_active(struct pw_stream * stream,bool active)2032 int pw_stream_set_active(struct pw_stream *stream, bool active)
2033 {
2034 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2035 	pw_log_debug("%p: active:%d", stream, active);
2036 	if (impl->node)
2037 		pw_impl_node_set_active(impl->node, active);
2038 
2039 	if (!active || impl->drained)
2040 		impl->drained = impl->draining = false;
2041 	return 0;
2042 }
2043 
2044 SPA_EXPORT
pw_stream_get_time(struct pw_stream * stream,struct pw_time * time)2045 int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time)
2046 {
2047 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2048 	uintptr_t seq1, seq2;
2049 
2050 	do {
2051 		seq1 = SEQ_READ(impl->seq);
2052 		*time = impl->time;
2053 		seq2 = SEQ_READ(impl->seq);
2054 	} while (!SEQ_READ_SUCCESS(seq1, seq2));
2055 
2056 	if (impl->direction == SPA_DIRECTION_INPUT)
2057 		time->queued = (int64_t)(time->queued - impl->dequeued.outcount);
2058 	else
2059 		time->queued = (int64_t)(impl->queued.incount - time->queued);
2060 
2061 	time->delay += ((impl->latency.min_quantum + impl->latency.max_quantum) / 2) * impl->quantum;
2062 	time->delay += (impl->latency.min_rate + impl->latency.max_rate) / 2;
2063 	time->delay += ((impl->latency.min_ns + impl->latency.max_ns) / 2) * time->rate.denom / SPA_NSEC_PER_SEC;
2064 
2065 	pw_log_trace("%p: %"PRIi64" %"PRIi64" %"PRIu64" %d/%d %"PRIu64" %"
2066 			PRIu64" %"PRIu64" %"PRIu64" %"PRIu64, stream,
2067 			time->now, time->delay, time->ticks,
2068 			time->rate.num, time->rate.denom, time->queued,
2069 			impl->dequeued.outcount, impl->dequeued.incount,
2070 			impl->queued.outcount, impl->queued.incount);
2071 
2072 	return 0;
2073 }
2074 
2075 static int
do_trigger_deprecated(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)2076 do_trigger_deprecated(struct spa_loop *loop,
2077                  bool async, uint32_t seq, const void *data, size_t size, void *user_data)
2078 {
2079 	struct stream *impl = user_data;
2080 	int res = impl->node_methods.process(impl);
2081 	return spa_node_call_ready(&impl->callbacks, res);
2082 }
2083 
2084 SPA_EXPORT
pw_stream_dequeue_buffer(struct pw_stream * stream)2085 struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
2086 {
2087 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2088 	struct buffer *b;
2089 	int res;
2090 
2091 	if ((b = pop_queue(impl, &impl->dequeued)) == NULL) {
2092 		res = -errno;
2093 		pw_log_trace("%p: no more buffers: %m", stream);
2094 		errno = -res;
2095 		return NULL;
2096 	}
2097 	pw_log_trace("%p: dequeue buffer %d", stream, b->id);
2098 
2099 	if (b->busy && impl->direction == SPA_DIRECTION_OUTPUT) {
2100 		if (ATOMIC_INC(b->busy->count) > 1) {
2101 			ATOMIC_DEC(b->busy->count);
2102 			push_queue(impl, &impl->dequeued, b);
2103 			pw_log_trace("%p: buffer busy", stream);
2104 			errno = EBUSY;
2105 			return NULL;
2106 		}
2107 	}
2108 	return &b->this;
2109 }
2110 
2111 SPA_EXPORT
pw_stream_queue_buffer(struct pw_stream * stream,struct pw_buffer * buffer)2112 int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
2113 {
2114 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2115 	struct buffer *b = SPA_CONTAINER_OF(buffer, struct buffer, this);
2116 	int res;
2117 
2118 	if (b->busy)
2119 		ATOMIC_DEC(b->busy->count);
2120 
2121 	pw_log_trace("%p: queue buffer %d", stream, b->id);
2122 	if ((res = push_queue(impl, &impl->queued, b)) < 0)
2123 		return res;
2124 
2125 	if (impl->direction == SPA_DIRECTION_OUTPUT &&
2126 	    impl->driving && !impl->using_trigger) {
2127 		pw_log_debug("deprecated: use pw_stream_trigger_process() to drive the stream.");
2128 		res = pw_loop_invoke(impl->context->data_loop,
2129 			do_trigger_deprecated, 1, NULL, 0, false, impl);
2130 	}
2131 	return res;
2132 }
2133 
2134 static int
do_flush(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)2135 do_flush(struct spa_loop *loop,
2136                  bool async, uint32_t seq, const void *data, size_t size, void *user_data)
2137 {
2138 	struct stream *impl = user_data;
2139 	struct buffer *b;
2140 
2141 	pw_log_trace("%p: flush", impl);
2142 	do {
2143 		b = pop_queue(impl, &impl->queued);
2144 		if (b != NULL)
2145 			push_queue(impl, &impl->dequeued, b);
2146 	}
2147 	while (b);
2148 
2149 	impl->queued.outcount = impl->dequeued.incount =
2150 		impl->dequeued.outcount = impl->queued.incount = 0;
2151 
2152 	return 0;
2153 }
2154 static int
do_drain(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)2155 do_drain(struct spa_loop *loop,
2156                  bool async, uint32_t seq, const void *data, size_t size, void *user_data)
2157 {
2158 	struct stream *impl = user_data;
2159 	pw_log_trace("%p", impl);
2160 	impl->draining = true;
2161 	impl->drained = false;
2162 	return 0;
2163 }
2164 
2165 SPA_EXPORT
pw_stream_flush(struct pw_stream * stream,bool drain)2166 int pw_stream_flush(struct pw_stream *stream, bool drain)
2167 {
2168 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2169 	pw_loop_invoke(impl->context->data_loop,
2170 			drain ? do_drain : do_flush, 1, NULL, 0, true, impl);
2171 	if (!drain)
2172 		spa_node_send_command(impl->node->node,
2173 				&SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Flush));
2174 	return 0;
2175 }
2176 
2177 SPA_EXPORT
pw_stream_is_driving(struct pw_stream * stream)2178 bool pw_stream_is_driving(struct pw_stream *stream)
2179 {
2180 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2181 	return impl->driving;
2182 }
2183 
2184 static int
do_trigger_process(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)2185 do_trigger_process(struct spa_loop *loop,
2186                  bool async, uint32_t seq, const void *data, size_t size, void *user_data)
2187 {
2188 	struct stream *impl = user_data;
2189 	int res;
2190 	if (impl->direction == SPA_DIRECTION_OUTPUT) {
2191 		if (impl->process_rt)
2192 			spa_callbacks_call(&impl->rt_callbacks, struct pw_stream_events, process, 0);
2193 		res = impl->node_methods.process(impl);
2194 	} else {
2195 		res = SPA_STATUS_NEED_DATA;
2196 	}
2197 	return spa_node_call_ready(&impl->callbacks, res);
2198 }
2199 
trigger_request_process(struct stream * impl)2200 static int trigger_request_process(struct stream *impl)
2201 {
2202 	uint8_t buffer[1024];
2203 	struct spa_pod_builder b = { 0 };
2204 
2205 	spa_pod_builder_init(&b, buffer, sizeof(buffer));
2206 	spa_node_emit_event(&impl->hooks,
2207 			spa_pod_builder_add_object(&b,
2208 				SPA_TYPE_EVENT_Node, SPA_NODE_EVENT_RequestProcess));
2209 	return 0;
2210 }
2211 
2212 SPA_EXPORT
pw_stream_trigger_process(struct pw_stream * stream)2213 int pw_stream_trigger_process(struct pw_stream *stream)
2214 {
2215 	struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2216 	int res = 0;
2217 
2218 	pw_log_trace("%p", impl);
2219 
2220 	/* flag to check for old or new behaviour */
2221 	impl->using_trigger = true;
2222 
2223 	if (!impl->driving && !impl->trigger) {
2224 		res = trigger_request_process(impl);
2225 	} else {
2226 		if (impl->direction == SPA_DIRECTION_OUTPUT &&
2227 		    !impl->process_rt) {
2228 			pw_loop_invoke(impl->context->main_loop,
2229 				do_call_process, 1, NULL, 0, false, impl);
2230 		}
2231 		res = pw_loop_invoke(impl->context->data_loop,
2232 			do_trigger_process, 1, NULL, 0, false, impl);
2233 	}
2234 	return res;
2235 }
2236