1 /* PipeWire
2 *
3 * Copyright © 2018 Wim Taymans
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice (including the next
13 * paragraph) shall be included in all copies or substantial portions of the
14 * Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
19 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
23 */
24
25 #include <string.h>
26 #include <stdlib.h>
27 #include <errno.h>
28 #include <float.h>
29
30 #include <spa/pod/parser.h>
31 #include <spa/param/audio/format-utils.h>
32 #include <spa/node/utils.h>
33 #include <spa/utils/names.h>
34 #include <spa/utils/string.h>
35 #include <spa/debug/types.h>
36 #include <spa/pod/filter.h>
37
38 #include "pipewire/impl.h"
39 #include "pipewire/private.h"
40
41 PW_LOG_TOPIC_EXTERN(log_port);
42 #define PW_LOG_TOPIC_DEFAULT log_port
43
44 /** \cond */
45 struct impl {
46 struct pw_impl_port this;
47 struct spa_node mix_node; /**< mix node implementation */
48
49 struct spa_list param_list;
50 struct spa_list pending_list;
51
52 unsigned int cache_params:1;
53 };
54
55 #define pw_port_resource(r,m,v,...) pw_resource_call(r,struct pw_port_events,m,v,__VA_ARGS__)
56 #define pw_port_resource_info(r,...) pw_port_resource(r,info,0,__VA_ARGS__)
57 #define pw_port_resource_param(r,...) pw_port_resource(r,param,0,__VA_ARGS__)
58
59 struct resource_data {
60 struct pw_impl_port *port;
61 struct pw_resource *resource;
62
63 struct spa_hook resource_listener;
64 struct spa_hook object_listener;
65
66 uint32_t subscribe_ids[MAX_PARAMS];
67 uint32_t n_subscribe_ids;
68 };
69
70 /** \endcond */
71
emit_info_changed(struct pw_impl_port * port)72 static void emit_info_changed(struct pw_impl_port *port)
73 {
74 struct pw_resource *resource;
75
76 if (port->info.change_mask == 0)
77 return;
78
79 pw_impl_port_emit_info_changed(port, &port->info);
80 if (port->node)
81 pw_impl_node_emit_port_info_changed(port->node, port, &port->info);
82
83 if (port->global)
84 spa_list_for_each(resource, &port->global->resource_list, link)
85 pw_port_resource_info(resource, &port->info);
86
87 port->info.change_mask = 0;
88 }
89
port_state_as_string(enum pw_impl_port_state state)90 static const char *port_state_as_string(enum pw_impl_port_state state)
91 {
92 switch (state) {
93 case PW_IMPL_PORT_STATE_ERROR:
94 return "error";
95 case PW_IMPL_PORT_STATE_INIT:
96 return "init";
97 case PW_IMPL_PORT_STATE_CONFIGURE:
98 return "configure";
99 case PW_IMPL_PORT_STATE_READY:
100 return "ready";
101 case PW_IMPL_PORT_STATE_PAUSED:
102 return "paused";
103 }
104 return "invalid-state";
105 }
106
pw_impl_port_update_state(struct pw_impl_port * port,enum pw_impl_port_state state,int res,char * error)107 void pw_impl_port_update_state(struct pw_impl_port *port, enum pw_impl_port_state state, int res, char *error)
108 {
109 enum pw_impl_port_state old = port->state;
110
111 port->state = state;
112 free((void*)port->error);
113 port->error = error;
114
115 if (old == state)
116 return;
117
118 pw_log(state == PW_IMPL_PORT_STATE_ERROR ?
119 SPA_LOG_LEVEL_ERROR : SPA_LOG_LEVEL_DEBUG,
120 "%p: state %s -> %s (%s)", port,
121 port_state_as_string(old), port_state_as_string(state), error);
122
123 pw_impl_port_emit_state_changed(port, old, state, error);
124
125 if (state == PW_IMPL_PORT_STATE_ERROR && port->global) {
126 struct pw_resource *resource;
127 spa_list_for_each(resource, &port->global->resource_list, link)
128 pw_resource_error(resource, res, error);
129 }
130 }
131
tee_process(void * object)132 static int tee_process(void *object)
133 {
134 struct impl *impl = object;
135 struct pw_impl_port *this = &impl->this;
136 struct pw_impl_port_mix *mix;
137 struct spa_io_buffers *io = &this->rt.io;
138
139 pw_log_trace_fp("%p: tee input %d %d", this, io->status, io->buffer_id);
140 spa_list_for_each(mix, &this->rt.mix_list, rt_link) {
141 pw_log_trace_fp("%p: port %d %p->%p %d", this,
142 mix->port.port_id, io, mix->io, mix->io->buffer_id);
143 *mix->io = *io;
144 }
145 io->status = SPA_STATUS_NEED_DATA;
146
147 return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
148 }
149
tee_reuse_buffer(void * object,uint32_t port_id,uint32_t buffer_id)150 static int tee_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
151 {
152 struct impl *impl = object;
153 struct pw_impl_port *this = &impl->this;
154
155 pw_log_trace_fp("%p: tee reuse buffer %d %d", this, port_id, buffer_id);
156 spa_node_port_reuse_buffer(this->node->node, this->port_id, buffer_id);
157
158 return 0;
159 }
160
161 static const struct spa_node_methods schedule_tee_node = {
162 SPA_VERSION_NODE_METHODS,
163 .process = tee_process,
164 .port_reuse_buffer = tee_reuse_buffer,
165 };
166
schedule_mix_input(void * object)167 static int schedule_mix_input(void *object)
168 {
169 struct impl *impl = object;
170 struct pw_impl_port *this = &impl->this;
171 struct spa_io_buffers *io = &this->rt.io;
172 struct pw_impl_port_mix *mix;
173
174 if (SPA_UNLIKELY(PW_IMPL_PORT_IS_CONTROL(this)))
175 return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
176
177 spa_list_for_each(mix, &this->rt.mix_list, rt_link) {
178 pw_log_trace_fp("%p: mix input %d %p->%p %d %d", this,
179 mix->port.port_id, mix->io, io, mix->io->status, mix->io->buffer_id);
180 *io = *mix->io;
181 mix->io->status = SPA_STATUS_NEED_DATA;
182 break;
183 }
184 return SPA_STATUS_HAVE_DATA | SPA_STATUS_NEED_DATA;
185 }
186
schedule_mix_reuse_buffer(void * object,uint32_t port_id,uint32_t buffer_id)187 static int schedule_mix_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
188 {
189 struct impl *impl = object;
190 struct pw_impl_port *this = &impl->this;
191 struct pw_impl_port_mix *mix;
192
193 spa_list_for_each(mix, &this->rt.mix_list, rt_link) {
194 pw_log_trace_fp("%p: reuse buffer %d %d", this, port_id, buffer_id);
195 /* FIXME send reuse buffer to peer */
196 break;
197 }
198 return 0;
199 }
200
201 static const struct spa_node_methods schedule_mix_node = {
202 SPA_VERSION_NODE_METHODS,
203 .process = schedule_mix_input,
204 .port_reuse_buffer = schedule_mix_reuse_buffer,
205 };
206
207 SPA_EXPORT
pw_impl_port_init_mix(struct pw_impl_port * port,struct pw_impl_port_mix * mix)208 int pw_impl_port_init_mix(struct pw_impl_port *port, struct pw_impl_port_mix *mix)
209 {
210 uint32_t port_id;
211 int res = 0;
212
213 port_id = pw_map_insert_new(&port->mix_port_map, mix);
214 if (port_id == SPA_ID_INVALID)
215 return -errno;
216
217 mix->port.direction = port->direction;
218 mix->port.port_id = port_id;
219
220 spa_list_append(&port->mix_list, &mix->link);
221 port->n_mix++;
222 mix->p = port;
223
224 spa_node_add_port(port->mix, port->direction, port_id, NULL);
225
226 res = pw_impl_port_call_init_mix(port, mix);
227
228 /* set the same format on the mixer as on the port if any */
229 {
230 uint32_t idx = 0;
231 uint8_t buffer[1024];
232 struct spa_pod_builder b;
233 struct spa_pod *param;
234
235 spa_pod_builder_init(&b, buffer, sizeof(buffer));
236 if (spa_node_port_enum_params_sync(port->mix,
237 pw_direction_reverse(port->direction), 0,
238 SPA_PARAM_Format, &idx, NULL, ¶m, &b) == 1) {
239 spa_node_port_set_param(port->mix,
240 port->direction, port_id,
241 SPA_PARAM_Format, 0, param);
242 }
243 }
244
245 pw_log_debug("%p: init mix n_mix:%d %d.%d io:%p: (%s)", port,
246 port->n_mix, port->port_id, mix->port.port_id,
247 mix->io, spa_strerror(res));
248
249 return res;
250 }
251
252 SPA_EXPORT
pw_impl_port_release_mix(struct pw_impl_port * port,struct pw_impl_port_mix * mix)253 int pw_impl_port_release_mix(struct pw_impl_port *port, struct pw_impl_port_mix *mix)
254 {
255 int res = 0;
256 uint32_t port_id = mix->port.port_id;
257
258 pw_map_remove(&port->mix_port_map, port_id);
259 spa_list_remove(&mix->link);
260 port->n_mix--;
261
262 res = pw_impl_port_call_release_mix(port, mix);
263
264 spa_node_remove_port(port->mix, port->direction, port_id);
265
266 pw_log_debug("%p: release mix %d %d.%d", port,
267 port->n_mix, port->port_id, mix->port.port_id);
268
269 return res;
270 }
271
update_properties(struct pw_impl_port * port,const struct spa_dict * dict,bool filter)272 static int update_properties(struct pw_impl_port *port, const struct spa_dict *dict, bool filter)
273 {
274 static const char * const ignored[] = {
275 PW_KEY_OBJECT_ID,
276 PW_KEY_PORT_DIRECTION,
277 PW_KEY_PORT_CONTROL,
278 PW_KEY_NODE_ID,
279 PW_KEY_PORT_ID,
280 NULL
281 };
282
283 int changed;
284
285 changed = pw_properties_update_ignore(port->properties, dict, filter ? ignored : NULL);
286 port->info.props = &port->properties->dict;
287
288 if (changed) {
289 pw_log_debug("%p: updated %d properties", port, changed);
290 port->info.change_mask |= PW_PORT_CHANGE_MASK_PROPS;
291 }
292 return changed;
293 }
294
resource_is_subscribed(struct pw_resource * resource,uint32_t id)295 static int resource_is_subscribed(struct pw_resource *resource, uint32_t id)
296 {
297 struct resource_data *data = pw_resource_get_user_data(resource);
298 uint32_t i;
299
300 for (i = 0; i < data->n_subscribe_ids; i++) {
301 if (data->subscribe_ids[i] == id)
302 return 1;
303 }
304 return 0;
305 }
306
notify_param(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)307 static int notify_param(void *data, int seq, uint32_t id,
308 uint32_t index, uint32_t next, struct spa_pod *param)
309 {
310 struct pw_impl_port *port = data;
311 struct pw_resource *resource;
312
313 spa_list_for_each(resource, &port->global->resource_list, link) {
314 if (!resource_is_subscribed(resource, id))
315 continue;
316
317 pw_log_debug("%p: resource %p notify param %d", port, resource, id);
318 pw_port_resource_param(resource, seq, id, index, next, param);
319 }
320 return 0;
321 }
322
emit_params(struct pw_impl_port * port,uint32_t * changed_ids,uint32_t n_changed_ids)323 static void emit_params(struct pw_impl_port *port, uint32_t *changed_ids, uint32_t n_changed_ids)
324 {
325 uint32_t i;
326 int res;
327
328 if (port->global == NULL)
329 return;
330
331 pw_log_debug("%p: emit %d params", port, n_changed_ids);
332
333 for (i = 0; i < n_changed_ids; i++) {
334 struct pw_resource *resource;
335 int subscribed = 0;
336
337 pw_log_debug("%p: emit param %d/%d: %d", port, i, n_changed_ids,
338 changed_ids[i]);
339
340 pw_impl_port_emit_param_changed(port, changed_ids[i]);
341
342 /* first check if anyone is subscribed */
343 spa_list_for_each(resource, &port->global->resource_list, link) {
344 if ((subscribed = resource_is_subscribed(resource, changed_ids[i])))
345 break;
346 }
347 if (!subscribed)
348 continue;
349
350 if ((res = pw_impl_port_for_each_param(port, 1, changed_ids[i], 0, UINT32_MAX,
351 NULL, notify_param, port)) < 0) {
352 pw_log_error("%p: error %d (%s)", port, res, spa_strerror(res));
353 }
354 }
355 }
356
process_latency_param(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)357 static int process_latency_param(void *data, int seq,
358 uint32_t id, uint32_t index, uint32_t next, struct spa_pod *param)
359 {
360 struct pw_impl_port *this = data;
361 struct spa_latency_info latency;
362
363 if (id != SPA_PARAM_Latency)
364 return -EINVAL;
365
366 if (spa_latency_parse(param, &latency) < 0)
367 return 0;
368 if (spa_latency_info_compare(&this->latency[latency.direction], &latency) == 0)
369 return 0;
370
371 pw_log_debug("port %p: got %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, this,
372 pw_direction_as_string(latency.direction),
373 latency.min_quantum, latency.max_quantum,
374 latency.min_rate, latency.max_rate,
375 latency.min_ns, latency.max_ns);
376
377 this->latency[latency.direction] = latency;
378 if (latency.direction == this->direction)
379 pw_impl_port_emit_latency_changed(this);
380
381 return 0;
382 }
383
update_info(struct pw_impl_port * port,const struct spa_port_info * info)384 static void update_info(struct pw_impl_port *port, const struct spa_port_info *info)
385 {
386 uint32_t changed_ids[MAX_PARAMS], n_changed_ids = 0;
387
388 pw_log_debug("%p: %p flags:%08"PRIx64" change_mask:%08"PRIx64,
389 port, info, info->flags, info->change_mask);
390
391 if (info->change_mask & SPA_PORT_CHANGE_MASK_FLAGS) {
392 port->spa_flags = info->flags;
393 }
394 if (info->change_mask & SPA_PORT_CHANGE_MASK_PROPS) {
395 if (info->props) {
396 update_properties(port, info->props, true);
397 } else {
398 pw_log_warn("%p: port PROPS update but no properties", port);
399 }
400 }
401 if (info->change_mask & SPA_PORT_CHANGE_MASK_PARAMS) {
402 uint32_t i;
403
404 port->info.change_mask |= PW_PORT_CHANGE_MASK_PARAMS;
405 port->info.n_params = SPA_MIN(info->n_params, SPA_N_ELEMENTS(port->params));
406
407 for (i = 0; i < port->info.n_params; i++) {
408 uint32_t id = info->params[i].id;
409
410 pw_log_debug("%p: param %d id:%d (%s) %08x:%08x", port, i,
411 id, spa_debug_type_find_name(spa_type_param, id),
412 port->info.params[i].flags, info->params[i].flags);
413
414 port->info.params[i].id = info->params[i].id;
415 if (port->info.params[i].flags == info->params[i].flags)
416 continue;
417
418 pw_log_debug("%p: update param %d", port, id);
419 port->info.params[i] = info->params[i];
420 port->info.params[i].user = 0;
421
422 if (info->params[i].flags & SPA_PARAM_INFO_READ)
423 changed_ids[n_changed_ids++] = id;
424
425 switch (id) {
426 case SPA_PARAM_Latency:
427 port->have_latency_param =
428 SPA_FLAG_IS_SET(info->params[i].flags, SPA_PARAM_INFO_WRITE);
429 if (port->node != NULL)
430 pw_impl_port_for_each_param(port, 0, id, 0, UINT32_MAX,
431 NULL, process_latency_param, port);
432 break;
433 default:
434 break;
435 }
436 }
437 }
438
439 if (n_changed_ids > 0)
440 emit_params(port, changed_ids, n_changed_ids);
441 }
442
443 SPA_EXPORT
pw_context_create_port(struct pw_context * context,enum pw_direction direction,uint32_t port_id,const struct spa_port_info * info,size_t user_data_size)444 struct pw_impl_port *pw_context_create_port(
445 struct pw_context *context,
446 enum pw_direction direction,
447 uint32_t port_id,
448 const struct spa_port_info *info,
449 size_t user_data_size)
450 {
451 struct impl *impl;
452 struct pw_impl_port *this;
453 struct pw_properties *properties;
454 const struct spa_node_methods *mix_methods;
455 int res;
456
457 impl = calloc(1, sizeof(struct impl) + user_data_size);
458 if (impl == NULL)
459 return NULL;
460
461 spa_list_init(&impl->param_list);
462 spa_list_init(&impl->pending_list);
463 impl->cache_params = true;
464
465 this = &impl->this;
466 pw_log_debug("%p: new %s %d", this,
467 pw_direction_as_string(direction), port_id);
468
469 if (info && info->change_mask & SPA_PORT_CHANGE_MASK_PROPS && info->props)
470 properties = pw_properties_new_dict(info->props);
471 else
472 properties = pw_properties_new(NULL, NULL);
473
474 if (properties == NULL) {
475 res = -errno;
476 goto error_no_mem;
477 }
478 pw_properties_setf(properties, PW_KEY_PORT_ID, "%u", port_id);
479
480 if (info) {
481 if (SPA_FLAG_IS_SET(info->flags, SPA_PORT_FLAG_PHYSICAL))
482 pw_properties_set(properties, PW_KEY_PORT_PHYSICAL, "true");
483 if (SPA_FLAG_IS_SET(info->flags, SPA_PORT_FLAG_TERMINAL))
484 pw_properties_set(properties, PW_KEY_PORT_TERMINAL, "true");
485 this->spa_flags = info->flags;
486 }
487
488 this->direction = direction;
489 this->port_id = port_id;
490 this->properties = properties;
491 this->state = PW_IMPL_PORT_STATE_INIT;
492 this->rt.io = SPA_IO_BUFFERS_INIT;
493
494 if (user_data_size > 0)
495 this->user_data = SPA_PTROFF(impl, sizeof(struct impl), void);
496
497 this->info.direction = direction;
498 this->info.params = this->params;
499 this->info.change_mask = PW_PORT_CHANGE_MASK_PROPS;
500 this->info.props = &this->properties->dict;
501
502 spa_list_init(&this->links);
503 spa_list_init(&this->mix_list);
504 spa_list_init(&this->rt.mix_list);
505 spa_list_init(&this->control_list[0]);
506 spa_list_init(&this->control_list[1]);
507
508 spa_hook_list_init(&this->listener_list);
509
510 if (this->direction == PW_DIRECTION_INPUT)
511 mix_methods = &schedule_mix_node;
512 else
513 mix_methods = &schedule_tee_node;
514
515 impl->mix_node.iface = SPA_INTERFACE_INIT(
516 SPA_TYPE_INTERFACE_Node,
517 SPA_VERSION_NODE,
518 mix_methods, impl);
519
520 pw_impl_port_set_mix(this, NULL, 0);
521
522 pw_map_init(&this->mix_port_map, 64, 64);
523
524 this->latency[SPA_DIRECTION_INPUT] = SPA_LATENCY_INFO(SPA_DIRECTION_INPUT);
525 this->latency[SPA_DIRECTION_OUTPUT] = SPA_LATENCY_INFO(SPA_DIRECTION_OUTPUT);
526
527 if (info)
528 update_info(this, info);
529
530 return this;
531
532 error_no_mem:
533 pw_log_warn("%p: new failed", impl);
534 free(impl);
535 errno = -res;
536 return NULL;
537 }
538
539 SPA_EXPORT
pw_impl_port_set_mix(struct pw_impl_port * port,struct spa_node * node,uint32_t flags)540 int pw_impl_port_set_mix(struct pw_impl_port *port, struct spa_node *node, uint32_t flags)
541 {
542 struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
543 struct pw_impl_port_mix *mix;
544
545 if (node == NULL) {
546 node = &impl->mix_node;
547 flags = 0;
548 }
549
550 pw_log_debug("%p: mix node %p->%p", port, port->mix, node);
551
552 if (port->mix != NULL && port->mix != node) {
553 spa_list_for_each(mix, &port->mix_list, link)
554 spa_node_remove_port(port->mix, mix->port.direction, mix->port.port_id);
555
556 spa_node_port_set_io(port->mix,
557 pw_direction_reverse(port->direction), 0,
558 SPA_IO_Buffers, NULL, 0);
559 }
560 if (port->mix_handle != NULL) {
561 pw_unload_spa_handle(port->mix_handle);
562 port->mix_handle = NULL;
563 }
564
565 port->mix_flags = flags;
566 port->mix = node;
567
568 if (port->mix) {
569 spa_list_for_each(mix, &port->mix_list, link)
570 spa_node_add_port(port->mix, mix->port.direction, mix->port.port_id, NULL);
571
572 spa_node_port_set_io(port->mix,
573 pw_direction_reverse(port->direction), 0,
574 SPA_IO_Buffers,
575 &port->rt.io, sizeof(port->rt.io));
576 }
577 return 0;
578 }
579
setup_mixer(struct pw_impl_port * port,const struct spa_pod * param)580 static int setup_mixer(struct pw_impl_port *port, const struct spa_pod *param)
581 {
582 uint32_t media_type, media_subtype;
583 int res;
584 const char *fallback_lib, *factory_name;
585 struct spa_handle *handle;
586 struct spa_dict_item items[1];
587 void *iface;
588
589 if ((res = spa_format_parse(param, &media_type, &media_subtype)) < 0)
590 return res;
591
592 pw_log_debug("%p: %s/%s", port,
593 spa_debug_type_find_name(spa_type_media_type, media_type),
594 spa_debug_type_find_name(spa_type_media_subtype, media_subtype));
595
596 switch (media_type) {
597 case SPA_MEDIA_TYPE_audio:
598 switch (media_subtype) {
599 case SPA_MEDIA_SUBTYPE_dsp:
600 {
601 struct spa_audio_info_dsp info;
602 if ((res = spa_format_audio_dsp_parse(param, &info)) < 0)
603 return res;
604
605 if (info.format != SPA_AUDIO_FORMAT_DSP_F32)
606 return -ENOTSUP;
607
608 fallback_lib = "audiomixer/libspa-audiomixer";
609 factory_name = SPA_NAME_AUDIO_MIXER_DSP;
610 break;
611 }
612 case SPA_MEDIA_SUBTYPE_raw:
613 fallback_lib = "audiomixer/libspa-audiomixer";
614 factory_name = SPA_NAME_AUDIO_MIXER;
615 break;
616 default:
617 return -ENOTSUP;
618 }
619 break;
620
621 case SPA_MEDIA_TYPE_application:
622 switch (media_subtype) {
623 case SPA_MEDIA_SUBTYPE_control:
624 fallback_lib = "control/libspa-control";
625 factory_name = SPA_NAME_CONTROL_MIXER;
626 break;
627 default:
628 return -ENOTSUP;
629 }
630 break;
631
632 default:
633 return -ENOTSUP;
634 }
635
636 items[0] = SPA_DICT_ITEM_INIT(SPA_KEY_LIBRARY_NAME, fallback_lib);
637 handle = pw_context_load_spa_handle(port->node->context, factory_name,
638 &SPA_DICT_INIT_ARRAY(items));
639 if (handle == NULL)
640 return -errno;
641
642 if ((res = spa_handle_get_interface(handle,
643 SPA_TYPE_INTERFACE_Node, &iface)) < 0) {
644 pw_unload_spa_handle(handle);
645 return res;
646 }
647
648 pw_log_debug("mix node handle:%p iface:%p", handle, iface);
649 pw_impl_port_set_mix(port, (struct spa_node*)iface,
650 PW_IMPL_PORT_MIX_FLAG_MULTI |
651 PW_IMPL_PORT_MIX_FLAG_NEGOTIATE);
652 port->mix_handle = handle;
653
654 return 0;
655 }
656
657 SPA_EXPORT
pw_impl_port_get_direction(struct pw_impl_port * port)658 enum pw_direction pw_impl_port_get_direction(struct pw_impl_port *port)
659 {
660 return port->direction;
661 }
662
663 SPA_EXPORT
pw_impl_port_get_id(struct pw_impl_port * port)664 uint32_t pw_impl_port_get_id(struct pw_impl_port *port)
665 {
666 return port->port_id;
667 }
668
669 SPA_EXPORT
pw_impl_port_get_properties(struct pw_impl_port * port)670 const struct pw_properties *pw_impl_port_get_properties(struct pw_impl_port *port)
671 {
672 return port->properties;
673 }
674
675 SPA_EXPORT
pw_impl_port_update_properties(struct pw_impl_port * port,const struct spa_dict * dict)676 int pw_impl_port_update_properties(struct pw_impl_port *port, const struct spa_dict *dict)
677 {
678 int changed = update_properties(port, dict, false);
679 emit_info_changed(port);
680 return changed;
681 }
682
pw_impl_port_update_info(struct pw_impl_port * port,const struct spa_port_info * info)683 void pw_impl_port_update_info(struct pw_impl_port *port, const struct spa_port_info *info)
684 {
685 update_info(port, info);
686 emit_info_changed(port);
687 }
688
689 SPA_EXPORT
pw_impl_port_get_node(struct pw_impl_port * port)690 struct pw_impl_node *pw_impl_port_get_node(struct pw_impl_port *port)
691 {
692 return port->node;
693 }
694
695 SPA_EXPORT
pw_impl_port_add_listener(struct pw_impl_port * port,struct spa_hook * listener,const struct pw_impl_port_events * events,void * data)696 void pw_impl_port_add_listener(struct pw_impl_port *port,
697 struct spa_hook *listener,
698 const struct pw_impl_port_events *events,
699 void *data)
700 {
701 spa_hook_list_append(&port->listener_list, listener, events, data);
702 }
703
704 SPA_EXPORT
pw_impl_port_get_info(struct pw_impl_port * port)705 const struct pw_port_info *pw_impl_port_get_info(struct pw_impl_port *port)
706 {
707 return &port->info;
708 }
709
710 SPA_EXPORT
pw_impl_port_get_user_data(struct pw_impl_port * port)711 void * pw_impl_port_get_user_data(struct pw_impl_port *port)
712 {
713 return port->user_data;
714 }
715
do_add_port(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)716 static int do_add_port(struct spa_loop *loop,
717 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
718 {
719 struct pw_impl_port *this = user_data;
720
721 pw_log_trace("%p: add port", this);
722 if (this->direction == PW_DIRECTION_INPUT)
723 spa_list_append(&this->node->rt.input_mix, &this->rt.node_link);
724 else
725 spa_list_append(&this->node->rt.output_mix, &this->rt.node_link);
726
727 return 0;
728 }
729
check_param_io(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)730 static int check_param_io(void *data, int seq, uint32_t id,
731 uint32_t index, uint32_t next, struct spa_pod *param)
732 {
733 struct pw_impl_port *port = data;
734 struct pw_impl_node *node = port->node;
735 uint32_t pid, psize;
736
737 if (spa_pod_parse_object(param,
738 SPA_TYPE_OBJECT_ParamIO, NULL,
739 SPA_PARAM_IO_id, SPA_POD_Id(&pid),
740 SPA_PARAM_IO_size, SPA_POD_Int(&psize)) < 0)
741 return 0;
742
743 pw_log_debug("%p: got io id:%d (%s)", port, pid,
744 spa_debug_type_find_name(spa_type_io, pid));
745
746 switch (pid) {
747 case SPA_IO_Control:
748 case SPA_IO_Notify:
749 pw_control_new(node->context, port, pid, psize, 0);
750 SPA_FLAG_SET(port->flags, PW_IMPL_PORT_FLAG_CONTROL);
751 break;
752 case SPA_IO_Buffers:
753 SPA_FLAG_SET(port->flags, PW_IMPL_PORT_FLAG_BUFFERS);
754 break;
755 default:
756 break;
757 }
758 return 0;
759 }
760
reply_param(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)761 static int reply_param(void *data, int seq, uint32_t id,
762 uint32_t index, uint32_t next, struct spa_pod *param)
763 {
764 struct resource_data *d = data;
765 struct pw_resource *resource = d->resource;
766 pw_log_debug("%p: resource %p reply param %u %u %u", d->port,
767 resource, id, index, next);
768 pw_port_resource_param(resource, seq, id, index, next, param);
769 return 0;
770 }
771
port_enum_params(void * object,int seq,uint32_t id,uint32_t index,uint32_t num,const struct spa_pod * filter)772 static int port_enum_params(void *object, int seq, uint32_t id, uint32_t index, uint32_t num,
773 const struct spa_pod *filter)
774 {
775 struct resource_data *data = object;
776 struct pw_resource *resource = data->resource;
777 struct pw_impl_port *port = data->port;
778 int res;
779
780 pw_log_debug("%p: resource %p enum params seq:%d id:%d (%s) index:%u num:%u", port,
781 resource, seq, id, spa_debug_type_find_name(spa_type_param, id),
782 index, num);
783
784 if ((res = pw_impl_port_for_each_param(port, seq, id, index, num, filter,
785 reply_param, data)) < 0)
786 pw_resource_errorf(resource, res,
787 "enum params id:%d (%s) failed", id,
788 spa_debug_type_find_name(spa_type_param, id));
789 return res;
790 }
791
port_subscribe_params(void * object,uint32_t * ids,uint32_t n_ids)792 static int port_subscribe_params(void *object, uint32_t *ids, uint32_t n_ids)
793 {
794 struct resource_data *data = object;
795 struct pw_resource *resource = data->resource;
796 uint32_t i;
797
798 n_ids = SPA_MIN(n_ids, SPA_N_ELEMENTS(data->subscribe_ids));
799 data->n_subscribe_ids = n_ids;
800
801 for (i = 0; i < n_ids; i++) {
802 data->subscribe_ids[i] = ids[i];
803 pw_log_debug("%p: resource %p subscribe param id:%d (%s)", data->port,
804 resource, ids[i],
805 spa_debug_type_find_name(spa_type_param, ids[i]));
806 port_enum_params(data, 1, ids[i], 0, UINT32_MAX, NULL);
807 }
808 return 0;
809 }
810
811 static const struct pw_port_methods port_methods = {
812 PW_VERSION_PORT_METHODS,
813 .subscribe_params = port_subscribe_params,
814 .enum_params = port_enum_params
815 };
816
resource_destroy(void * data)817 static void resource_destroy(void *data)
818 {
819 struct resource_data *d = data;
820 spa_hook_remove(&d->resource_listener);
821 spa_hook_remove(&d->object_listener);
822 }
823
824 static const struct pw_resource_events resource_events = {
825 PW_VERSION_RESOURCE_EVENTS,
826 .destroy = resource_destroy,
827 };
828
829 static int
global_bind(void * _data,struct pw_impl_client * client,uint32_t permissions,uint32_t version,uint32_t id)830 global_bind(void *_data, struct pw_impl_client *client, uint32_t permissions,
831 uint32_t version, uint32_t id)
832 {
833 struct pw_impl_port *this = _data;
834 struct pw_global *global = this->global;
835 struct pw_resource *resource;
836 struct resource_data *data;
837 int res;
838
839 resource = pw_resource_new(client, id, permissions, global->type, version, sizeof(*data));
840 if (resource == NULL) {
841 res = -errno;
842 goto error_resource;
843 }
844
845 data = pw_resource_get_user_data(resource);
846 data->port = this;
847 data->resource = resource;
848
849 pw_resource_add_listener(resource,
850 &data->resource_listener,
851 &resource_events, data);
852 pw_resource_add_object_listener(resource,
853 &data->object_listener,
854 &port_methods, data);
855
856 pw_log_debug("%p: bound to %d", this, resource->id);
857 pw_global_add_resource(global, resource);
858
859 this->info.change_mask = PW_PORT_CHANGE_MASK_ALL;
860 pw_port_resource_info(resource, &this->info);
861 this->info.change_mask = 0;
862 return 0;
863
864 error_resource:
865 pw_log_error("%p: can't create port resource: %m", this);
866 return res;
867 }
868
global_destroy(void * object)869 static void global_destroy(void *object)
870 {
871 struct pw_impl_port *port = object;
872 spa_hook_remove(&port->global_listener);
873 port->global = NULL;
874 pw_impl_port_destroy(port);
875 }
876
877 static const struct pw_global_events global_events = {
878 PW_VERSION_GLOBAL_EVENTS,
879 .destroy = global_destroy,
880 };
881
pw_impl_port_register(struct pw_impl_port * port,struct pw_properties * properties)882 int pw_impl_port_register(struct pw_impl_port *port,
883 struct pw_properties *properties)
884 {
885 static const char * const keys[] = {
886 PW_KEY_OBJECT_SERIAL,
887 PW_KEY_OBJECT_PATH,
888 PW_KEY_FORMAT_DSP,
889 PW_KEY_NODE_ID,
890 PW_KEY_AUDIO_CHANNEL,
891 PW_KEY_PORT_ID,
892 PW_KEY_PORT_NAME,
893 PW_KEY_PORT_DIRECTION,
894 PW_KEY_PORT_MONITOR,
895 PW_KEY_PORT_PHYSICAL,
896 PW_KEY_PORT_TERMINAL,
897 PW_KEY_PORT_CONTROL,
898 PW_KEY_PORT_ALIAS,
899 PW_KEY_PORT_EXTRA,
900 NULL
901 };
902
903 struct pw_impl_node *node = port->node;
904
905 if (node == NULL || node->global == NULL)
906 return -EIO;
907
908 port->global = pw_global_new(node->context,
909 PW_TYPE_INTERFACE_Port,
910 PW_VERSION_PORT,
911 properties,
912 global_bind,
913 port);
914 if (port->global == NULL)
915 return -errno;
916
917 pw_global_add_listener(port->global, &port->global_listener, &global_events, port);
918
919 port->info.id = port->global->id;
920 pw_properties_setf(port->properties, PW_KEY_NODE_ID, "%d", node->global->id);
921 pw_properties_setf(port->properties, PW_KEY_OBJECT_ID, "%d", port->info.id);
922 pw_properties_setf(port->properties, PW_KEY_OBJECT_SERIAL, "%"PRIu64,
923 pw_global_get_serial(port->global));
924 port->info.props = &port->properties->dict;
925
926 pw_global_update_keys(port->global, &port->properties->dict, keys);
927
928 pw_impl_port_emit_initialized(port);
929
930 return pw_global_register(port->global);
931 }
932
933 SPA_EXPORT
pw_impl_port_add(struct pw_impl_port * port,struct pw_impl_node * node)934 int pw_impl_port_add(struct pw_impl_port *port, struct pw_impl_node *node)
935 {
936 uint32_t port_id = port->port_id;
937 struct spa_list *ports;
938 struct pw_map *portmap;
939 struct pw_impl_port *find;
940 bool control;
941 const char *str, *dir;
942 int res;
943
944 if (port->node != NULL)
945 return -EEXIST;
946
947 if (port->direction == PW_DIRECTION_INPUT) {
948 ports = &node->input_ports;
949 portmap = &node->input_port_map;
950 } else {
951 ports = &node->output_ports;
952 portmap = &node->output_port_map;
953 }
954
955 find = pw_map_lookup(portmap, port_id);
956 if (find != NULL)
957 return -EEXIST;
958
959 if ((res = pw_map_insert_at(portmap, port_id, port)) < 0)
960 return res;
961
962 port->node = node;
963
964 pw_impl_node_emit_port_init(node, port);
965
966 pw_impl_port_for_each_param(port, 0, SPA_PARAM_IO, 0, 0, NULL, check_param_io, port);
967 pw_impl_port_for_each_param(port, 0, SPA_PARAM_Latency, 0, 0, NULL, process_latency_param, port);
968
969 control = PW_IMPL_PORT_IS_CONTROL(port);
970 if (control) {
971 dir = port->direction == PW_DIRECTION_INPUT ? "control" : "notify";
972 pw_properties_set(port->properties, PW_KEY_PORT_CONTROL, "true");
973 }
974 else {
975 dir = port->direction == PW_DIRECTION_INPUT ? "in" : "out";
976 }
977 pw_properties_set(port->properties, PW_KEY_PORT_DIRECTION, dir);
978
979 if (pw_properties_get(port->properties, PW_KEY_PORT_NAME) == NULL) {
980 if ((str = pw_properties_get(port->properties, PW_KEY_AUDIO_CHANNEL)) != NULL &&
981 !spa_streq(str, "UNK")) {
982 pw_properties_setf(port->properties, PW_KEY_PORT_NAME, "%s_%s", dir, str);
983 }
984 else {
985 pw_properties_setf(port->properties, PW_KEY_PORT_NAME, "%s_%d", dir, port->port_id);
986 }
987 }
988 port->info.props = &port->properties->dict;
989
990 if (control) {
991 pw_log_debug("%p: setting node control", port);
992 } else {
993 pw_log_debug("%p: setting node io", port);
994 spa_node_port_set_io(node->node,
995 port->direction, port->port_id,
996 SPA_IO_Buffers,
997 &port->rt.io, sizeof(port->rt.io));
998
999 spa_node_port_set_io(port->mix,
1000 pw_direction_reverse(port->direction), 0,
1001 SPA_IO_Buffers,
1002 &port->rt.io, sizeof(port->rt.io));
1003 }
1004
1005 pw_log_debug("%p: %d add to node %p", port, port_id, node);
1006
1007 spa_list_append(ports, &port->link);
1008
1009 if (port->direction == PW_DIRECTION_INPUT) {
1010 node->info.n_input_ports++;
1011 node->info.change_mask |= PW_NODE_CHANGE_MASK_INPUT_PORTS;
1012 } else {
1013 node->info.n_output_ports++;
1014 node->info.change_mask |= PW_NODE_CHANGE_MASK_OUTPUT_PORTS;
1015 }
1016
1017 if (node->global)
1018 pw_impl_port_register(port, NULL);
1019
1020 if (port->state <= PW_IMPL_PORT_STATE_INIT)
1021 pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_CONFIGURE, 0, NULL);
1022
1023 pw_impl_node_emit_port_added(node, port);
1024 emit_info_changed(port);
1025
1026 return 0;
1027 }
1028
do_destroy_link(void * data,struct pw_impl_link * link)1029 static int do_destroy_link(void *data, struct pw_impl_link *link)
1030 {
1031 pw_impl_link_destroy(link);
1032 return 0;
1033 }
1034
pw_impl_port_unlink(struct pw_impl_port * port)1035 void pw_impl_port_unlink(struct pw_impl_port *port)
1036 {
1037 pw_impl_port_for_each_link(port, do_destroy_link, port);
1038 }
1039
do_remove_port(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)1040 static int do_remove_port(struct spa_loop *loop,
1041 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
1042 {
1043 struct pw_impl_port *this = user_data;
1044
1045 pw_log_trace("%p: remove port", this);
1046 spa_list_remove(&this->rt.node_link);
1047
1048 return 0;
1049 }
1050
pw_impl_port_remove(struct pw_impl_port * port)1051 static void pw_impl_port_remove(struct pw_impl_port *port)
1052 {
1053 struct pw_impl_node *node = port->node;
1054 int res;
1055
1056 if (node == NULL)
1057 return;
1058
1059 pw_log_debug("%p: remove added:%d", port, port->added);
1060
1061 if (port->added) {
1062 pw_loop_invoke(node->data_loop, do_remove_port,
1063 SPA_ID_INVALID, NULL, 0, true, port);
1064 port->added = false;
1065 }
1066
1067 if (SPA_FLAG_IS_SET(port->flags, PW_IMPL_PORT_FLAG_TO_REMOVE)) {
1068 if ((res = spa_node_remove_port(node->node, port->direction, port->port_id)) < 0)
1069 pw_log_warn("%p: can't remove: %s", port, spa_strerror(res));
1070 }
1071
1072 if (port->direction == PW_DIRECTION_INPUT) {
1073 pw_map_insert_at(&node->input_port_map, port->port_id, NULL);
1074 node->info.n_input_ports--;
1075 } else {
1076 pw_map_insert_at(&node->output_port_map, port->port_id, NULL);
1077 node->info.n_output_ports--;
1078 }
1079
1080 pw_impl_port_set_mix(port, NULL, 0);
1081
1082 spa_list_remove(&port->link);
1083 pw_impl_node_emit_port_removed(node, port);
1084 port->node = NULL;
1085 }
1086
pw_impl_port_destroy(struct pw_impl_port * port)1087 void pw_impl_port_destroy(struct pw_impl_port *port)
1088 {
1089 struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
1090 struct pw_control *control;
1091
1092 pw_log_debug("%p: destroy", port);
1093
1094 port->destroying = true;
1095 pw_impl_port_emit_destroy(port);
1096
1097 pw_impl_port_unlink(port);
1098
1099 pw_log_debug("%p: control destroy", port);
1100 spa_list_consume(control, &port->control_list[0], port_link)
1101 pw_control_destroy(control);
1102 spa_list_consume(control, &port->control_list[1], port_link)
1103 pw_control_destroy(control);
1104
1105 pw_impl_port_remove(port);
1106
1107 if (port->global) {
1108 spa_hook_remove(&port->global_listener);
1109 pw_global_destroy(port->global);
1110 }
1111
1112 pw_log_debug("%p: free", port);
1113 pw_impl_port_emit_free(port);
1114
1115 spa_hook_list_clean(&port->listener_list);
1116
1117 pw_buffers_clear(&port->buffers);
1118 pw_buffers_clear(&port->mix_buffers);
1119 free((void*)port->error);
1120
1121 pw_param_clear(&impl->param_list, SPA_ID_INVALID);
1122 pw_param_clear(&impl->pending_list, SPA_ID_INVALID);
1123
1124 pw_map_clear(&port->mix_port_map);
1125
1126 pw_properties_free(port->properties);
1127
1128 free(port);
1129 }
1130
1131 struct result_port_params_data {
1132 struct impl *impl;
1133 void *data;
1134 int (*callback) (void *data, int seq,
1135 uint32_t id, uint32_t index, uint32_t next,
1136 struct spa_pod *param);
1137 int seq;
1138 uint32_t count;
1139 unsigned int cache:1;
1140 };
1141
result_port_params(void * data,int seq,int res,uint32_t type,const void * result)1142 static void result_port_params(void *data, int seq, int res, uint32_t type, const void *result)
1143 {
1144 struct result_port_params_data *d = data;
1145 struct impl *impl = d->impl;
1146 switch (type) {
1147 case SPA_RESULT_TYPE_NODE_PARAMS:
1148 {
1149 const struct spa_result_node_params *r = result;
1150 if (d->seq == seq) {
1151 d->callback(d->data, seq, r->id, r->index, r->next, r->param);
1152 if (d->cache) {
1153 if (d->count++ == 0)
1154 pw_param_add(&impl->pending_list, r->id, NULL);
1155 pw_param_add(&impl->pending_list, r->id, r->param);
1156 }
1157 }
1158 break;
1159 }
1160 default:
1161 break;
1162 }
1163 }
1164
pw_impl_port_for_each_param(struct pw_impl_port * port,int seq,uint32_t param_id,uint32_t index,uint32_t max,const struct spa_pod * filter,int (* callback)(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param),void * data)1165 int pw_impl_port_for_each_param(struct pw_impl_port *port,
1166 int seq,
1167 uint32_t param_id,
1168 uint32_t index, uint32_t max,
1169 const struct spa_pod *filter,
1170 int (*callback) (void *data, int seq,
1171 uint32_t id, uint32_t index, uint32_t next,
1172 struct spa_pod *param),
1173 void *data)
1174 {
1175 int res;
1176 struct impl *impl = SPA_CONTAINER_OF(port, struct impl, this);
1177 struct pw_impl_node *node = port->node;
1178 struct result_port_params_data user_data = { impl, data, callback, seq, 0, false };
1179 struct spa_hook listener;
1180 struct spa_param_info *pi;
1181 static const struct spa_node_events node_events = {
1182 SPA_VERSION_NODE_EVENTS,
1183 .result = result_port_params,
1184 };
1185
1186 pi = pw_param_info_find(port->info.params, port->info.n_params, param_id);
1187 if (pi == NULL)
1188 return -ENOENT;
1189
1190 if (max == 0)
1191 max = UINT32_MAX;
1192
1193 pw_log_debug("%p: params id:%d (%s) index:%u max:%u cached:%d", port, param_id,
1194 spa_debug_type_find_name(spa_type_param, param_id),
1195 index, max, pi->user);
1196
1197 if (pi->user == 1) {
1198 struct pw_param *p;
1199 uint8_t buffer[1024];
1200 struct spa_pod_builder b = { 0 };
1201 struct spa_result_node_params result;
1202 uint32_t count = 0;
1203
1204 result.id = param_id;
1205 result.next = 0;
1206
1207 spa_list_for_each(p, &impl->param_list, link) {
1208 result.index = result.next++;
1209 if (p->id != param_id)
1210 continue;
1211
1212 if (result.index < index)
1213 continue;
1214
1215 spa_pod_builder_init(&b, buffer, sizeof(buffer));
1216 if (spa_pod_filter(&b, &result.param, p->param, filter) != 0)
1217 continue;
1218
1219 pw_log_debug("%p: %d param %u", port, seq, result.index);
1220 result_port_params(&user_data, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
1221
1222 if (++count == max)
1223 break;
1224 }
1225 res = 0;
1226 } else {
1227 user_data.cache = impl->cache_params &&
1228 (filter == NULL && index == 0 && max == UINT32_MAX);
1229
1230 spa_zero(listener);
1231 spa_node_add_listener(node->node, &listener, &node_events, &user_data);
1232 res = spa_node_port_enum_params(node->node, seq,
1233 port->direction, port->port_id,
1234 param_id, index, max,
1235 filter);
1236 spa_hook_remove(&listener);
1237
1238 if (user_data.cache) {
1239 pw_param_update(&impl->param_list, &impl->pending_list);
1240 pi->user = 1;
1241 }
1242 }
1243
1244 pw_log_debug("%p: res %d: (%s)", port, res, spa_strerror(res));
1245 return res;
1246 }
1247
1248 struct param_filter {
1249 struct pw_impl_port *in_port;
1250 struct pw_impl_port *out_port;
1251 int seq;
1252 uint32_t in_param_id;
1253 uint32_t out_param_id;
1254 int (*callback) (void *data, int seq, uint32_t id, uint32_t index,
1255 uint32_t next, struct spa_pod *param);
1256 void *data;
1257 uint32_t n_params;
1258 };
1259
do_filter(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)1260 static int do_filter(void *data, int seq, uint32_t id, uint32_t index, uint32_t next, struct spa_pod *param)
1261 {
1262 struct param_filter *f = data;
1263 f->n_params++;
1264 return pw_impl_port_for_each_param(f->out_port, seq, f->out_param_id, 0, 0, param, f->callback, f->data);
1265 }
1266
pw_impl_port_for_each_filtered_param(struct pw_impl_port * in_port,struct pw_impl_port * out_port,int seq,uint32_t in_param_id,uint32_t out_param_id,const struct spa_pod * filter,int (* callback)(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param),void * data)1267 int pw_impl_port_for_each_filtered_param(struct pw_impl_port *in_port,
1268 struct pw_impl_port *out_port,
1269 int seq,
1270 uint32_t in_param_id,
1271 uint32_t out_param_id,
1272 const struct spa_pod *filter,
1273 int (*callback) (void *data, int seq,
1274 uint32_t id, uint32_t index, uint32_t next,
1275 struct spa_pod *param),
1276 void *data)
1277 {
1278 int res;
1279 struct param_filter fd = { in_port, out_port, seq, in_param_id, out_param_id, callback, data, 0 };
1280
1281 if ((res = pw_impl_port_for_each_param(in_port, seq, in_param_id, 0, 0, filter, do_filter, &fd)) < 0)
1282 return res;
1283
1284 if (fd.n_params == 0)
1285 res = do_filter(&filter, seq, 0, 0, 0, NULL);
1286
1287 return res;
1288 }
1289
pw_impl_port_for_each_link(struct pw_impl_port * port,int (* callback)(void * data,struct pw_impl_link * link),void * data)1290 int pw_impl_port_for_each_link(struct pw_impl_port *port,
1291 int (*callback) (void *data, struct pw_impl_link *link),
1292 void *data)
1293 {
1294 struct pw_impl_link *l, *t;
1295 int res = 0;
1296
1297 if (port->direction == PW_DIRECTION_OUTPUT) {
1298 spa_list_for_each_safe(l, t, &port->links, output_link)
1299 if ((res = callback(data, l)) != 0)
1300 break;
1301 } else {
1302 spa_list_for_each_safe(l, t, &port->links, input_link)
1303 if ((res = callback(data, l)) != 0)
1304 break;
1305 }
1306 return res;
1307 }
1308
pw_impl_port_recalc_latency(struct pw_impl_port * port)1309 int pw_impl_port_recalc_latency(struct pw_impl_port *port)
1310 {
1311 struct pw_impl_link *l;
1312 struct spa_latency_info latency, *current;
1313 struct pw_impl_port *other;
1314 struct spa_pod *param;
1315 struct spa_pod_builder b = { 0 };
1316 uint8_t buffer[1024];
1317 bool changed;
1318
1319 if (port->destroying)
1320 return 0;
1321
1322 /* given an output port, we calculate the total latency to the sinks or the input
1323 * latency. */
1324 spa_latency_info_combine_start(&latency, SPA_DIRECTION_REVERSE(port->direction));
1325
1326 if (port->direction == PW_DIRECTION_OUTPUT) {
1327 spa_list_for_each(l, &port->links, output_link) {
1328 other = l->input;
1329 spa_latency_info_combine(&latency, &other->latency[other->direction]);
1330 pw_log_debug("port %d: peer %d: latency %f-%f %d-%d %"PRIu64"-%"PRIu64,
1331 port->info.id, other->info.id,
1332 latency.min_quantum, latency.max_quantum,
1333 latency.min_rate, latency.max_rate,
1334 latency.min_ns, latency.max_ns);
1335 }
1336 } else {
1337 spa_list_for_each(l, &port->links, input_link) {
1338 other = l->output;
1339 spa_latency_info_combine(&latency, &other->latency[other->direction]);
1340 pw_log_debug("port %d: peer %d: latency %f-%f %d-%d %"PRIu64"-%"PRIu64,
1341 port->info.id, other->info.id,
1342 latency.min_quantum, latency.max_quantum,
1343 latency.min_rate, latency.max_rate,
1344 latency.min_ns, latency.max_ns);
1345 }
1346 }
1347 spa_latency_info_combine_finish(&latency);
1348
1349 current = &port->latency[latency.direction];
1350
1351 changed = spa_latency_info_compare(current, &latency) != 0;
1352
1353 pw_log_info("port %d: %s %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64,
1354 port->info.id, changed ? "set" : "keep",
1355 pw_direction_as_string(latency.direction),
1356 latency.min_quantum, latency.max_quantum,
1357 latency.min_rate, latency.max_rate,
1358 latency.min_ns, latency.max_ns);
1359
1360 if (!changed)
1361 return 0;
1362
1363 *current = latency;
1364
1365 if (!port->have_latency_param)
1366 return 0;
1367
1368 spa_pod_builder_init(&b, buffer, sizeof(buffer));
1369 param = spa_latency_build(&b, SPA_PARAM_Latency, &latency);
1370 return pw_impl_port_set_param(port, SPA_PARAM_Latency, 0, param);
1371 }
1372
1373 SPA_EXPORT
pw_impl_port_is_linked(struct pw_impl_port * port)1374 int pw_impl_port_is_linked(struct pw_impl_port *port)
1375 {
1376 return spa_list_is_empty(&port->links) ? 0 : 1;
1377 }
1378
1379 SPA_EXPORT
pw_impl_port_set_param(struct pw_impl_port * port,uint32_t id,uint32_t flags,const struct spa_pod * param)1380 int pw_impl_port_set_param(struct pw_impl_port *port, uint32_t id, uint32_t flags,
1381 const struct spa_pod *param)
1382 {
1383 int res;
1384 struct pw_impl_node *node = port->node;
1385
1386 pw_log_debug("%p: %d set param %d %p", port, port->state, id, param);
1387
1388 /* set parameter on node */
1389 res = spa_node_port_set_param(node->node,
1390 port->direction, port->port_id,
1391 id, flags, param);
1392
1393 pw_log_debug("%p: %d set param on node %d:%d id:%d (%s): %d (%s)", port, port->state,
1394 port->direction, port->port_id, id,
1395 spa_debug_type_find_name(spa_type_param, id),
1396 res, spa_strerror(res));
1397
1398 /* set the parameters on all ports of the mixer node if possible */
1399 if (res >= 0) {
1400 struct pw_impl_port_mix *mix;
1401
1402 if (port->direction == PW_DIRECTION_INPUT &&
1403 id == SPA_PARAM_Format && param != NULL &&
1404 !SPA_FLAG_IS_SET(port->flags, PW_IMPL_PORT_FLAG_NO_MIXER)) {
1405 setup_mixer(port, param);
1406 }
1407
1408 spa_list_for_each(mix, &port->mix_list, link) {
1409 spa_node_port_set_param(port->mix,
1410 mix->port.direction, mix->port.port_id,
1411 id, flags, param);
1412 }
1413 spa_node_port_set_param(port->mix,
1414 pw_direction_reverse(port->direction), 0,
1415 id, flags, param);
1416 }
1417
1418 if (id == SPA_PARAM_Format) {
1419 pw_log_debug("%p: %d %p %d", port, port->state, param, res);
1420
1421 if (port->added) {
1422 pw_loop_invoke(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, true, port);
1423 port->added = false;
1424 }
1425 /* setting the format always destroys the negotiated buffers */
1426 pw_buffers_clear(&port->buffers);
1427 pw_buffers_clear(&port->mix_buffers);
1428
1429 if (param == NULL || res < 0) {
1430 pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_CONFIGURE, 0, NULL);
1431 }
1432 else if (spa_pod_is_fixated(param) <= 0) {
1433 pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_CONFIGURE, 0, NULL);
1434 pw_impl_port_emit_param_changed(port, id);
1435 }
1436 else if (!SPA_RESULT_IS_ASYNC(res)) {
1437 pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_READY, 0, NULL);
1438 }
1439 }
1440 return res;
1441 }
1442
negotiate_mixer_buffers(struct pw_impl_port * port,uint32_t flags,struct spa_buffer ** buffers,uint32_t n_buffers)1443 static int negotiate_mixer_buffers(struct pw_impl_port *port, uint32_t flags,
1444 struct spa_buffer **buffers, uint32_t n_buffers)
1445 {
1446 int res;
1447 struct pw_impl_node *node = port->node;
1448
1449 if (SPA_FLAG_IS_SET(port->mix_flags, PW_IMPL_PORT_MIX_FLAG_MIX_ONLY))
1450 return 0;
1451
1452 if (SPA_FLAG_IS_SET(port->mix_flags, PW_IMPL_PORT_MIX_FLAG_NEGOTIATE)) {
1453 int alloc_flags;
1454
1455 /* try dynamic data */
1456 alloc_flags = PW_BUFFERS_FLAG_DYNAMIC;
1457
1458 pw_log_debug("%p: %d.%d negotiate %d buffers on node: %p",
1459 port, port->direction, port->port_id, n_buffers, node->node);
1460
1461 if (port->added) {
1462 pw_loop_invoke(node->data_loop, do_remove_port, SPA_ID_INVALID, NULL, 0, true, port);
1463 port->added = false;
1464 }
1465
1466 pw_buffers_clear(&port->mix_buffers);
1467
1468 if (n_buffers > 0) {
1469 if ((res = pw_buffers_negotiate(node->context, alloc_flags,
1470 port->mix, 0,
1471 node->node, port->port_id,
1472 &port->mix_buffers)) < 0) {
1473 pw_log_warn("%p: can't negotiate buffers: %s",
1474 port, spa_strerror(res));
1475 return res;
1476 }
1477 buffers = port->mix_buffers.buffers;
1478 n_buffers = port->mix_buffers.n_buffers;
1479 flags = 0;
1480 }
1481 }
1482
1483 pw_log_debug("%p: %d.%d use %d buffers on node: %p",
1484 port, port->direction, port->port_id, n_buffers, node->node);
1485
1486 res = spa_node_port_use_buffers(node->node,
1487 port->direction, port->port_id,
1488 flags, buffers, n_buffers);
1489
1490 if (SPA_RESULT_IS_OK(res)) {
1491 spa_node_port_use_buffers(port->mix,
1492 pw_direction_reverse(port->direction), 0,
1493 0, buffers, n_buffers);
1494 }
1495 if (!port->added && n_buffers > 0) {
1496 pw_loop_invoke(node->data_loop, do_add_port, SPA_ID_INVALID, NULL, 0, false, port);
1497 port->added = true;
1498 }
1499 return res;
1500 }
1501
1502
1503 SPA_EXPORT
pw_impl_port_use_buffers(struct pw_impl_port * port,struct pw_impl_port_mix * mix,uint32_t flags,struct spa_buffer ** buffers,uint32_t n_buffers)1504 int pw_impl_port_use_buffers(struct pw_impl_port *port, struct pw_impl_port_mix *mix, uint32_t flags,
1505 struct spa_buffer **buffers, uint32_t n_buffers)
1506 {
1507 int res = 0, res2;
1508
1509 pw_log_debug("%p: %d:%d.%d: %d buffers flags:%d state:%d n_mix:%d", port,
1510 port->direction, port->port_id, mix->id,
1511 n_buffers, flags, port->state, port->n_mix);
1512
1513 if (n_buffers == 0 && port->state <= PW_IMPL_PORT_STATE_READY)
1514 return 0;
1515
1516 if (n_buffers > 0 && port->state < PW_IMPL_PORT_STATE_READY)
1517 return -EIO;
1518
1519 if (n_buffers == 0) {
1520 if (port->n_mix == 1)
1521 pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_READY, 0, NULL);
1522 }
1523
1524 /* first negotiate with the node, this makes it possible to let the
1525 * node allocate buffer memory if needed */
1526 if (port->state == PW_IMPL_PORT_STATE_READY) {
1527 res = negotiate_mixer_buffers(port, flags, buffers, n_buffers);
1528
1529 if (res < 0) {
1530 pw_log_error("%p: negotiate buffers on node: %d (%s)",
1531 port, res, spa_strerror(res));
1532 pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_ERROR, res,
1533 strdup("can't negotiate buffers on port"));
1534 } else if (n_buffers > 0 && !SPA_RESULT_IS_ASYNC(res)) {
1535 pw_impl_port_update_state(port, PW_IMPL_PORT_STATE_PAUSED, 0, NULL);
1536 }
1537 }
1538
1539 /* then use the buffers on the mixer */
1540 if (!SPA_FLAG_IS_SET(port->mix_flags, PW_IMPL_PORT_MIX_FLAG_MIX_ONLY))
1541 flags &= ~SPA_NODE_BUFFERS_FLAG_ALLOC;
1542
1543 res2 = spa_node_port_use_buffers(port->mix,
1544 mix->port.direction, mix->port.port_id, flags,
1545 buffers, n_buffers);
1546 if (res2 < 0) {
1547 if (res2 != -ENOTSUP && n_buffers > 0) {
1548 pw_log_warn("%p: mix use buffers failed: %d (%s)",
1549 port, res2, spa_strerror(res2));
1550 return res2;
1551 }
1552 }
1553 else if (SPA_RESULT_IS_ASYNC(res2))
1554 res = res2;
1555
1556 return res;
1557 }
1558