1 /* PipeWire
2  *
3  * Copyright © 2018 Wim Taymans
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice (including the next
13  * paragraph) shall be included in all copies or substantial portions of the
14  * Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
19  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include <string.h>
26 #include <stdlib.h>
27 #include <errno.h>
28 #include <float.h>
29 
30 #include <spa/pod/parser.h>
31 #include <spa/param/audio/format-utils.h>
32 #include <spa/node/utils.h>
33 #include <spa/utils/names.h>
34 #include <spa/utils/string.h>
35 #include <spa/debug/types.h>
36 #include <spa/pod/filter.h>
37 
38 #include "pipewire/impl.h"
39 #include "pipewire/private.h"
40 
/* Use the "log_port" topic as the default topic for every pw_log_*() call in
 * this file. The topic is only declared here (EXTERN); presumably it is
 * defined in another translation unit -- confirm. */
PW_LOG_TOPIC_EXTERN(log_port);
#define PW_LOG_TOPIC_DEFAULT log_port
43 
44 /** \cond */
/* Private port object. The public pw_impl_port is embedded as `this`, and
 * pw_impl_port_set_mix() gets back to the enclosing struct with
 * SPA_CONTAINER_OF(). */
struct impl {
	struct pw_impl_port this;	/**< public port object returned to callers */
	struct spa_node mix_node;	/**< mix node implementation */

	struct spa_list param_list;	/**< initialized empty in pw_context_create_port() */
	struct spa_list pending_list;	/**< initialized empty in pw_context_create_port() */

	unsigned int cache_params:1;	/**< set to true in pw_context_create_port() */
};
54 
/* Helpers that call a pw_port_events method on a resource through
 * pw_resource_call(). The `info` and `param` shortcuts pass 0 as the `v`
 * argument (presumably the minimum method version -- confirm). */
#define pw_port_resource(r,m,v,...)	pw_resource_call(r,struct pw_port_events,m,v,__VA_ARGS__)
#define pw_port_resource_info(r,...)	pw_port_resource(r,info,0,__VA_ARGS__)
#define pw_port_resource_param(r,...)	pw_port_resource(r,param,0,__VA_ARGS__)
58 
/* Per-resource user data, filled in by global_bind() for every client
 * resource bound to a port global. */
struct resource_data {
	struct pw_impl_port *port;	/* port this resource is bound to */
	struct pw_resource *resource;	/* back pointer to the owning resource */

	struct spa_hook resource_listener;	/* hook for resource_events */
	struct spa_hook object_listener;	/* hook for port_methods */

	/* param ids stored by port_subscribe_params(); only the first
	 * n_subscribe_ids entries are valid */
	uint32_t subscribe_ids[MAX_PARAMS];
	uint32_t n_subscribe_ids;
};
69 
70 /** \endcond */
71 
emit_info_changed(struct pw_impl_port * port)72 static void emit_info_changed(struct pw_impl_port *port)
73 {
74 	struct pw_resource *resource;
75 
76 	if (port->info.change_mask == 0)
77 		return;
78 
79 	pw_impl_port_emit_info_changed(port, &port->info);
80 	if (port->node)
81 		pw_impl_node_emit_port_info_changed(port->node, port, &port->info);
82 
83 	if (port->global)
84 		spa_list_for_each(resource, &port->global->resource_list, link)
85 			pw_port_resource_info(resource, &port->info);
86 
87 	port->info.change_mask = 0;
88 }
89 
port_state_as_string(enum pw_impl_port_state state)90 static const char *port_state_as_string(enum pw_impl_port_state state)
91 {
92 	switch (state) {
93 	case PW_IMPL_PORT_STATE_ERROR:
94 		return "error";
95 	case PW_IMPL_PORT_STATE_INIT:
96 		return "init";
97 	case PW_IMPL_PORT_STATE_CONFIGURE:
98 		return "configure";
99 	case PW_IMPL_PORT_STATE_READY:
100 		return "ready";
101 	case PW_IMPL_PORT_STATE_PAUSED:
102 		return "paused";
103 	}
104 	return "invalid-state";
105 }
106 
pw_impl_port_update_state(struct pw_impl_port * port,enum pw_impl_port_state state,int res,char * error)107 void pw_impl_port_update_state(struct pw_impl_port *port, enum pw_impl_port_state state, int res, char *error)
108 {
109 	enum pw_impl_port_state old = port->state;
110 
111 	port->state = state;
112 	free((void*)port->error);
113 	port->error = error;
114 
115 	if (old == state)
116 		return;
117 
118 	pw_log(state == PW_IMPL_PORT_STATE_ERROR ?
119 			SPA_LOG_LEVEL_ERROR : SPA_LOG_LEVEL_DEBUG,
120 		"%p: state %s -> %s (%s)", port,
121 		port_state_as_string(old), port_state_as_string(state), error);
122 
123 	pw_impl_port_emit_state_changed(port, old, state, error);
124 
125 	if (state == PW_IMPL_PORT_STATE_ERROR && port->global) {
126 		struct pw_resource *resource;
127 		spa_list_for_each(resource, &port->global->resource_list, link)
128 			pw_resource_error(resource, res, error);
129 	}
130 }
131 
tee_process(void * object)132 static int tee_process(void *object)
133 {
134 	struct impl *impl = object;
135 	struct pw_impl_port *this = &impl->this;
136 	struct pw_impl_port_mix *mix;
137 	struct spa_io_buffers *io = &this->rt.io;
138 
139 	pw_log_trace_fp("%p: tee input %d %d", this, io->status, io->buffer_id);
140 	spa_list_for_each(mix, &this->rt.mix_list, rt_link) {
141 		pw_log_trace_fp("%p: port %d %p->%p %d", this,
142 				mix->port.port_id, io, mix->io, mix->io->buffer_id);
143 		*mix->io = *io;
144 	}
145 	io->status = SPA_STATUS_NEED_DATA;
146 
147         return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
148 }
149 
tee_reuse_buffer(void * object,uint32_t port_id,uint32_t buffer_id)150 static int tee_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
151 {
152 	struct impl *impl = object;
153 	struct pw_impl_port *this = &impl->this;
154 
155 	pw_log_trace_fp("%p: tee reuse buffer %d %d", this, port_id, buffer_id);
156 	spa_node_port_reuse_buffer(this->node->node, this->port_id, buffer_id);
157 
158 	return 0;
159 }
160 
/* Node methods of the built-in mix node for non-input ports (selected in
 * pw_context_create_port()); only process and port_reuse_buffer are set. */
static const struct spa_node_methods schedule_tee_node = {
	SPA_VERSION_NODE_METHODS,
	.process = tee_process,
	.port_reuse_buffer = tee_reuse_buffer,
};
166 
schedule_mix_input(void * object)167 static int schedule_mix_input(void *object)
168 {
169 	struct impl *impl = object;
170 	struct pw_impl_port *this = &impl->this;
171 	struct spa_io_buffers *io = &this->rt.io;
172 	struct pw_impl_port_mix *mix;
173 
174 	if (SPA_UNLIKELY(PW_IMPL_PORT_IS_CONTROL(this)))
175 		return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
176 
177 	spa_list_for_each(mix, &this->rt.mix_list, rt_link) {
178 		pw_log_trace_fp("%p: mix input %d %p->%p %d %d", this,
179 				mix->port.port_id, mix->io, io, mix->io->status, mix->io->buffer_id);
180 		*io = *mix->io;
181 		mix->io->status = SPA_STATUS_NEED_DATA;
182 		break;
183 	}
184         return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
185 }
186 
schedule_mix_reuse_buffer(void * object,uint32_t port_id,uint32_t buffer_id)187 static int schedule_mix_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
188 {
189 	struct impl *impl = object;
190 	struct pw_impl_port *this = &impl->this;
191 	struct pw_impl_port_mix *mix;
192 
193 	spa_list_for_each(mix, &this->rt.mix_list, rt_link) {
194 		pw_log_trace_fp("%p: reuse buffer %d %d", this, port_id, buffer_id);
195 		/* FIXME send reuse buffer to peer */
196 		break;
197 	}
198 	return 0;
199 }
200 
/* Node methods of the built-in mix node for input ports (selected in
 * pw_context_create_port()); only process and port_reuse_buffer are set. */
static const struct spa_node_methods schedule_mix_node = {
	SPA_VERSION_NODE_METHODS,
	.process = schedule_mix_input,
	.port_reuse_buffer = schedule_mix_reuse_buffer,
};
206 
207 SPA_EXPORT
pw_impl_port_init_mix(struct pw_impl_port * port,struct pw_impl_port_mix * mix)208 int pw_impl_port_init_mix(struct pw_impl_port *port, struct pw_impl_port_mix *mix)
209 {
210 	uint32_t port_id;
211 	int res = 0;
212 
213 	port_id = pw_map_insert_new(&port->mix_port_map, mix);
214 	if (port_id == SPA_ID_INVALID)
215 		return -errno;
216 
217 	mix->port.direction = port->direction;
218 	mix->port.port_id = port_id;
219 
220 	spa_list_append(&port->mix_list, &mix->link);
221 	port->n_mix++;
222 	mix->p = port;
223 
224 	spa_node_add_port(port->mix, port->direction, port_id, NULL);
225 
226 	res = pw_impl_port_call_init_mix(port, mix);
227 
228 	/* set the same format on the mixer as on the port if any */
229 	{
230 		uint32_t idx = 0;
231 		uint8_t buffer[1024];
232 		struct spa_pod_builder b;
233 		struct spa_pod *param;
234 
235 		spa_pod_builder_init(&b, buffer, sizeof(buffer));
236 		if (spa_node_port_enum_params_sync(port->mix,
237 				pw_direction_reverse(port->direction), 0,
238 				SPA_PARAM_Format, &idx, NULL, &param, &b) == 1) {
239 			spa_node_port_set_param(port->mix,
240 				port->direction, port_id,
241 				SPA_PARAM_Format, 0, param);
242 		}
243 	}
244 
245 	pw_log_debug("%p: init mix n_mix:%d %d.%d io:%p: (%s)", port,
246 			port->n_mix, port->port_id, mix->port.port_id,
247 			mix->io, spa_strerror(res));
248 
249 	return res;
250 }
251 
252 SPA_EXPORT
pw_impl_port_release_mix(struct pw_impl_port * port,struct pw_impl_port_mix * mix)253 int pw_impl_port_release_mix(struct pw_impl_port *port, struct pw_impl_port_mix *mix)
254 {
255 	int res = 0;
256 	uint32_t port_id = mix->port.port_id;
257 
258 	pw_map_remove(&port->mix_port_map, port_id);
259 	spa_list_remove(&mix->link);
260 	port->n_mix--;
261 
262 	res = pw_impl_port_call_release_mix(port, mix);
263 
264 	spa_node_remove_port(port->mix, port->direction, port_id);
265 
266 	pw_log_debug("%p: release mix %d %d.%d", port,
267 			port->n_mix, port->port_id, mix->port.port_id);
268 
269 	return res;
270 }
271 
update_properties(struct pw_impl_port * port,const struct spa_dict * dict,bool filter)272 static int update_properties(struct pw_impl_port *port, const struct spa_dict *dict, bool filter)
273 {
274 	static const char * const ignored[] = {
275 		PW_KEY_OBJECT_ID,
276 		PW_KEY_PORT_DIRECTION,
277 		PW_KEY_PORT_CONTROL,
278 		PW_KEY_NODE_ID,
279 		PW_KEY_PORT_ID,
280 		NULL
281 	};
282 
283 	int changed;
284 
285 	changed = pw_properties_update_ignore(port->properties, dict, filter ? ignored : NULL);
286 	port->info.props = &port->properties->dict;
287 
288 	if (changed) {
289 		pw_log_debug("%p: updated %d properties", port, changed);
290 		port->info.change_mask |= PW_PORT_CHANGE_MASK_PROPS;
291 	}
292 	return changed;
293 }
294 
resource_is_subscribed(struct pw_resource * resource,uint32_t id)295 static int resource_is_subscribed(struct pw_resource *resource, uint32_t id)
296 {
297 	struct resource_data *data = pw_resource_get_user_data(resource);
298 	uint32_t i;
299 
300 	for (i = 0; i < data->n_subscribe_ids; i++) {
301 		if (data->subscribe_ids[i] == id)
302 			return 1;
303 	}
304 	return 0;
305 }
306 
notify_param(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)307 static int notify_param(void *data, int seq, uint32_t id,
308 		uint32_t index, uint32_t next, struct spa_pod *param)
309 {
310 	struct pw_impl_port *port = data;
311 	struct pw_resource *resource;
312 
313 	spa_list_for_each(resource, &port->global->resource_list, link) {
314 		if (!resource_is_subscribed(resource, id))
315 			continue;
316 
317 		pw_log_debug("%p: resource %p notify param %d", port, resource, id);
318 		pw_port_resource_param(resource, seq, id, index, next, param);
319 	}
320 	return 0;
321 }
322 
emit_params(struct pw_impl_port * port,uint32_t * changed_ids,uint32_t n_changed_ids)323 static void emit_params(struct pw_impl_port *port, uint32_t *changed_ids, uint32_t n_changed_ids)
324 {
325 	uint32_t i;
326 	int res;
327 
328 	if (port->global == NULL)
329 		return;
330 
331 	pw_log_debug("%p: emit %d params", port, n_changed_ids);
332 
333 	for (i = 0; i < n_changed_ids; i++) {
334 		struct pw_resource *resource;
335 		int subscribed = 0;
336 
337 		pw_log_debug("%p: emit param %d/%d: %d", port, i, n_changed_ids,
338 				changed_ids[i]);
339 
340 		pw_impl_port_emit_param_changed(port, changed_ids[i]);
341 
342 		/* first check if anyone is subscribed */
343 		spa_list_for_each(resource, &port->global->resource_list, link) {
344 			if ((subscribed = resource_is_subscribed(resource, changed_ids[i])))
345 				break;
346 		}
347 		if (!subscribed)
348 			continue;
349 
350 		if ((res = pw_impl_port_for_each_param(port, 1, changed_ids[i], 0, UINT32_MAX,
351 					NULL, notify_param, port)) < 0) {
352 			pw_log_error("%p: error %d (%s)", port, res, spa_strerror(res));
353 		}
354 	}
355 }
356 
process_latency_param(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)357 static int process_latency_param(void *data, int seq,
358 		uint32_t id, uint32_t index, uint32_t next, struct spa_pod *param)
359 {
360 	struct pw_impl_port *this = data;
361 	struct spa_latency_info latency;
362 
363 	if (id != SPA_PARAM_Latency)
364 		return -EINVAL;
365 
366 	if (spa_latency_parse(param, &latency) < 0)
367 		return 0;
368 	if (spa_latency_info_compare(&this->latency[latency.direction], &latency) == 0)
369 		return 0;
370 
371 	pw_log_debug("port %p: got %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, this,
372 			pw_direction_as_string(latency.direction),
373 			latency.min_quantum, latency.max_quantum,
374 			latency.min_rate, latency.max_rate,
375 			latency.min_ns, latency.max_ns);
376 
377 	this->latency[latency.direction] = latency;
378 	if (latency.direction == this->direction)
379 		pw_impl_port_emit_latency_changed(this);
380 
381 	return 0;
382 }
383 
update_info(struct pw_impl_port * port,const struct spa_port_info * info)384 static void update_info(struct pw_impl_port *port, const struct spa_port_info *info)
385 {
386 	uint32_t changed_ids[MAX_PARAMS], n_changed_ids = 0;
387 
388 	pw_log_debug("%p: %p flags:%08"PRIx64" change_mask:%08"PRIx64,
389 			port, info, info->flags, info->change_mask);
390 
391 	if (info->change_mask & SPA_PORT_CHANGE_MASK_FLAGS) {
392 		port->spa_flags = info->flags;
393 	}
394 	if (info->change_mask & SPA_PORT_CHANGE_MASK_PROPS) {
395 		if (info->props) {
396 			update_properties(port, info->props, true);
397 		} else {
398 			pw_log_warn("%p: port PROPS update but no properties", port);
399 		}
400 	}
401 	if (info->change_mask & SPA_PORT_CHANGE_MASK_PARAMS) {
402 		uint32_t i;
403 
404 		port->info.change_mask |= PW_PORT_CHANGE_MASK_PARAMS;
405 		port->info.n_params = SPA_MIN(info->n_params, SPA_N_ELEMENTS(port->params));
406 
407 		for (i = 0; i < port->info.n_params; i++) {
408 			uint32_t id = info->params[i].id;
409 
410 			pw_log_debug("%p: param %d id:%d (%s) %08x:%08x", port, i,
411 					id, spa_debug_type_find_name(spa_type_param, id),
412 					port->info.params[i].flags, info->params[i].flags);
413 
414 			port->info.params[i].id = info->params[i].id;
415 			if (port->info.params[i].flags == info->params[i].flags)
416 				continue;
417 
418 			pw_log_debug("%p: update param %d", port, id);
419 			port->info.params[i] = info->params[i];
420 			port->info.params[i].user = 0;
421 
422 			if (info->params[i].flags & SPA_PARAM_INFO_READ)
423 				changed_ids[n_changed_ids++] = id;
424 
425 			switch (id) {
426 			case SPA_PARAM_Latency:
427 				port->have_latency_param =
428 					SPA_FLAG_IS_SET(info->params[i].flags, SPA_PARAM_INFO_WRITE);
429 				if (port->node != NULL)
430 					pw_impl_port_for_each_param(port, 0, id, 0, UINT32_MAX,
431 							NULL, process_latency_param, port);
432 				break;
433 			default:
434 				break;
435 			}
436 		}
437 	}
438 
439 	if (n_changed_ids > 0)
440 		emit_params(port, changed_ids, n_changed_ids);
441 }
442 
443 SPA_EXPORT
pw_context_create_port(struct pw_context * context,enum pw_direction direction,uint32_t port_id,const struct spa_port_info * info,size_t user_data_size)444 struct pw_impl_port *pw_context_create_port(
445 		struct pw_context *context,
446 		enum pw_direction direction,
447 		uint32_t port_id,
448 		const struct spa_port_info *info,
449 		size_t user_data_size)
450 {
451 	struct impl *impl;
452 	struct pw_impl_port *this;
453 	struct pw_properties *properties;
454 	const struct spa_node_methods *mix_methods;
455 	int res;
456 
457 	impl = calloc(1, sizeof(struct impl) + user_data_size);
458 	if (impl == NULL)
459 		return NULL;
460 
461 	spa_list_init(&impl->param_list);
462 	spa_list_init(&impl->pending_list);
463 	impl->cache_params = true;
464 
465 	this = &impl->this;
466 	pw_log_debug("%p: new %s %d", this,
467 			pw_direction_as_string(direction), port_id);
468 
469 	if (info && info->change_mask & SPA_PORT_CHANGE_MASK_PROPS && info->props)
470 		properties = pw_properties_new_dict(info->props);
471 	else
472 		properties = pw_properties_new(NULL, NULL);
473 
474 	if (properties == NULL) {
475 		res = -errno;
476 		goto error_no_mem;
477 	}
478 	pw_properties_setf(properties, PW_KEY_PORT_ID, "%u", port_id);
479 
480 	if (info) {
481 		if (SPA_FLAG_IS_SET(info->flags, SPA_PORT_FLAG_PHYSICAL))
482 			pw_properties_set(properties, PW_KEY_PORT_PHYSICAL, "true");
483 		if (SPA_FLAG_IS_SET(info->flags, SPA_PORT_FLAG_TERMINAL))
484 			pw_properties_set(properties, PW_KEY_PORT_TERMINAL, "true");
485 		this->spa_flags = info->flags;
486 	}
487 
488 	this->direction = direction;
489 	this->port_id = port_id;
490 	this->properties = properties;
491 	this->state = PW_IMPL_PORT_STATE_INIT;
492 	this->rt.io = SPA_IO_BUFFERS_INIT;
493 
494         if (user_data_size > 0)
495 		this->user_data = SPA_PTROFF(impl, sizeof(struct impl), void);
496 
497 	this->info.direction = direction;
498 	this->info.params = this->params;
499 	this->info.change_mask = PW_PORT_CHANGE_MASK_PROPS;
500 	this->info.props = &this->properties->dict;
501 
502 	spa_list_init(&this->links);
503 	spa_list_init(&this->mix_list);
504 	spa_list_init(&this->rt.mix_list);
505 	spa_list_init(&this->control_list[0]);
506 	spa_list_init(&this->control_list[1]);
507 
508 	spa_hook_list_init(&this->listener_list);
509 
510 	if (this->direction == PW_DIRECTION_INPUT)
511 		mix_methods = &schedule_mix_node;
512 	else
513 		mix_methods = &schedule_tee_node;
514 
515 	impl->mix_node.iface = SPA_INTERFACE_INIT(
516 			SPA_TYPE_INTERFACE_Node,
517 			SPA_VERSION_NODE,
518 			mix_methods, impl);
519 
520 	pw_impl_port_set_mix(this, NULL, 0);
521 
522 	pw_map_init(&this->mix_port_map, 64, 64);
523 
524 	this->latency[SPA_DIRECTION_INPUT] = SPA_LATENCY_INFO(SPA_DIRECTION_INPUT);
525 	this->latency[SPA_DIRECTION_OUTPUT] = SPA_LATENCY_INFO(SPA_DIRECTION_OUTPUT);
526 
527 	if (info)
528 		update_info(this, info);
529 
530 	return this;
531 
532 error_no_mem:
533 	pw_log_warn("%p: new failed", impl);
534 	free(impl);
535 	errno = -res;
536 	return NULL;
537 }
538 
539 SPA_EXPORT
pw_impl_port_set_mix(struct pw_impl_port * port,struct spa_node * node,uint32_t flags)540 int pw_impl_port_set_mix(struct pw_impl_port *port, struct spa_node *node, uint32_t flags)
541 {
542 	struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
543 	struct pw_impl_port_mix *mix;
544 
545 	if (node == NULL) {
546 		node = &impl->mix_node;
547 		flags = 0;
548 	}
549 
550 	pw_log_debug("%p: mix node %p->%p", port, port->mix, node);
551 
552 	if (port->mix != NULL && port->mix != node) {
553 		spa_list_for_each(mix, &port->mix_list, link)
554 			spa_node_remove_port(port->mix, mix->port.direction, mix->port.port_id);
555 
556 		spa_node_port_set_io(port->mix,
557 			     pw_direction_reverse(port->direction), 0,
558 			     SPA_IO_Buffers, NULL, 0);
559 	}
560 	if (port->mix_handle != NULL) {
561 		pw_unload_spa_handle(port->mix_handle);
562 		port->mix_handle = NULL;
563 	}
564 
565 	port->mix_flags = flags;
566 	port->mix = node;
567 
568 	if (port->mix) {
569 		spa_list_for_each(mix, &port->mix_list, link)
570 			spa_node_add_port(port->mix, mix->port.direction, mix->port.port_id, NULL);
571 
572 		spa_node_port_set_io(port->mix,
573 			     pw_direction_reverse(port->direction), 0,
574 			     SPA_IO_Buffers,
575 			     &port->rt.io, sizeof(port->rt.io));
576 	}
577 	return 0;
578 }
579 
setup_mixer(struct pw_impl_port * port,const struct spa_pod * param)580 static int setup_mixer(struct pw_impl_port *port, const struct spa_pod *param)
581 {
582 	uint32_t media_type, media_subtype;
583 	int res;
584 	const char *fallback_lib, *factory_name;
585 	struct spa_handle *handle;
586 	struct spa_dict_item items[1];
587 	void *iface;
588 
589 	if ((res = spa_format_parse(param, &media_type, &media_subtype)) < 0)
590 		return res;
591 
592 	pw_log_debug("%p: %s/%s", port,
593 			spa_debug_type_find_name(spa_type_media_type, media_type),
594 			spa_debug_type_find_name(spa_type_media_subtype, media_subtype));
595 
596 	switch (media_type) {
597 	case SPA_MEDIA_TYPE_audio:
598 		switch (media_subtype) {
599 		case SPA_MEDIA_SUBTYPE_dsp:
600 		{
601 			struct spa_audio_info_dsp info;
602 			if ((res = spa_format_audio_dsp_parse(param, &info)) < 0)
603 				return res;
604 
605 			if (info.format != SPA_AUDIO_FORMAT_DSP_F32)
606 				return -ENOTSUP;
607 
608 			fallback_lib = "audiomixer/libspa-audiomixer";
609 			factory_name = SPA_NAME_AUDIO_MIXER_DSP;
610 			break;
611 		}
612 		case SPA_MEDIA_SUBTYPE_raw:
613 			fallback_lib = "audiomixer/libspa-audiomixer";
614 			factory_name = SPA_NAME_AUDIO_MIXER;
615 			break;
616 		default:
617 			return -ENOTSUP;
618 		}
619 		break;
620 
621 	case SPA_MEDIA_TYPE_application:
622 		switch (media_subtype) {
623 		case SPA_MEDIA_SUBTYPE_control:
624 			fallback_lib = "control/libspa-control";
625 			factory_name = SPA_NAME_CONTROL_MIXER;
626 			break;
627 		default:
628 			return -ENOTSUP;
629 		}
630 		break;
631 
632 	default:
633 		return -ENOTSUP;
634 	}
635 
636 	items[0] = SPA_DICT_ITEM_INIT(SPA_KEY_LIBRARY_NAME, fallback_lib);
637 	handle = pw_context_load_spa_handle(port->node->context, factory_name,
638 			&SPA_DICT_INIT_ARRAY(items));
639 	if (handle == NULL)
640 		return -errno;
641 
642 	if ((res = spa_handle_get_interface(handle,
643 					SPA_TYPE_INTERFACE_Node, &iface)) < 0) {
644 		pw_unload_spa_handle(handle);
645 		return res;
646 	}
647 
648 	pw_log_debug("mix node handle:%p iface:%p", handle, iface);
649 	pw_impl_port_set_mix(port, (struct spa_node*)iface,
650 			PW_IMPL_PORT_MIX_FLAG_MULTI |
651 			PW_IMPL_PORT_MIX_FLAG_NEGOTIATE);
652 	port->mix_handle = handle;
653 
654 	return 0;
655 }
656 
657 SPA_EXPORT
pw_impl_port_get_direction(struct pw_impl_port * port)658 enum pw_direction pw_impl_port_get_direction(struct pw_impl_port *port)
659 {
660 	return port->direction;
661 }
662 
663 SPA_EXPORT
pw_impl_port_get_id(struct pw_impl_port * port)664 uint32_t pw_impl_port_get_id(struct pw_impl_port *port)
665 {
666 	return port->port_id;
667 }
668 
669 SPA_EXPORT
pw_impl_port_get_properties(struct pw_impl_port * port)670 const struct pw_properties *pw_impl_port_get_properties(struct pw_impl_port *port)
671 {
672 	return port->properties;
673 }
674 
675 SPA_EXPORT
pw_impl_port_update_properties(struct pw_impl_port * port,const struct spa_dict * dict)676 int pw_impl_port_update_properties(struct pw_impl_port *port, const struct spa_dict *dict)
677 {
678 	int changed = update_properties(port, dict, false);
679 	emit_info_changed(port);
680 	return changed;
681 }
682 
/* Apply a spa_port_info update to the port and announce the resulting
 * info change, if any. */
void pw_impl_port_update_info(struct pw_impl_port *port, const struct spa_port_info *info)
{
	/* first fold the update into the port, then notify */
	update_info(port, info);

	emit_info_changed(port);
}
688 
689 SPA_EXPORT
pw_impl_port_get_node(struct pw_impl_port * port)690 struct pw_impl_node *pw_impl_port_get_node(struct pw_impl_port *port)
691 {
692 	return port->node;
693 }
694 
695 SPA_EXPORT
pw_impl_port_add_listener(struct pw_impl_port * port,struct spa_hook * listener,const struct pw_impl_port_events * events,void * data)696 void pw_impl_port_add_listener(struct pw_impl_port *port,
697 			  struct spa_hook *listener,
698 			  const struct pw_impl_port_events *events,
699 			  void *data)
700 {
701 	spa_hook_list_append(&port->listener_list, listener, events, data);
702 }
703 
704 SPA_EXPORT
pw_impl_port_get_info(struct pw_impl_port * port)705 const struct pw_port_info *pw_impl_port_get_info(struct pw_impl_port *port)
706 {
707 	return &port->info;
708 }
709 
710 SPA_EXPORT
pw_impl_port_get_user_data(struct pw_impl_port * port)711 void * pw_impl_port_get_user_data(struct pw_impl_port *port)
712 {
713 	return port->user_data;
714 }
715 
do_add_port(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)716 static int do_add_port(struct spa_loop *loop,
717 		       bool async, uint32_t seq, const void *data, size_t size, void *user_data)
718 {
719         struct pw_impl_port *this = user_data;
720 
721 	pw_log_trace("%p: add port", this);
722 	if (this->direction == PW_DIRECTION_INPUT)
723 		spa_list_append(&this->node->rt.input_mix, &this->rt.node_link);
724 	else
725 		spa_list_append(&this->node->rt.output_mix, &this->rt.node_link);
726 
727 	return 0;
728 }
729 
check_param_io(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)730 static int check_param_io(void *data, int seq, uint32_t id,
731 		uint32_t index, uint32_t next, struct spa_pod *param)
732 {
733 	struct pw_impl_port *port = data;
734 	struct pw_impl_node *node = port->node;
735 	uint32_t pid, psize;
736 
737 	if (spa_pod_parse_object(param,
738 			SPA_TYPE_OBJECT_ParamIO, NULL,
739 			SPA_PARAM_IO_id,   SPA_POD_Id(&pid),
740 			SPA_PARAM_IO_size, SPA_POD_Int(&psize)) < 0)
741 		return 0;
742 
743 	pw_log_debug("%p: got io id:%d (%s)", port, pid,
744 			spa_debug_type_find_name(spa_type_io, pid));
745 
746 	switch (pid) {
747 	case SPA_IO_Control:
748 	case SPA_IO_Notify:
749 		pw_control_new(node->context, port, pid, psize, 0);
750 		SPA_FLAG_SET(port->flags, PW_IMPL_PORT_FLAG_CONTROL);
751 		break;
752 	case SPA_IO_Buffers:
753 		SPA_FLAG_SET(port->flags, PW_IMPL_PORT_FLAG_BUFFERS);
754 		break;
755 	default:
756 		break;
757 	}
758 	return 0;
759 }
760 
reply_param(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)761 static int reply_param(void *data, int seq, uint32_t id,
762 		uint32_t index, uint32_t next, struct spa_pod *param)
763 {
764 	struct resource_data *d = data;
765 	struct pw_resource *resource = d->resource;
766 	pw_log_debug("%p: resource %p reply param %u %u %u", d->port,
767 			resource, id, index, next);
768 	pw_port_resource_param(resource, seq, id, index, next, param);
769 	return 0;
770 }
771 
port_enum_params(void * object,int seq,uint32_t id,uint32_t index,uint32_t num,const struct spa_pod * filter)772 static int port_enum_params(void *object, int seq, uint32_t id, uint32_t index, uint32_t num,
773 		const struct spa_pod *filter)
774 {
775 	struct resource_data *data = object;
776 	struct pw_resource *resource = data->resource;
777 	struct pw_impl_port *port = data->port;
778 	int res;
779 
780 	pw_log_debug("%p: resource %p enum params seq:%d id:%d (%s) index:%u num:%u", port,
781 			resource, seq, id, spa_debug_type_find_name(spa_type_param, id),
782 			index, num);
783 
784 	if ((res = pw_impl_port_for_each_param(port, seq, id, index, num, filter,
785 			reply_param, data)) < 0)
786 		pw_resource_errorf(resource, res,
787 				"enum params id:%d (%s) failed", id,
788 				spa_debug_type_find_name(spa_type_param, id));
789 	return res;
790 }
791 
port_subscribe_params(void * object,uint32_t * ids,uint32_t n_ids)792 static int port_subscribe_params(void *object, uint32_t *ids, uint32_t n_ids)
793 {
794 	struct resource_data *data = object;
795 	struct pw_resource *resource = data->resource;
796 	uint32_t i;
797 
798 	n_ids = SPA_MIN(n_ids, SPA_N_ELEMENTS(data->subscribe_ids));
799 	data->n_subscribe_ids = n_ids;
800 
801 	for (i = 0; i < n_ids; i++) {
802 		data->subscribe_ids[i] = ids[i];
803 		pw_log_debug("%p: resource %p subscribe param id:%d (%s)", data->port,
804 				resource, ids[i],
805 				spa_debug_type_find_name(spa_type_param, ids[i]));
806 		port_enum_params(data, 1, ids[i], 0, UINT32_MAX, NULL);
807 	}
808 	return 0;
809 }
810 
/* Port methods installed as object listener on every bound resource
 * (see global_bind()). */
static const struct pw_port_methods port_methods = {
	PW_VERSION_PORT_METHODS,
	.subscribe_params = port_subscribe_params,
	.enum_params = port_enum_params
};
816 
resource_destroy(void * data)817 static void resource_destroy(void *data)
818 {
819 	struct resource_data *d = data;
820 	spa_hook_remove(&d->resource_listener);
821 	spa_hook_remove(&d->object_listener);
822 }
823 
/* Resource events handled for bound port resources; only destroy is set. */
static const struct pw_resource_events resource_events = {
	PW_VERSION_RESOURCE_EVENTS,
	.destroy = resource_destroy,
};
828 
829 static int
global_bind(void * _data,struct pw_impl_client * client,uint32_t permissions,uint32_t version,uint32_t id)830 global_bind(void *_data, struct pw_impl_client *client, uint32_t permissions,
831 	       uint32_t version, uint32_t id)
832 {
833 	struct pw_impl_port *this = _data;
834 	struct pw_global *global = this->global;
835 	struct pw_resource *resource;
836 	struct resource_data *data;
837 	int res;
838 
839 	resource = pw_resource_new(client, id, permissions, global->type, version, sizeof(*data));
840 	if (resource == NULL) {
841 		res = -errno;
842 		goto error_resource;
843 	}
844 
845 	data = pw_resource_get_user_data(resource);
846 	data->port = this;
847 	data->resource = resource;
848 
849 	pw_resource_add_listener(resource,
850 			&data->resource_listener,
851 			&resource_events, data);
852 	pw_resource_add_object_listener(resource,
853 			&data->object_listener,
854 			&port_methods, data);
855 
856 	pw_log_debug("%p: bound to %d", this, resource->id);
857 	pw_global_add_resource(global, resource);
858 
859 	this->info.change_mask = PW_PORT_CHANGE_MASK_ALL;
860 	pw_port_resource_info(resource, &this->info);
861 	this->info.change_mask = 0;
862 	return 0;
863 
864 error_resource:
865 	pw_log_error("%p: can't create port resource: %m", this);
866 	return res;
867 }
868 
global_destroy(void * object)869 static void global_destroy(void *object)
870 {
871 	struct pw_impl_port *port = object;
872 	spa_hook_remove(&port->global_listener);
873 	port->global = NULL;
874 	pw_impl_port_destroy(port);
875 }
876 
/* Global events handled for the port global; only destroy is set. */
static const struct pw_global_events global_events = {
	PW_VERSION_GLOBAL_EVENTS,
	.destroy = global_destroy,
};
881 
pw_impl_port_register(struct pw_impl_port * port,struct pw_properties * properties)882 int pw_impl_port_register(struct pw_impl_port *port,
883 		     struct pw_properties *properties)
884 {
885 	static const char * const keys[] = {
886 		PW_KEY_OBJECT_SERIAL,
887 		PW_KEY_OBJECT_PATH,
888 		PW_KEY_FORMAT_DSP,
889 		PW_KEY_NODE_ID,
890 		PW_KEY_AUDIO_CHANNEL,
891 		PW_KEY_PORT_ID,
892 		PW_KEY_PORT_NAME,
893 		PW_KEY_PORT_DIRECTION,
894 		PW_KEY_PORT_MONITOR,
895 		PW_KEY_PORT_PHYSICAL,
896 		PW_KEY_PORT_TERMINAL,
897 		PW_KEY_PORT_CONTROL,
898 		PW_KEY_PORT_ALIAS,
899 		PW_KEY_PORT_EXTRA,
900 		NULL
901 	};
902 
903 	struct pw_impl_node *node = port->node;
904 
905 	if (node == NULL || node->global == NULL)
906 		return -EIO;
907 
908 	port->global = pw_global_new(node->context,
909 				PW_TYPE_INTERFACE_Port,
910 				PW_VERSION_PORT,
911 				properties,
912 				global_bind,
913 				port);
914 	if (port->global == NULL)
915 		return -errno;
916 
917 	pw_global_add_listener(port->global, &port->global_listener, &global_events, port);
918 
919 	port->info.id = port->global->id;
920 	pw_properties_setf(port->properties, PW_KEY_NODE_ID, "%d", node->global->id);
921 	pw_properties_setf(port->properties, PW_KEY_OBJECT_ID, "%d", port->info.id);
922 	pw_properties_setf(port->properties, PW_KEY_OBJECT_SERIAL, "%"PRIu64,
923 			pw_global_get_serial(port->global));
924 	port->info.props = &port->properties->dict;
925 
926 	pw_global_update_keys(port->global, &port->properties->dict, keys);
927 
928 	pw_impl_port_emit_initialized(port);
929 
930 	return pw_global_register(port->global);
931 }
932 
933 SPA_EXPORT
pw_impl_port_add(struct pw_impl_port * port,struct pw_impl_node * node)934 int pw_impl_port_add(struct pw_impl_port *port, struct pw_impl_node *node)
935 {
936 	uint32_t port_id = port->port_id;
937 	struct spa_list *ports;
938 	struct pw_map *portmap;
939 	struct pw_impl_port *find;
940 	bool control;
941 	const char *str, *dir;
942 	int res;
943 
944 	if (port->node != NULL)
945 		return -EEXIST;
946 
947 	if (port->direction == PW_DIRECTION_INPUT) {
948 		ports = &node->input_ports;
949 		portmap = &node->input_port_map;
950 	} else {
951 		ports = &node->output_ports;
952 		portmap = &node->output_port_map;
953 	}
954 
955 	find = pw_map_lookup(portmap, port_id);
956 	if (find != NULL)
957 		return -EEXIST;
958 
959 	if ((res = pw_map_insert_at(portmap, port_id, port)) < 0)
960 		return res;
961 
962 	port->node = node;
963 
964 	pw_impl_node_emit_port_init(node, port);
965 
966 	pw_impl_port_for_each_param(port, 0, SPA_PARAM_IO, 0, 0, NULL, check_param_io, port);
967 	pw_impl_port_for_each_param(port, 0, SPA_PARAM_Latency, 0, 0, NULL, process_latency_param, port);
968 
969 	control = PW_IMPL_PORT_IS_CONTROL(port);
970 	if (control) {
971 		dir = port->direction == PW_DIRECTION_INPUT ?  "control" : "notify";
972 		pw_properties_set(port->properties, PW_KEY_PORT_CONTROL, "true");
973 	}
974 	else {
975 		dir = port->direction == PW_DIRECTION_INPUT ?  "in" : "out";
976 	}
977 	pw_properties_set(port->properties, PW_KEY_PORT_DIRECTION, dir);
978 
979 	if (pw_properties_get(port->properties, PW_KEY_PORT_NAME) == NULL) {
980 		if ((str = pw_properties_get(port->properties, PW_KEY_AUDIO_CHANNEL)) != NULL &&
981 		    !spa_streq(str, "UNK")) {
982 			pw_properties_setf(port->properties, PW_KEY_PORT_NAME, "%s_%s", dir, str);
983 		}
984 		else {
985 			pw_properties_setf(port->properties, PW_KEY_PORT_NAME, "%s_%d", dir, port->port_id);
986 		}
987 	}
988 	port->info.props = &port->properties->dict;
989 
990 	if (control) {
991 		pw_log_debug("%p: setting node control", port);
992 	} else {
993 		pw_log_debug("%p: setting node io", port);
994 		spa_node_port_set_io(node->node,
995 				     port->direction, port->port_id,
996 				     SPA_IO_Buffers,
997 				     &port->rt.io, sizeof(port->rt.io));
998 
999 		spa_node_port_set_io(port->mix,
1000 			     pw_direction_reverse(port->direction), 0,
1001 			     SPA_IO_Buffers,
1002 			     &port->rt.io, sizeof(port->rt.io));
1003 	}
1004 
1005 	pw_log_debug("%p: %d add to node %p", port, port_id, node);
1006 
1007 	spa_list_append(ports, &port->link);
1008 
1009 	if (port->direction == PW_DIRECTION_INPUT) {
1010 		node->info.n_input_ports++;
1011 		node->info.change_mask |= PW_NODE_CHANGE_MASK_INPUT_PORTS;
1012 	} else {
1013 		node->info.n_output_ports++;
1014 		node->info.change_mask |= PW_NODE_CHANGE_MASK_OUTPUT_PORTS;
1015 	}
1016 
1017 	if (node->global)
1018 		pw_impl_port_register(port, NULL);
1019 
1020 	if (port->state <= PW_IMPL_PORT_STATE_INIT)
1021 		pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_CONFIGURE, 0, NULL);
1022 
1023 	pw_impl_node_emit_port_added(node, port);
1024 	emit_info_changed(port);
1025 
1026 	return 0;
1027 }
1028 
/* pw_impl_port_for_each_link() callback: destroy \a link. Always returns 0
 * so the iteration continues over the remaining links; \a data is unused. */
static int do_destroy_link(void *data, struct pw_impl_link *link)
{
	(void)data;

	pw_impl_link_destroy(link);

	return 0;
}
1034 
pw_impl_port_unlink(struct pw_impl_port * port)1035 void pw_impl_port_unlink(struct pw_impl_port *port)
1036 {
1037 	pw_impl_port_for_each_link(port, do_destroy_link, port);
1038 }
1039 
do_remove_port(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)1040 static int do_remove_port(struct spa_loop *loop,
1041 			  bool async, uint32_t seq, const void *data, size_t size, void *user_data)
1042 {
1043 	struct pw_impl_port *this = user_data;
1044 
1045 	pw_log_trace("%p: remove port", this);
1046 	spa_list_remove(&this->rt.node_link);
1047 
1048 	return 0;
1049 }
1050 
pw_impl_port_remove(struct pw_impl_port * port)1051 static void pw_impl_port_remove(struct pw_impl_port *port)
1052 {
1053 	struct pw_impl_node *node = port->node;
1054 	int res;
1055 
1056 	if (node == NULL)
1057 		return;
1058 
1059 	pw_log_debug("%p: remove added:%d", port, port->added);
1060 
1061 	if (port->added) {
1062 		pw_loop_invoke(node->data_loop, do_remove_port,
1063 			       SPA_ID_INVALID, NULL, 0, true, port);
1064 		port->added = false;
1065 	}
1066 
1067 	if (SPA_FLAG_IS_SET(port->flags, PW_IMPL_PORT_FLAG_TO_REMOVE)) {
1068 		if ((res = spa_node_remove_port(node->node, port->direction, port->port_id)) < 0)
1069 			pw_log_warn("%p: can't remove: %s", port, spa_strerror(res));
1070 	}
1071 
1072 	if (port->direction == PW_DIRECTION_INPUT) {
1073 		pw_map_insert_at(&node->input_port_map, port->port_id, NULL);
1074 		node->info.n_input_ports--;
1075 	} else {
1076 		pw_map_insert_at(&node->output_port_map, port->port_id, NULL);
1077 		node->info.n_output_ports--;
1078 	}
1079 
1080 	pw_impl_port_set_mix(port, NULL, 0);
1081 
1082 	spa_list_remove(&port->link);
1083 	pw_impl_node_emit_port_removed(node, port);
1084 	port->node = NULL;
1085 }
1086 
pw_impl_port_destroy(struct pw_impl_port * port)1087 void pw_impl_port_destroy(struct pw_impl_port *port)
1088 {
1089 	struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
1090 	struct pw_control *control;
1091 
1092 	pw_log_debug("%p: destroy", port);
1093 
1094 	port->destroying = true;
1095 	pw_impl_port_emit_destroy(port);
1096 
1097 	pw_impl_port_unlink(port);
1098 
1099 	pw_log_debug("%p: control destroy", port);
1100 	spa_list_consume(control, &port->control_list[0], port_link)
1101 		pw_control_destroy(control);
1102 	spa_list_consume(control, &port->control_list[1], port_link)
1103 		pw_control_destroy(control);
1104 
1105 	pw_impl_port_remove(port);
1106 
1107 	if (port->global) {
1108 		spa_hook_remove(&port->global_listener);
1109 		pw_global_destroy(port->global);
1110 	}
1111 
1112 	pw_log_debug("%p: free", port);
1113 	pw_impl_port_emit_free(port);
1114 
1115 	spa_hook_list_clean(&port->listener_list);
1116 
1117 	pw_buffers_clear(&port->buffers);
1118 	pw_buffers_clear(&port->mix_buffers);
1119 	free((void*)port->error);
1120 
1121 	pw_param_clear(&impl->param_list, SPA_ID_INVALID);
1122 	pw_param_clear(&impl->pending_list, SPA_ID_INVALID);
1123 
1124 	pw_map_clear(&port->mix_port_map);
1125 
1126 	pw_properties_free(port->properties);
1127 
1128 	free(port);
1129 }
1130 
/* State shared between pw_impl_port_for_each_param() and the
 * result_port_params() handler. The field order matters: the struct is
 * filled with a positional initializer. */
struct result_port_params_data {
	struct impl *impl;	/* port whose pending_list receives cached params */
	void *data;		/* user data handed to callback */
	/* called for every result that matches seq */
	int (*callback) (void *data, int seq, uint32_t id, uint32_t index,
			uint32_t next, struct spa_pod *param);
	int seq;		/* only results carrying this seq are handled */
	uint32_t count;		/* number of results cached so far */
	unsigned int cache:1;	/* also store the results in impl->pending_list */
};
1141 
result_port_params(void * data,int seq,int res,uint32_t type,const void * result)1142 static void result_port_params(void *data, int seq, int res, uint32_t type, const void *result)
1143 {
1144 	struct result_port_params_data *d = data;
1145 	struct impl *impl = d->impl;
1146 	switch (type) {
1147 	case SPA_RESULT_TYPE_NODE_PARAMS:
1148 	{
1149 		const struct spa_result_node_params *r = result;
1150 		if (d->seq == seq) {
1151 			d->callback(d->data, seq, r->id, r->index, r->next, r->param);
1152 			if (d->cache) {
1153 				if (d->count++ == 0)
1154 					pw_param_add(&impl->pending_list, r->id, NULL);
1155 				pw_param_add(&impl->pending_list, r->id, r->param);
1156 			}
1157 		}
1158 		break;
1159 	}
1160 	default:
1161 		break;
1162 	}
1163 }
1164 
pw_impl_port_for_each_param(struct pw_impl_port * port,int seq,uint32_t param_id,uint32_t index,uint32_t max,const struct spa_pod * filter,int (* callback)(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param),void * data)1165 int pw_impl_port_for_each_param(struct pw_impl_port *port,
1166 			   int seq,
1167 			   uint32_t param_id,
1168 			   uint32_t index, uint32_t max,
1169 			   const struct spa_pod *filter,
1170 			   int (*callback) (void *data, int seq,
1171 					    uint32_t id, uint32_t index, uint32_t next,
1172 					    struct spa_pod *param),
1173 			   void *data)
1174 {
1175 	int res;
1176 	struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
1177 	struct pw_impl_node *node = port->node;
1178 	struct result_port_params_data user_data = { impl, data, callback, seq, 0, false };
1179 	struct spa_hook listener;
1180 	struct spa_param_info *pi;
1181 	static const struct spa_node_events node_events = {
1182 		SPA_VERSION_NODE_EVENTS,
1183 		.result = result_port_params,
1184 	};
1185 
1186 	pi = pw_param_info_find(port->info.params, port->info.n_params, param_id);
1187 	if (pi == NULL)
1188 		return -ENOENT;
1189 
1190 	if (max == 0)
1191 		max = UINT32_MAX;
1192 
1193 	pw_log_debug("%p: params id:%d (%s) index:%u max:%u cached:%d", port, param_id,
1194 			spa_debug_type_find_name(spa_type_param, param_id),
1195 			index, max, pi->user);
1196 
1197 	if (pi->user == 1) {
1198 		struct pw_param *p;
1199 		uint8_t buffer[1024];
1200 		struct spa_pod_builder b = { 0 };
1201 	        struct spa_result_node_params result;
1202 		uint32_t count = 0;
1203 
1204 		result.id = param_id;
1205 		result.next = 0;
1206 
1207 		spa_list_for_each(p, &impl->param_list, link) {
1208 			result.index = result.next++;
1209 			if (p->id != param_id)
1210 				continue;
1211 
1212 			if (result.index < index)
1213 				continue;
1214 
1215 			spa_pod_builder_init(&b, buffer, sizeof(buffer));
1216 			if (spa_pod_filter(&b, &result.param, p->param, filter) != 0)
1217 				continue;
1218 
1219 			pw_log_debug("%p: %d param %u", port, seq, result.index);
1220 			result_port_params(&user_data, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
1221 
1222 			if (++count == max)
1223 				break;
1224 		}
1225 		res = 0;
1226 	} else {
1227 		user_data.cache = impl->cache_params &&
1228 			(filter == NULL && index == 0 && max == UINT32_MAX);
1229 
1230 		spa_zero(listener);
1231 		spa_node_add_listener(node->node, &listener, &node_events, &user_data);
1232 		res = spa_node_port_enum_params(node->node, seq,
1233 						port->direction, port->port_id,
1234 						param_id, index, max,
1235 						filter);
1236 		spa_hook_remove(&listener);
1237 
1238 		if (user_data.cache) {
1239 			pw_param_update(&impl->param_list, &impl->pending_list);
1240 			pi->user = 1;
1241 		}
1242 	}
1243 
1244 	pw_log_debug("%p: res %d: (%s)", port, res, spa_strerror(res));
1245 	return res;
1246 }
1247 
/* Context for pw_impl_port_for_each_filtered_param(): every param found on
 * the input port is used as a filter for the params of the output port. The
 * field order matters: the struct is filled with a positional initializer. */
struct param_filter {
	struct pw_impl_port *in_port;	/* port that provides the filters */
	struct pw_impl_port *out_port;	/* port that is enumerated with each filter */
	int seq;
	uint32_t in_param_id;		/* param id enumerated on in_port */
	uint32_t out_param_id;		/* param id enumerated on out_port */
	/* receives the filtered params of out_port */
	int (*callback) (void *data, int seq,
			uint32_t id, uint32_t index, uint32_t next,
			struct spa_pod *param);
	void *data;			/* user data for callback */
	uint32_t n_params;		/* number of times do_filter() was called */
};
1259 
do_filter(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)1260 static int do_filter(void *data, int seq, uint32_t id, uint32_t index, uint32_t next, struct spa_pod *param)
1261 {
1262 	struct param_filter *f = data;
1263 	f->n_params++;
1264 	return pw_impl_port_for_each_param(f->out_port, seq, f->out_param_id, 0, 0, param, f->callback, f->data);
1265 }
1266 
pw_impl_port_for_each_filtered_param(struct pw_impl_port * in_port,struct pw_impl_port * out_port,int seq,uint32_t in_param_id,uint32_t out_param_id,const struct spa_pod * filter,int (* callback)(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param),void * data)1267 int pw_impl_port_for_each_filtered_param(struct pw_impl_port *in_port,
1268 				    struct pw_impl_port *out_port,
1269 				    int seq,
1270 				    uint32_t in_param_id,
1271 				    uint32_t out_param_id,
1272 				    const struct spa_pod *filter,
1273 				    int (*callback) (void *data, int seq,
1274 						     uint32_t id, uint32_t index, uint32_t next,
1275 						     struct spa_pod *param),
1276 				    void *data)
1277 {
1278 	int res;
1279 	struct param_filter fd = { in_port, out_port, seq, in_param_id, out_param_id, callback, data, 0 };
1280 
1281 	if ((res = pw_impl_port_for_each_param(in_port, seq, in_param_id, 0, 0, filter, do_filter, &fd)) < 0)
1282 		return res;
1283 
1284 	if (fd.n_params == 0)
1285 		res = do_filter(&filter, seq, 0, 0, 0, NULL);
1286 
1287 	return res;
1288 }
1289 
pw_impl_port_for_each_link(struct pw_impl_port * port,int (* callback)(void * data,struct pw_impl_link * link),void * data)1290 int pw_impl_port_for_each_link(struct pw_impl_port *port,
1291 			  int (*callback) (void *data, struct pw_impl_link *link),
1292 			  void *data)
1293 {
1294 	struct pw_impl_link *l, *t;
1295 	int res = 0;
1296 
1297 	if (port->direction == PW_DIRECTION_OUTPUT) {
1298 		spa_list_for_each_safe(l, t, &port->links, output_link)
1299 			if ((res = callback(data, l)) != 0)
1300 				break;
1301 	} else {
1302 		spa_list_for_each_safe(l, t, &port->links, input_link)
1303 			if ((res = callback(data, l)) != 0)
1304 				break;
1305 	}
1306 	return res;
1307 }
1308 
pw_impl_port_recalc_latency(struct pw_impl_port * port)1309 int pw_impl_port_recalc_latency(struct pw_impl_port *port)
1310 {
1311 	struct pw_impl_link *l;
1312 	struct spa_latency_info latency, *current;
1313 	struct pw_impl_port *other;
1314 	struct spa_pod *param;
1315 	struct spa_pod_builder b = { 0 };
1316 	uint8_t buffer[1024];
1317 	bool changed;
1318 
1319 	if (port->destroying)
1320 		return 0;
1321 
1322 	/* given an output port, we calculate the total latency to the sinks or the input
1323 	 * latency. */
1324 	spa_latency_info_combine_start(&latency, SPA_DIRECTION_REVERSE(port->direction));
1325 
1326 	if (port->direction == PW_DIRECTION_OUTPUT) {
1327 		spa_list_for_each(l, &port->links, output_link) {
1328 			other = l->input;
1329 			spa_latency_info_combine(&latency, &other->latency[other->direction]);
1330 			pw_log_debug("port %d: peer %d: latency %f-%f %d-%d %"PRIu64"-%"PRIu64,
1331 					port->info.id, other->info.id,
1332 					latency.min_quantum, latency.max_quantum,
1333 					latency.min_rate, latency.max_rate,
1334 					latency.min_ns, latency.max_ns);
1335 		}
1336 	} else {
1337 		spa_list_for_each(l, &port->links, input_link) {
1338 			other = l->output;
1339 			spa_latency_info_combine(&latency, &other->latency[other->direction]);
1340 			pw_log_debug("port %d: peer %d: latency %f-%f %d-%d %"PRIu64"-%"PRIu64,
1341 					port->info.id, other->info.id,
1342 					latency.min_quantum, latency.max_quantum,
1343 					latency.min_rate, latency.max_rate,
1344 					latency.min_ns, latency.max_ns);
1345 		}
1346 	}
1347 	spa_latency_info_combine_finish(&latency);
1348 
1349 	current = &port->latency[latency.direction];
1350 
1351 	changed = spa_latency_info_compare(current, &latency) != 0;
1352 
1353 	pw_log_info("port %d: %s %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64,
1354 			port->info.id, changed ? "set" : "keep",
1355 			pw_direction_as_string(latency.direction),
1356 			latency.min_quantum, latency.max_quantum,
1357 			latency.min_rate, latency.max_rate,
1358 			latency.min_ns, latency.max_ns);
1359 
1360 	if (!changed)
1361 		return 0;
1362 
1363 	*current = latency;
1364 
1365 	if (!port->have_latency_param)
1366 		return 0;
1367 
1368 	spa_pod_builder_init(&b, buffer, sizeof(buffer));
1369 	param = spa_latency_build(&b, SPA_PARAM_Latency, &latency);
1370 	return pw_impl_port_set_param(port, SPA_PARAM_Latency, 0, param);
1371 }
1372 
1373 SPA_EXPORT
pw_impl_port_is_linked(struct pw_impl_port * port)1374 int pw_impl_port_is_linked(struct pw_impl_port *port)
1375 {
1376 	return spa_list_is_empty(&port->links) ? 0 : 1;
1377 }
1378 
1379 SPA_EXPORT
pw_impl_port_set_param(struct pw_impl_port * port,uint32_t id,uint32_t flags,const struct spa_pod * param)1380 int pw_impl_port_set_param(struct pw_impl_port *port, uint32_t id, uint32_t flags,
1381 		      const struct spa_pod *param)
1382 {
1383 	int res;
1384 	struct pw_impl_node *node = port->node;
1385 
1386 	pw_log_debug("%p: %d set param %d %p", port, port->state, id, param);
1387 
1388 	/* set parameter on node */
1389 	res = spa_node_port_set_param(node->node,
1390 			port->direction, port->port_id,
1391 			id, flags, param);
1392 
1393 	pw_log_debug("%p: %d set param on node %d:%d id:%d (%s): %d (%s)", port, port->state,
1394 			port->direction, port->port_id, id,
1395 			spa_debug_type_find_name(spa_type_param, id),
1396 			res, spa_strerror(res));
1397 
1398 	/* set the parameters on all ports of the mixer node if possible */
1399 	if (res >= 0) {
1400 		struct pw_impl_port_mix *mix;
1401 
1402 		if (port->direction == PW_DIRECTION_INPUT &&
1403 		    id == SPA_PARAM_Format && param != NULL &&
1404 		    !SPA_FLAG_IS_SET(port->flags, PW_IMPL_PORT_FLAG_NO_MIXER)) {
1405 			setup_mixer(port, param);
1406 		}
1407 
1408 		spa_list_for_each(mix, &port->mix_list, link) {
1409 			spa_node_port_set_param(port->mix,
1410 				mix->port.direction, mix->port.port_id,
1411 				id, flags, param);
1412 		}
1413 		spa_node_port_set_param(port->mix,
1414 				pw_direction_reverse(port->direction), 0,
1415 				id, flags, param);
1416 	}
1417 
1418 	if (id == SPA_PARAM_Format) {
1419 		pw_log_debug("%p: %d %p %d", port, port->state, param, res);
1420 
1421 		if (port->added) {
1422 			pw_loop_invoke(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, true, port);
1423 			port->added = false;
1424 		}
1425 		/* setting the format always destroys the negotiated buffers */
1426 		pw_buffers_clear(&port->buffers);
1427 		pw_buffers_clear(&port->mix_buffers);
1428 
1429 		if (param == NULL || res < 0) {
1430 			pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_CONFIGURE, 0, NULL);
1431 		}
1432 		else if (spa_pod_is_fixated(param) <= 0) {
1433 			pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_CONFIGURE, 0, NULL);
1434 			pw_impl_port_emit_param_changed(port, id);
1435 		}
1436 		else if (!SPA_RESULT_IS_ASYNC(res)) {
1437 			pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_READY, 0, NULL);
1438 		}
1439 	}
1440 	return res;
1441 }
1442 
negotiate_mixer_buffers(struct pw_impl_port * port,uint32_t flags,struct spa_buffer ** buffers,uint32_t n_buffers)1443 static int negotiate_mixer_buffers(struct pw_impl_port *port, uint32_t flags,
1444                 struct spa_buffer **buffers, uint32_t n_buffers)
1445 {
1446 	int res;
1447 	struct pw_impl_node *node = port->node;
1448 
1449 	if (SPA_FLAG_IS_SET(port->mix_flags, PW_IMPL_PORT_MIX_FLAG_MIX_ONLY))
1450 		return 0;
1451 
1452 	if (SPA_FLAG_IS_SET(port->mix_flags, PW_IMPL_PORT_MIX_FLAG_NEGOTIATE)) {
1453 		int alloc_flags;
1454 
1455 		/* try dynamic data */
1456 		alloc_flags = PW_BUFFERS_FLAG_DYNAMIC;
1457 
1458 		pw_log_debug("%p: %d.%d negotiate %d buffers on node: %p",
1459 				port, port->direction, port->port_id, n_buffers, node->node);
1460 
1461 		if (port->added) {
1462 			pw_loop_invoke(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, true, port);
1463 			port->added = false;
1464 		}
1465 
1466 		pw_buffers_clear(&port->mix_buffers);
1467 
1468 		if (n_buffers > 0) {
1469 			if ((res = pw_buffers_negotiate(node->context, alloc_flags,
1470 					port->mix, 0,
1471 					node->node, port->port_id,
1472 					&port->mix_buffers)) < 0) {
1473 				pw_log_warn("%p: can't negotiate buffers: %s",
1474 						port, spa_strerror(res));
1475 				return res;
1476 			}
1477 			buffers = port->mix_buffers.buffers;
1478 			n_buffers = port->mix_buffers.n_buffers;
1479 			flags = 0;
1480 		}
1481 	}
1482 
1483 	pw_log_debug("%p: %d.%d use %d buffers on node: %p",
1484 			port, port->direction, port->port_id, n_buffers, node->node);
1485 
1486 	res = spa_node_port_use_buffers(node->node,
1487 			port->direction, port->port_id,
1488 			flags, buffers, n_buffers);
1489 
1490 	if (SPA_RESULT_IS_OK(res)) {
1491 		spa_node_port_use_buffers(port->mix,
1492 			     pw_direction_reverse(port->direction), 0,
1493 			     0, buffers, n_buffers);
1494 	}
1495 	if (!port->added && n_buffers > 0) {
1496 		pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, NULL, 0, false, port);
1497 		port->added = true;
1498 	}
1499 	return res;
1500 }
1501 
1502 
1503 SPA_EXPORT
pw_impl_port_use_buffers(struct pw_impl_port * port,struct pw_impl_port_mix * mix,uint32_t flags,struct spa_buffer ** buffers,uint32_t n_buffers)1504 int pw_impl_port_use_buffers(struct pw_impl_port *port, struct pw_impl_port_mix *mix, uint32_t flags,
1505 		struct spa_buffer **buffers, uint32_t n_buffers)
1506 {
1507 	int res = 0, res2;
1508 
1509 	pw_log_debug("%p: %d:%d.%d: %d buffers flags:%d state:%d n_mix:%d", port,
1510 			port->direction, port->port_id, mix->id,
1511 			n_buffers, flags, port->state, port->n_mix);
1512 
1513 	if (n_buffers == 0 && port->state <= PW_IMPL_PORT_STATE_READY)
1514 		return 0;
1515 
1516 	if (n_buffers > 0 && port->state < PW_IMPL_PORT_STATE_READY)
1517 		return -EIO;
1518 
1519 	if (n_buffers == 0) {
1520 		if (port->n_mix == 1)
1521 			pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_READY, 0, NULL);
1522 	}
1523 
1524 	/* first negotiate with the node, this makes it possible to let the
1525 	 * node allocate buffer memory if needed */
1526 	if (port->state == PW_IMPL_PORT_STATE_READY) {
1527 		res = negotiate_mixer_buffers(port, flags, buffers, n_buffers);
1528 
1529 		if (res < 0) {
1530 			pw_log_error("%p: negotiate buffers on node: %d (%s)",
1531 				port, res, spa_strerror(res));
1532 			pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_ERROR, res,
1533 					strdup("can't negotiate buffers on port"));
1534 		} else if (n_buffers > 0 && !SPA_RESULT_IS_ASYNC(res)) {
1535 			pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_PAUSED, 0, NULL);
1536 		}
1537 	}
1538 
1539 	/* then use the buffers on the mixer */
1540 	if (!SPA_FLAG_IS_SET(port->mix_flags, PW_IMPL_PORT_MIX_FLAG_MIX_ONLY))
1541 		flags &= ~SPA_NODE_BUFFERS_FLAG_ALLOC;
1542 
1543 	res2 = spa_node_port_use_buffers(port->mix,
1544 			mix->port.direction, mix->port.port_id, flags,
1545 			buffers, n_buffers);
1546 	if (res2 < 0) {
1547 		if (res2 != -ENOTSUP && n_buffers > 0) {
1548 			pw_log_warn("%p: mix use buffers failed: %d (%s)",
1549 					port, res2, spa_strerror(res2));
1550 			return res2;
1551 		}
1552 	}
1553 	else if (SPA_RESULT_IS_ASYNC(res2))
1554 		res = res2;
1555 
1556 	return res;
1557 }
1558