1 /* PipeWire
2  *
3  * Copyright © 2018 Wim Taymans
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice (including the next
13  * paragraph) shall be included in all copies or substantial portions of the
14  * Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
19  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include <string.h>
26 #include <stdlib.h>
27 #include <stdio.h>
28 #include <unistd.h>
29 #include <errno.h>
30 #include <time.h>
31 
32 #include <spa/support/system.h>
33 #include <spa/pod/parser.h>
34 #include <spa/pod/filter.h>
35 #include <spa/node/utils.h>
36 #include <spa/debug/types.h>
37 #include <spa/utils/string.h>
38 
39 #include "pipewire/impl-node.h"
40 #include "pipewire/private.h"
41 
42 PW_LOG_TOPIC_EXTERN(log_node);
43 #define PW_LOG_TOPIC_DEFAULT log_node
44 
45 #define DEFAULT_SYNC_TIMEOUT  ((uint64_t)(5 * SPA_NSEC_PER_SEC))
46 
47 /** \cond */
/* Private per-node state. The public pw_impl_node is embedded as `this`;
 * functions in this file recover the wrapper with
 * SPA_CONTAINER_OF(node, struct impl, this). */
struct impl {
	struct pw_impl_node this;

	/* written together with info.state in node_update_state(); consulted by
	 * pause_node() and start_node() to skip redundant commands */
	enum pw_node_state pending_state;
	uint32_t pending_id;

	struct pw_work_queue *work;

	int last_error;

	struct spa_list param_list;
	struct spa_list pending_list;

	/* both parsed from the node properties in check_properties() */
	unsigned int pause_on_idle:1;
	unsigned int cache_params:1;
	/* set by start_node() right before the Start command is sent */
	unsigned int pending_play:1;
};
65 
/* Convenience wrappers around pw_resource_call() for the pw_node_events
 * interface: pw_node_resource() forwards method `m` at version `v`, the two
 * helpers below emit the `info` and `param` events at version 0. */
#define pw_node_resource(r,m,v,...)	pw_resource_call(r,struct pw_node_events,m,v,__VA_ARGS__)
#define pw_node_resource_info(r,...)	pw_node_resource(r,info,0,__VA_ARGS__)
#define pw_node_resource_param(r,...)	pw_node_resource(r,param,0,__VA_ARGS__)
69 
/* User data attached to every resource created in global_bind(). */
struct resource_data {
	struct pw_impl_node *node;		/* node the resource is bound to */

	struct pw_resource *resource;		/* the owning resource */
	struct spa_hook resource_listener;	/* hook for resource_events */
	struct spa_hook object_listener;	/* hook for node_methods */

	/* param ids stored by node_subscribe_params(); only the first
	 * n_subscribe_ids entries are valid */
	uint32_t subscribe_ids[MAX_PARAMS];
	uint32_t n_subscribe_ids;

	/* for async replies */
	int seq;	/* async result returned by spa_node_set_param() */
	int end;	/* result of spa_node_sync(), -1 when no listener is installed */
	struct spa_hook listener;	/* node listener added in node_set_param() */
};
85 
86 /** \endcond */
87 
add_node(struct pw_impl_node * this,struct pw_impl_node * driver)88 static void add_node(struct pw_impl_node *this, struct pw_impl_node *driver)
89 {
90 	struct pw_node_activation_state *dstate, *nstate;
91 	struct pw_node_target *t;
92 
93 	if (this->exported)
94 		return;
95 
96 	pw_log_trace("%p: add to driver %p %p %p", this, driver,
97 			driver->rt.activation, this->rt.activation);
98 
99 	/* signal the driver */
100 	this->rt.driver_target.activation = driver->rt.activation;
101 	this->rt.driver_target.node = driver;
102 	this->rt.driver_target.data = driver;
103 	spa_list_append(&this->rt.target_list, &this->rt.driver_target.link);
104 
105 	spa_list_append(&driver->rt.target_list, &this->rt.target.link);
106 	nstate = &this->rt.activation->state[0];
107 	if (!this->rt.target.active) {
108 		nstate->required++;
109 		this->rt.target.active = true;
110 	}
111 
112 	spa_list_for_each(t, &this->rt.target_list, link) {
113 		dstate = &t->activation->state[0];
114 		if (!t->active) {
115 			dstate->required++;
116 			t->active = true;
117 		}
118 		pw_log_trace("%p: driver state:%p pending:%d/%d, node state:%p pending:%d/%d",
119 				this, dstate, dstate->pending, dstate->required,
120 				nstate, nstate->pending, nstate->required);
121 	}
122 }
123 
remove_node(struct pw_impl_node * this)124 static void remove_node(struct pw_impl_node *this)
125 {
126 	struct pw_node_activation_state *dstate, *nstate;
127 	struct pw_node_target *t;
128 
129 	if (this->exported)
130 		return;
131 
132 	pw_log_trace("%p: remove from driver %p %p %p",
133 			this, this->rt.driver_target.data,
134 			this->rt.driver_target.activation, this->rt.activation);
135 
136 	spa_list_remove(&this->rt.target.link);
137 
138 	nstate = &this->rt.activation->state[0];
139 	if (this->rt.target.active) {
140 		nstate->required--;
141 		this->rt.target.active = false;
142 	}
143 
144 	spa_list_for_each(t, &this->rt.target_list, link) {
145 		dstate = &t->activation->state[0];
146 		if (t->active) {
147 			dstate->required--;
148 			t->active = false;
149 		}
150 		pw_log_trace("%p: driver state:%p pending:%d/%d, node state:%p pending:%d/%d",
151 				this, dstate, dstate->pending, dstate->required,
152 				nstate, nstate->pending, nstate->required);
153 	}
154 	spa_list_remove(&this->rt.driver_target.link);
155 
156 	this->rt.driver_target.node = NULL;
157 }
158 
159 static int
do_node_remove(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)160 do_node_remove(struct spa_loop *loop,
161 	       bool async, uint32_t seq, const void *data, size_t size, void *user_data)
162 {
163 	struct pw_impl_node *this = user_data;
164 	if (this->source.loop != NULL) {
165 		spa_loop_remove_source(loop, &this->source);
166 		remove_node(this);
167 	}
168 	return 0;
169 }
170 
node_deactivate(struct pw_impl_node * this)171 static void node_deactivate(struct pw_impl_node *this)
172 {
173 	struct pw_impl_port *port;
174 	struct pw_impl_link *link;
175 
176 	pw_log_debug("%p: deactivate", this);
177 	spa_list_for_each(port, &this->input_ports, link) {
178 		spa_list_for_each(link, &port->links, input_link)
179 			pw_impl_link_deactivate(link);
180 	}
181 	spa_list_for_each(port, &this->output_ports, link) {
182 		spa_list_for_each(link, &port->links, output_link)
183 			pw_impl_link_deactivate(link);
184 	}
185 	pw_loop_invoke(this->data_loop, do_node_remove, 1, NULL, 0, true, this);
186 }
187 
pause_node(struct pw_impl_node * this)188 static int pause_node(struct pw_impl_node *this)
189 {
190 	struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
191 	int res = 0;
192 
193 	pw_log_debug("%p: pause node state:%s pending:%s pause-on-idle:%d", this,
194 			pw_node_state_as_string(this->info.state),
195 			pw_node_state_as_string(impl->pending_state),
196 			impl->pause_on_idle);
197 
198 	if (impl->pending_state <= PW_NODE_STATE_IDLE)
199 		return 0;
200 
201 	node_deactivate(this);
202 
203 	res = spa_node_send_command(this->node,
204 				    &SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Pause));
205 	if (res < 0)
206 		pw_log_debug("%p: pause node error %s", this, spa_strerror(res));
207 
208 	return res;
209 }
210 
node_activate(struct pw_impl_node * this)211 static void node_activate(struct pw_impl_node *this)
212 {
213 	struct pw_impl_port *port;
214 
215 	pw_log_debug("%p: activate", this);
216 	spa_list_for_each(port, &this->input_ports, link) {
217 		struct pw_impl_link *link;
218 		spa_list_for_each(link, &port->links, input_link)
219 			pw_impl_link_activate(link);
220 	}
221 	spa_list_for_each(port, &this->output_ports, link) {
222 		struct pw_impl_link *link;
223 		spa_list_for_each(link, &port->links, output_link)
224 			pw_impl_link_activate(link);
225 	}
226 }
227 
start_node(struct pw_impl_node * this)228 static int start_node(struct pw_impl_node *this)
229 {
230 	struct impl *impl = SPA_CONTAINER_OF(this, struct impl, this);
231 	int res = 0;
232 
233 	node_activate(this);
234 
235 	if (impl->pending_state >= PW_NODE_STATE_RUNNING)
236 		return 0;
237 
238 	pw_log_debug("%p: start node", this);
239 
240 	if (!(this->driving && this->driver)) {
241 		impl->pending_play = true;
242 		res = spa_node_send_command(this->node,
243 			&SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Start));
244 	}
245 
246 	if (res < 0)
247 		pw_log_error("(%s-%u) start node error %d: %s", this->name, this->info.id,
248 				res, spa_strerror(res));
249 
250 	return res;
251 }
252 
emit_info_changed(struct pw_impl_node * node,bool flags_changed)253 static void emit_info_changed(struct pw_impl_node *node, bool flags_changed)
254 {
255 	if (node->info.change_mask == 0 && !flags_changed)
256 		return;
257 
258 	pw_impl_node_emit_info_changed(node, &node->info);
259 
260 	if (node->global && node->info.change_mask != 0) {
261 		struct pw_resource *resource;
262 		spa_list_for_each(resource, &node->global->resource_list, link)
263 			pw_node_resource_info(resource, &node->info);
264 	}
265 
266 	node->info.change_mask = 0;
267 }
268 
resource_is_subscribed(struct pw_resource * resource,uint32_t id)269 static int resource_is_subscribed(struct pw_resource *resource, uint32_t id)
270 {
271 	struct resource_data *data = pw_resource_get_user_data(resource);
272 	uint32_t i;
273 
274 	for (i = 0; i < data->n_subscribe_ids; i++) {
275 		if (data->subscribe_ids[i] == id)
276 			return 1;
277 	}
278 	return 0;
279 }
280 
notify_param(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)281 static int notify_param(void *data, int seq, uint32_t id,
282 		uint32_t index, uint32_t next, struct spa_pod *param)
283 {
284 	struct pw_impl_node *node = data;
285 	struct pw_resource *resource;
286 
287 	spa_list_for_each(resource, &node->global->resource_list, link) {
288 		if (!resource_is_subscribed(resource, id))
289 			continue;
290 
291 		pw_log_debug("%p: resource %p notify param %d", node, resource, id);
292 		pw_node_resource_param(resource, seq, id, index, next, param);
293 	}
294 	return 0;
295 }
296 
emit_params(struct pw_impl_node * node,uint32_t * changed_ids,uint32_t n_changed_ids)297 static void emit_params(struct pw_impl_node *node, uint32_t *changed_ids, uint32_t n_changed_ids)
298 {
299 	uint32_t i;
300 	int res;
301 
302 	if (node->global == NULL)
303 		return;
304 
305 	pw_log_debug("%p: emit %d params", node, n_changed_ids);
306 
307 	for (i = 0; i < n_changed_ids; i++) {
308 		struct pw_resource *resource;
309 		int subscribed = 0;
310 
311 		/* first check if anyone is subscribed */
312 		spa_list_for_each(resource, &node->global->resource_list, link) {
313 			if ((subscribed = resource_is_subscribed(resource, changed_ids[i])))
314 				break;
315 		}
316 		if (!subscribed)
317 			continue;
318 
319 		if ((res = pw_impl_node_for_each_param(node, 1, changed_ids[i], 0, UINT32_MAX,
320 					NULL, notify_param, node)) < 0) {
321 			pw_log_error("%p: error %d (%s)", node, res, spa_strerror(res));
322 		}
323 	}
324 }
325 
326 static int
do_node_add(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)327 do_node_add(struct spa_loop *loop,
328 	    bool async, uint32_t seq, const void *data, size_t size, void *user_data)
329 {
330 	struct pw_impl_node *this = user_data;
331 	struct pw_impl_node *driver = this->driver_node;
332 
333 	if (this->source.loop == NULL) {
334 		spa_loop_add_source(loop, &this->source);
335 		add_node(this, driver);
336 	}
337 	return 0;
338 }
339 
node_update_state(struct pw_impl_node * node,enum pw_node_state state,int res,char * error)340 static void node_update_state(struct pw_impl_node *node, enum pw_node_state state, int res, char *error)
341 {
342 	struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
343 	enum pw_node_state old = node->info.state;
344 
345 	switch (state) {
346 	case PW_NODE_STATE_RUNNING:
347 		if (node->driving && node->driver) {
348 			res = spa_node_send_command(node->node,
349 				&SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Start));
350 			if (res < 0) {
351 				state = PW_NODE_STATE_ERROR;
352 				error = spa_aprintf("Start error: %s", spa_strerror(res));
353 			}
354 		}
355 		if (res >= 0)
356 			pw_loop_invoke(node->data_loop, do_node_add, 1, NULL, 0, true, node);
357 		break;
358 	default:
359 		break;
360 	}
361 
362 	free((char*)node->info.error);
363 	node->info.error = error;
364 	node->info.state = state;
365 	impl->pending_state = state;
366 
367 	pw_log_debug("%p: (%s) %s -> %s (%s)", node, node->name,
368 		     pw_node_state_as_string(old), pw_node_state_as_string(state), error);
369 
370 	if (old == state)
371 		return;
372 
373 	if (state == PW_NODE_STATE_ERROR) {
374 		pw_log_error("(%s-%u) %s -> error (%s)", node->name, node->info.id,
375 		     pw_node_state_as_string(old), error);
376 	} else {
377 		pw_log_info("(%s-%u) %s -> %s", node->name, node->info.id,
378 		     pw_node_state_as_string(old), pw_node_state_as_string(state));
379 	}
380 	pw_impl_node_emit_state_changed(node, old, state, error);
381 
382 	node->info.change_mask |= PW_NODE_CHANGE_MASK_STATE;
383 	emit_info_changed(node, false);
384 
385 	if (state == PW_NODE_STATE_ERROR && node->global) {
386 		struct pw_resource *resource;
387 		spa_list_for_each(resource, &node->global->resource_list, link)
388 			pw_resource_error(resource, res, error);
389 	}
390 }
391 
suspend_node(struct pw_impl_node * this)392 static int suspend_node(struct pw_impl_node *this)
393 {
394 	int res = 0;
395 	struct pw_impl_port *p;
396 
397 	pw_log_debug("%p: suspend node state:%s", this,
398 			pw_node_state_as_string(this->info.state));
399 
400 	if (this->info.state > 0 && this->info.state <= PW_NODE_STATE_SUSPENDED)
401 		return 0;
402 
403 	node_deactivate(this);
404 
405 	spa_list_for_each(p, &this->input_ports, link) {
406 		if ((res = pw_impl_port_set_param(p, SPA_PARAM_Format, 0, NULL)) < 0)
407 			pw_log_warn("%p: error unset format input: %s",
408 					this, spa_strerror(res));
409 		/* force CONFIGURE in case of async */
410 		p->state = PW_IMPL_PORT_STATE_CONFIGURE;
411 	}
412 
413 	spa_list_for_each(p, &this->output_ports, link) {
414 		if ((res = pw_impl_port_set_param(p, SPA_PARAM_Format, 0, NULL)) < 0)
415 			pw_log_warn("%p: error unset format output: %s",
416 					this, spa_strerror(res));
417 		/* force CONFIGURE in case of async */
418 		p->state = PW_IMPL_PORT_STATE_CONFIGURE;
419 	}
420 
421 	res = spa_node_send_command(this->node,
422 				    &SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Suspend));
423 	if (res == -ENOTSUP)
424 		res = spa_node_send_command(this->node,
425 				    &SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Pause));
426 	if (res < 0 && res != -EIO)
427 		pw_log_warn("%p: suspend node error %s", this, spa_strerror(res));
428 
429 	node_update_state(this, PW_NODE_STATE_SUSPENDED, 0, NULL);
430 
431 	return res;
432 }
433 
434 static void
clear_info(struct pw_impl_node * this)435 clear_info(struct pw_impl_node *this)
436 {
437 	free(this->name);
438 	free((char*)this->info.error);
439 }
440 
reply_param(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)441 static int reply_param(void *data, int seq, uint32_t id,
442 		uint32_t index, uint32_t next, struct spa_pod *param)
443 {
444 	struct resource_data *d = data;
445 	pw_log_debug("%p: resource %p reply param %d", d->node, d->resource, seq);
446 	pw_node_resource_param(d->resource, seq, id, index, next, param);
447 	return 0;
448 }
449 
node_enum_params(void * object,int seq,uint32_t id,uint32_t index,uint32_t num,const struct spa_pod * filter)450 static int node_enum_params(void *object, int seq, uint32_t id,
451 		uint32_t index, uint32_t num, const struct spa_pod *filter)
452 {
453 	struct resource_data *data = object;
454 	struct pw_resource *resource = data->resource;
455 	struct pw_impl_node *node = data->node;
456 	int res;
457 
458 	pw_log_debug("%p: resource %p enum params seq:%d id:%d (%s) index:%u num:%u",
459 			node, resource, seq, id,
460 			spa_debug_type_find_name(spa_type_param, id), index, num);
461 
462 	if ((res = pw_impl_node_for_each_param(node, seq, id, index, num,
463 				filter, reply_param, data)) < 0) {
464 		pw_resource_errorf(resource, res,
465 				"enum params id:%d (%s) failed", id,
466 				spa_debug_type_find_name(spa_type_param, id));
467 	}
468 	return 0;
469 }
470 
node_subscribe_params(void * object,uint32_t * ids,uint32_t n_ids)471 static int node_subscribe_params(void *object, uint32_t *ids, uint32_t n_ids)
472 {
473 	struct resource_data *data = object;
474 	struct pw_resource *resource = data->resource;
475 	uint32_t i;
476 
477 	n_ids = SPA_MIN(n_ids, SPA_N_ELEMENTS(data->subscribe_ids));
478 	data->n_subscribe_ids = n_ids;
479 
480 	for (i = 0; i < n_ids; i++) {
481 		data->subscribe_ids[i] = ids[i];
482 		pw_log_debug("%p: resource %p subscribe param id:%d (%s)",
483 				data->node, resource, ids[i],
484 				spa_debug_type_find_name(spa_type_param, ids[i]));
485 		node_enum_params(data, 1, ids[i], 0, UINT32_MAX, NULL);
486 	}
487 	return 0;
488 }
489 
remove_busy_resource(struct resource_data * d)490 static void remove_busy_resource(struct resource_data *d)
491 {
492 	if (d->end != -1) {
493 		spa_hook_remove(&d->listener);
494 		d->end = -1;
495 		pw_impl_client_set_busy(d->resource->client, false);
496 	}
497 }
498 
result_node_sync(void * data,int seq,int res,uint32_t type,const void * result)499 static void result_node_sync(void *data, int seq, int res, uint32_t type, const void *result)
500 {
501 	struct resource_data *d = data;
502 
503 	pw_log_debug("%p: sync result %d %d (%d/%d)", d->node, res, seq, d->seq, d->end);
504 	if (seq == d->end)
505 		remove_busy_resource(d);
506 }
507 
node_set_param(void * object,uint32_t id,uint32_t flags,const struct spa_pod * param)508 static int node_set_param(void *object, uint32_t id, uint32_t flags,
509 		const struct spa_pod *param)
510 {
511 	struct resource_data *data = object;
512 	struct pw_resource *resource = data->resource;
513 	struct pw_impl_node *node = data->node;
514 	struct pw_impl_client *client = resource->client;
515 	int res;
516 	static const struct spa_node_events node_events = {
517 		SPA_VERSION_NODE_EVENTS,
518 		.result = result_node_sync,
519 	};
520 
521 	pw_log_debug("%p: resource %p set param id:%d (%s) %08x", node, resource,
522 			id, spa_debug_type_find_name(spa_type_param, id), flags);
523 
524 	res = spa_node_set_param(node->node, id, flags, param);
525 
526 	if (res < 0) {
527 		pw_resource_errorf(resource, res,
528 				"set param id:%d (%s) flags:%08x failed", id,
529 				spa_debug_type_find_name(spa_type_param, id), flags);
530 
531 	} else if (SPA_RESULT_IS_ASYNC(res)) {
532 		pw_impl_client_set_busy(client, true);
533 		if (data->end == -1)
534 			spa_node_add_listener(node->node, &data->listener,
535 				&node_events, data);
536 		data->seq = res;
537 		data->end = spa_node_sync(node->node, res);
538 	}
539 	return 0;
540 }
541 
node_send_command(void * object,const struct spa_command * command)542 static int node_send_command(void *object, const struct spa_command *command)
543 {
544 	struct resource_data *data = object;
545 	struct pw_impl_node *node = data->node;
546 
547 	switch (SPA_NODE_COMMAND_ID(command)) {
548 	case SPA_NODE_COMMAND_Suspend:
549 		suspend_node(node);
550 		break;
551 	default:
552 		spa_node_send_command(node->node, command);
553 		break;
554 	}
555 	return 0;
556 }
557 
/* Method table installed as the object listener of every bound resource in
 * global_bind(); each handler receives the resource_data as `object`. */
static const struct pw_node_methods node_methods = {
	PW_VERSION_NODE_METHODS,
	.subscribe_params = node_subscribe_params,
	.enum_params = node_enum_params,
	.set_param = node_set_param,
	.send_command = node_send_command
};
565 
resource_destroy(void * data)566 static void resource_destroy(void *data)
567 {
568 	struct resource_data *d = data;
569 	remove_busy_resource(d);
570 	spa_hook_remove(&d->resource_listener);
571 	spa_hook_remove(&d->object_listener);
572 }
573 
resource_pong(void * data,int seq)574 static void resource_pong(void *data, int seq)
575 {
576 	struct resource_data *d = data;
577 	struct pw_resource *resource = d->resource;
578 	pw_log_debug("%p: resource %p: got pong %d", d->node,
579 			resource, seq);
580 }
581 
/* Resource event handlers registered in global_bind(); `data` is the
 * resource_data of the resource. */
static const struct pw_resource_events resource_events = {
	PW_VERSION_RESOURCE_EVENTS,
	.destroy = resource_destroy,
	.pong = resource_pong,
};
587 
588 static int
global_bind(void * _data,struct pw_impl_client * client,uint32_t permissions,uint32_t version,uint32_t id)589 global_bind(void *_data, struct pw_impl_client *client, uint32_t permissions,
590 	    uint32_t version, uint32_t id)
591 {
592 	struct pw_impl_node *this = _data;
593 	struct pw_global *global = this->global;
594 	struct pw_resource *resource;
595 	struct resource_data *data;
596 
597 	resource = pw_resource_new(client, id, permissions, global->type, version, sizeof(*data));
598 	if (resource == NULL)
599 		goto error_resource;
600 
601 	data = pw_resource_get_user_data(resource);
602 	data->node = this;
603 	data->resource = resource;
604 	data->end = -1;
605 
606 	pw_resource_add_listener(resource,
607 			&data->resource_listener,
608 			&resource_events, data);
609 	pw_resource_add_object_listener(resource,
610 			&data->object_listener,
611 			&node_methods, data);
612 
613 	pw_log_debug("%p: bound to %d", this, resource->id);
614 	pw_global_add_resource(global, resource);
615 
616 	this->info.change_mask = PW_NODE_CHANGE_MASK_ALL;
617 	pw_node_resource_info(resource, &this->info);
618 	this->info.change_mask = 0;
619 
620 	return 0;
621 
622 error_resource:
623 	pw_log_error("%p: can't create node resource: %m", this);
624 	return -errno;
625 }
626 
global_destroy(void * data)627 static void global_destroy(void *data)
628 {
629 	struct pw_impl_node *this = data;
630 	spa_hook_remove(&this->global_listener);
631 	this->global = NULL;
632 	pw_impl_node_destroy(this);
633 }
634 
/* Global event handlers registered in pw_impl_node_register(); `data` is the
 * pw_impl_node. */
static const struct pw_global_events global_events = {
	PW_VERSION_GLOBAL_EVENTS,
	.destroy = global_destroy,
};
639 
insert_driver(struct pw_context * context,struct pw_impl_node * node)640 static inline void insert_driver(struct pw_context *context, struct pw_impl_node *node)
641 {
642 	struct pw_impl_node *n, *t;
643 
644 	spa_list_for_each_safe(n, t, &context->driver_list, driver_link) {
645 		if (n->priority_driver < node->priority_driver)
646 			break;
647 	}
648 	spa_list_append(&n->driver_link, &node->driver_link);
649 }
650 
update_io(struct pw_impl_node * node)651 static void update_io(struct pw_impl_node *node)
652 {
653 	pw_log_debug("%p: id:%d", node, node->info.id);
654 
655 	if (spa_node_set_io(node->node,
656 			    SPA_IO_Position,
657 			    &node->rt.activation->position,
658 			    sizeof(struct spa_io_position)) >= 0) {
659 		pw_log_debug("%p: set position %p", node, &node->rt.activation->position);
660 		node->rt.position = &node->rt.activation->position;
661 
662 		node->current_rate = node->rt.position->clock.rate;
663 		node->current_quantum = node->rt.position->clock.duration;
664 	} else if (node->driver) {
665 		pw_log_warn("%p: can't set position on driver", node);
666 	}
667 	if (spa_node_set_io(node->node,
668 			    SPA_IO_Clock,
669 			    &node->rt.activation->position.clock,
670 			    sizeof(struct spa_io_clock)) >= 0) {
671 		pw_log_debug("%p: set clock %p", node, &node->rt.activation->position.clock);
672 		node->rt.clock = &node->rt.activation->position.clock;
673 	}
674 }
675 
/*
 * Register the node: create and register its global, publish its properties
 * and register all of its ports.
 *
 * \param this       the node to register
 * \param properties extra properties handed to pw_global_new(); freed here
 *                   when the node is already registered
 * \return 0 on success, -EEXIST when the node was registered before, or a
 *         negative errno value when the global could not be created
 */
SPA_EXPORT
int pw_impl_node_register(struct pw_impl_node *this,
		     struct pw_properties *properties)
{
	/* node property keys passed to pw_global_update_keys() below */
	static const char * const keys[] = {
		PW_KEY_OBJECT_SERIAL,
		PW_KEY_OBJECT_PATH,
		PW_KEY_MODULE_ID,
		PW_KEY_FACTORY_ID,
		PW_KEY_CLIENT_ID,
		PW_KEY_DEVICE_ID,
		PW_KEY_PRIORITY_SESSION,
		PW_KEY_PRIORITY_DRIVER,
		PW_KEY_APP_NAME,
		PW_KEY_NODE_DESCRIPTION,
		PW_KEY_NODE_NAME,
		PW_KEY_NODE_NICK,
		PW_KEY_NODE_SESSION,
		PW_KEY_MEDIA_CLASS,
		PW_KEY_MEDIA_TYPE,
		PW_KEY_MEDIA_CATEGORY,
		PW_KEY_MEDIA_ROLE,
		NULL
	};

	struct pw_context *context = this->context;
	struct pw_impl_port *port;

	pw_log_debug("%p: register", this);

	if (this->registered)
		goto error_existed;

	this->global = pw_global_new(context,
				     PW_TYPE_INTERFACE_Node,
				     PW_VERSION_NODE,
				     properties,
				     global_bind,
				     this);
	/* NOTE(review): properties is not freed on this path; presumably
	 * pw_global_new() takes ownership even on failure - confirm */
	if (this->global == NULL)
		return -errno;

	/* track the node in the context, and in the driver list when it can drive */
	spa_list_append(&context->node_list, &this->link);
	if (this->driver)
		insert_driver(context, this);
	this->registered = true;

	/* the clock in the activation record carries the id of the global */
	this->rt.activation->position.clock.id = this->global->id;

	/* expose the global id and serial through the node properties and info */
	this->info.id = this->global->id;
	pw_properties_setf(this->properties, PW_KEY_OBJECT_ID, "%d", this->info.id);
	pw_properties_setf(this->properties, PW_KEY_OBJECT_SERIAL, "%"PRIu64,
			pw_global_get_serial(this->global));
	this->info.props = &this->properties->dict;

	pw_global_update_keys(this->global, &this->properties->dict, keys);

	/* emits `initialized` and moves the node to the SUSPENDED state */
	pw_impl_node_initialized(this);

	pw_global_add_listener(this->global, &this->global_listener, &global_events, this);
	pw_global_register(this->global);

	/* only possible once an SPA node implementation is set */
	if (this->node)
		update_io(this);

	spa_list_for_each(port, &this->input_ports, link)
		pw_impl_port_register(port, NULL);
	spa_list_for_each(port, &this->output_ports, link)
		pw_impl_port_register(port, NULL);

	if (this->active)
		pw_context_recalc_graph(context, "register active node");

	return 0;

error_existed:
	/* the caller handed over the properties, release them on this error path */
	pw_properties_free(properties);
	return -EEXIST;
}
755 
756 SPA_EXPORT
pw_impl_node_initialized(struct pw_impl_node * this)757 int pw_impl_node_initialized(struct pw_impl_node *this)
758 {
759 	pw_log_debug("%p initialized", this);
760 	pw_impl_node_emit_initialized(this);
761 	node_update_state(this, PW_NODE_STATE_SUSPENDED, 0, NULL);
762 	return 0;
763 }
764 
765 static int
do_move_nodes(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)766 do_move_nodes(struct spa_loop *loop,
767 		bool async, uint32_t seq, const void *data, size_t size, void *user_data)
768 {
769 	struct impl *impl = user_data;
770 	struct pw_impl_node *driver = *(struct pw_impl_node **)data;
771 	struct pw_impl_node *node = &impl->this;
772 	int res;
773 
774 	pw_log_trace("%p: driver:%p->%p", node, node->driver_node, driver);
775 
776 	if ((res = spa_node_set_io(node->node,
777 		    SPA_IO_Position,
778 		    &driver->rt.activation->position,
779 		    sizeof(struct spa_io_position))) < 0) {
780 		pw_log_debug("%p: set position: %s", node, spa_strerror(res));
781 	}
782 
783 	pw_log_trace("%p: set position %p", node, &driver->rt.activation->position);
784 	node->rt.position = &driver->rt.activation->position;
785 
786 	node->current_rate = node->rt.position->clock.rate;
787 	node->current_quantum = node->rt.position->clock.duration;
788 
789 	if (node->source.loop != NULL) {
790 		remove_node(node);
791 		add_node(node, driver);
792 	}
793 	return 0;
794 }
795 
remove_segment_owner(struct pw_impl_node * driver,uint32_t node_id)796 static void remove_segment_owner(struct pw_impl_node *driver, uint32_t node_id)
797 {
798 	struct pw_node_activation *a = driver->rt.activation;
799 	ATOMIC_CAS(a->segment_owner[0], node_id, 0);
800 	ATOMIC_CAS(a->segment_owner[1], node_id, 0);
801 }
802 
803 SPA_EXPORT
pw_impl_node_set_driver(struct pw_impl_node * node,struct pw_impl_node * driver)804 int pw_impl_node_set_driver(struct pw_impl_node *node, struct pw_impl_node *driver)
805 {
806 	struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
807 	struct pw_impl_node *old = node->driver_node;
808 
809 	if (driver == NULL)
810 		driver = node;
811 
812 	spa_list_remove(&node->follower_link);
813 	spa_list_append(&driver->follower_list, &node->follower_link);
814 
815 	if (old == driver)
816 		return 0;
817 
818 	remove_segment_owner(old, node->info.id);
819 
820 	if (old != node && old->driving && driver->info.state < PW_NODE_STATE_RUNNING) {
821 		driver->current_rate = old->current_rate;
822 		driver->current_quantum = old->current_quantum;
823 		driver->current_pending = true;
824 		pw_log_info("move quantum:%"PRIu64" rate:%d (%s-%d -> %s-%d)",
825 				driver->current_quantum,
826 				driver->current_rate.denom,
827 				old->name, old->info.id,
828 				driver->name, driver->info.id);
829 	}
830 	node->driving = node->driver && driver == node;
831 
832 	pw_log_debug("%p: driver %p driving:%u", node,
833 		driver, node->driving);
834 	pw_log_info("(%s-%u) -> change driver (%s-%d -> %s-%d)",
835 			node->name, node->info.id,
836 			old->name, old->info.id, driver->name, driver->info.id);
837 
838 	node->driver_node = driver;
839 
840 	pw_loop_invoke(node->data_loop,
841 		       do_move_nodes, SPA_ID_INVALID, &driver, sizeof(struct pw_impl_node *),
842 		       true, impl);
843 
844 	pw_impl_node_emit_driver_changed(node, old, driver);
845 
846 	return 0;
847 }
848 
/*
 * Re-read the node properties and update the derived node settings: driver
 * priority, name, idle/cache/transport flags, driver role, group, scheduling
 * flags, latency, max latency and rate.
 *
 * When a setting that affects the graph changed and the node is active, the
 * context graph is recalculated with the reason of the last such change.
 */
static void check_properties(struct pw_impl_node *node)
{
	struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
	struct pw_context *context = node->context;
	const char *str, *recalc_reason = NULL;
	struct spa_fraction frac;
	bool driver;

	if ((str = pw_properties_get(node->properties, PW_KEY_PRIORITY_DRIVER))) {
		node->priority_driver = pw_properties_parse_int(str);
		pw_log_debug("%p: priority driver %d", node, node->priority_driver);
	}

	/* only replace the name when it is new or different */
	if ((str = pw_properties_get(node->properties, PW_KEY_NODE_NAME)) &&
	    (node->name == NULL || !spa_streq(node->name, str))) {
		free(node->name);
		node->name = strdup(str);
		pw_log_debug("%p: name '%s'", node, node->name);
	}

	impl->pause_on_idle = pw_properties_get_bool(node->properties, PW_KEY_NODE_PAUSE_ON_IDLE, true);
	impl->cache_params =  pw_properties_get_bool(node->properties, PW_KEY_NODE_CACHE_PARAMS, true);
	node->transport_sync = pw_properties_get_bool(node->properties, "node.transport.sync", false);
	driver = pw_properties_get_bool(node->properties, PW_KEY_NODE_DRIVER, false);

	if (node->driver != driver) {
		pw_log_debug("%p: driver %d -> %d", node, node->driver, driver);
		node->driver = driver;
		/* the driver list only holds registered nodes */
		if (node->registered) {
			if (driver)
				insert_driver(context, node);
			else
				spa_list_remove(&node->driver_link);
		}
		recalc_reason = "driver changed";
	}

	/* not scheduled automatically so we add an additional required trigger */
	/* NOTE(review): this increments on every call while the property is set,
	 * not only when it changes - confirm callers invoke this once per node */
	if (pw_properties_get_bool(node->properties, PW_KEY_NODE_TRIGGER, false))
		node->rt.activation->state[0].required++;

	/* group defines what nodes are scheduled together */
	if ((str = pw_properties_get(node->properties, PW_KEY_NODE_GROUP)) == NULL)
		str = "";

	if (!spa_streq(str, node->group)) {
		pw_log_info("%p: group '%s'->'%s'", node, node->group, str);
		snprintf(node->group, sizeof(node->group), "%s", str);
		node->freewheel = spa_streq(node->group, "pipewire.freewheel");
		recalc_reason = "group changed";
	}


	node->want_driver = pw_properties_get_bool(node->properties, PW_KEY_NODE_WANT_DRIVER, false);
	node->always_process = pw_properties_get_bool(node->properties, PW_KEY_NODE_ALWAYS_PROCESS, false);

	/* always-process implies want-driver */
	if (node->always_process)
		node->want_driver = true;

	/* fractions are written as "num/denom"; a zero denominator is ignored */
	if ((str = pw_properties_get(node->properties, PW_KEY_NODE_LATENCY))) {
                if (sscanf(str, "%u/%u", &frac.num, &frac.denom) == 2 && frac.denom != 0) {
			if (node->latency.num != frac.num || node->latency.denom != frac.denom) {
				pw_log_info("(%s-%u) latency:%u/%u -> %u/%u", node->name,
						node->info.id, node->latency.num,
						node->latency.denom, frac.num, frac.denom);
				node->latency = frac;
				recalc_reason = "quantum changed";
			}
		}
	}
	if ((str = pw_properties_get(node->properties, PW_KEY_NODE_MAX_LATENCY))) {
                if (sscanf(str, "%u/%u", &frac.num, &frac.denom) == 2 && frac.denom != 0) {
			if (node->max_latency.num != frac.num || node->max_latency.denom != frac.denom) {
				pw_log_info("(%s-%u) max-latency:%u/%u -> %u/%u", node->name,
						node->info.id, node->max_latency.num,
						node->max_latency.denom, frac.num, frac.denom);
				node->max_latency = frac;
				recalc_reason = "max quantum changed";
			}
		}
	}
	node->lock_quantum = pw_properties_get_bool(node->properties, PW_KEY_NODE_LOCK_QUANTUM, false);

	if ((str = pw_properties_get(node->properties, PW_KEY_NODE_RATE))) {
                if (sscanf(str, "%u/%u", &frac.num, &frac.denom) == 2 && frac.denom != 0) {
			if (node->rate.num != frac.num || node->rate.denom != frac.denom) {
				pw_log_info("(%s-%u) rate:%u/%u -> %u/%u", node->name,
						node->info.id, node->rate.num,
						node->rate.denom, frac.num, frac.denom);
				node->rate = frac;
				recalc_reason = "node rate changed";
			}
		}
	}
	node->lock_rate = pw_properties_get_bool(node->properties, PW_KEY_NODE_LOCK_RATE, false);

	/* NOTE(review): recalc_reason is NULL here when nothing changed and is
	 * still passed to %s */
	pw_log_debug("%p: driver:%d recalc:%s active:%d", node, node->driver,
			recalc_reason, node->active);

	if (recalc_reason != NULL && node->active)
		pw_context_recalc_graph(context, recalc_reason);
}
951 
str_status(uint32_t status)952 static const char *str_status(uint32_t status)
953 {
954 	switch (status) {
955 	case PW_NODE_ACTIVATION_NOT_TRIGGERED:
956 		return "not-triggered";
957 	case PW_NODE_ACTIVATION_TRIGGERED:
958 		return "triggered";
959 	case PW_NODE_ACTIVATION_AWAKE:
960 		return "awake";
961 	case PW_NODE_ACTIVATION_FINISHED:
962 		return "finished";
963 	}
964 	return "unknown";
965 }
966 
dump_states(struct pw_impl_node * driver)967 static void dump_states(struct pw_impl_node *driver)
968 {
969 	struct pw_node_target *t;
970 	struct pw_node_activation *na = driver->rt.activation;
971 	struct spa_io_clock *cl = &na->position.clock;
972 
973 	spa_list_for_each(t, &driver->rt.target_list, link) {
974 		struct pw_node_activation *a = t->activation;
975 		struct pw_node_activation_state *state = &a->state[0];
976 		if (t->node == NULL)
977 			continue;
978 		if (a->status == PW_NODE_ACTIVATION_TRIGGERED ||
979 		    a->status == PW_NODE_ACTIVATION_AWAKE) {
980 			pw_log_info("(%s-%u) client too slow! rate:%u/%u pos:%"PRIu64" status:%s",
981 				t->node->name, t->node->info.id,
982 				(uint32_t)(cl->rate.num * cl->duration), cl->rate.denom,
983 				cl->position, str_status(a->status));
984 		}
985 		pw_log_debug("(%s-%u) state:%p pending:%d/%d s:%"PRIu64" a:%"PRIu64" f:%"PRIu64
986 				" waiting:%"PRIu64" process:%"PRIu64" status:%s sync:%d",
987 				t->node->name, t->node->info.id, state,
988 				state->pending, state->required,
989 				a->signal_time,
990 				a->awake_time,
991 				a->finish_time,
992 				a->awake_time - a->signal_time,
993 				a->finish_time - a->awake_time,
994 				str_status(a->status), a->pending_sync);
995 	}
996 }
997 
resume_node(struct pw_impl_node * this,int status)998 static inline int resume_node(struct pw_impl_node *this, int status)
999 {
1000 	struct pw_node_target *t;
1001 	struct timespec ts;
1002 	struct pw_node_activation *activation = this->rt.activation;
1003 	struct spa_system *data_system = this->context->data_system;
1004 	uint64_t nsec;
1005 
1006 	spa_system_clock_gettime(data_system, CLOCK_MONOTONIC, &ts);
1007 	nsec = SPA_TIMESPEC_TO_NSEC(&ts);
1008 	activation->status = PW_NODE_ACTIVATION_FINISHED;
1009 	activation->finish_time = nsec;
1010 
1011 	pw_log_trace_fp("%p: trigger peers %"PRIu64, this, nsec);
1012 
1013 	spa_list_for_each(t, &this->rt.target_list, link) {
1014 		struct pw_node_activation *a = t->activation;
1015 		struct pw_node_activation_state *state = &a->state[0];
1016 
1017 		pw_log_trace_fp("%p: state:%p pending:%d/%d", t->node, state,
1018                                 state->pending, state->required);
1019 
1020 		if (pw_node_activation_state_dec(state, 1)) {
1021 			a->status = PW_NODE_ACTIVATION_TRIGGERED;
1022 			a->signal_time = nsec;
1023 			t->signal(t->data);
1024 		}
1025 	}
1026 	return 0;
1027 }
1028 
calculate_stats(struct pw_impl_node * this,struct pw_node_activation * a)1029 static inline void calculate_stats(struct pw_impl_node *this,  struct pw_node_activation *a)
1030 {
1031 	if (SPA_LIKELY(a->signal_time > a->prev_signal_time)) {
1032 		uint64_t process_time = a->finish_time - a->signal_time;
1033 		uint64_t period_time = a->signal_time - a->prev_signal_time;
1034 		float load = (float) process_time / (float) period_time;
1035 		a->cpu_load[0] = (a->cpu_load[0] + load) / 2.0f;
1036 		a->cpu_load[1] = (a->cpu_load[1] * 7.0f + load) / 8.0f;
1037 		a->cpu_load[2] = (a->cpu_load[2] * 31.0f + load) / 32.0f;
1038 	}
1039 }
1040 
/* Run one processing cycle for the node passed in \a data.
 *
 * Marks the activation AWAKE, processes the input mixers, the node itself
 * and (when the node reports SPA_STATUS_HAVE_DATA) the output mixers.
 * Afterwards either the graph-complete bookkeeping is done (non-exported
 * node that is its own driver), nothing more happens (SPA_STATUS_OK), or
 * the targets are triggered through resume_node().
 *
 * Always returns 0. */
static inline int process_node(void *data)
{
	struct pw_impl_node *this = data;
	struct timespec ts;
        struct pw_impl_port *p;
	struct pw_node_activation *a = this->rt.activation;
	struct spa_system *data_system = this->context->data_system;
	int status;

	/* record when this node woke up */
	spa_system_clock_gettime(data_system, CLOCK_MONOTONIC, &ts);
	a->status = PW_NODE_ACTIVATION_AWAKE;
	a->awake_time = SPA_TIMESPEC_TO_NSEC(&ts);

	pw_log_trace_fp("%p: process %"PRIu64, this, a->awake_time);

	/* when transport sync is not supported, just clear the flag */
	if (!this->transport_sync)
		a->pending_sync = false;

	/* run the mixers on the input side before the node itself */
	spa_list_for_each(p, &this->rt.input_mix, rt.node_link)
		spa_node_process(p->mix);

	status = spa_node_process(this->node);
	a->state[0].status = status;

	/* output mixers only run when the node produced data */
	if (status & SPA_STATUS_HAVE_DATA) {
		spa_list_for_each(p, &this->rt.output_mix, rt.node_link)
			spa_node_process(p->mix);
	}

	if (SPA_UNLIKELY(this == this->driver_node && !this->exported)) {
		/* non-exported node that drives itself: finish the cycle here */
		spa_system_clock_gettime(data_system, CLOCK_MONOTONIC, &ts);
		a->status = PW_NODE_ACTIVATION_FINISHED;
		/* the previous finish time becomes the signal time used for
		 * the statistics below; this must happen before finish_time
		 * is overwritten */
		a->signal_time = a->finish_time;
		a->finish_time = SPA_TIMESPEC_TO_NSEC(&ts);

		/* calculate CPU time */
		calculate_stats(this, a);

		pw_log_trace_fp("%p: graph completed wait:%"PRIu64" run:%"PRIu64
				" busy:%"PRIu64" period:%"PRIu64" cpu:%f:%f:%f", this,
				a->awake_time - a->signal_time,
				a->finish_time - a->awake_time,
				a->finish_time - a->signal_time,
				a->signal_time - a->prev_signal_time,
				a->cpu_load[0], a->cpu_load[1], a->cpu_load[2]);

		pw_context_driver_emit_complete(this->context, this);

	} else if (status == SPA_STATUS_OK) {
		/* no status bits set: the targets are not triggered from here */
		pw_log_trace_fp("%p: async continue", this);
	} else {
		resume_node(this, status);
	}
	if (status & SPA_STATUS_DRAINED) {
		pw_context_driver_emit_drained(this->context, this);
	}
	return 0;
}
1100 
node_on_fd_events(struct spa_source * source)1101 static void node_on_fd_events(struct spa_source *source)
1102 {
1103 	struct pw_impl_node *this = source->data;
1104 	struct spa_system *data_system = this->context->data_system;
1105 
1106 	if (SPA_UNLIKELY(source->rmask & (SPA_IO_ERR | SPA_IO_HUP))) {
1107 		pw_log_warn("%p: got socket error %08x", this, source->rmask);
1108 		return;
1109 	}
1110 
1111 	if (SPA_LIKELY(source->rmask & SPA_IO_IN)) {
1112 		uint64_t cmd;
1113 
1114 		if (SPA_UNLIKELY(spa_system_eventfd_read(data_system, this->source.fd, &cmd) < 0))
1115 			pw_log_warn("%p: read failed %m", this);
1116 		else if (SPA_UNLIKELY(cmd > 1))
1117 			pw_log_info("(%s-%u) client missed %"PRIu64" wakeups",
1118 				this->name, this->info.id, cmd - 1);
1119 
1120 		pw_log_trace_fp("%p: got process", this);
1121 		this->rt.target.signal(this->rt.target.data);
1122 	}
1123 }
1124 
reset_segment(struct spa_io_segment * seg)1125 static void reset_segment(struct spa_io_segment *seg)
1126 {
1127 	spa_zero(*seg);
1128 	seg->rate = 1.0;
1129 }
1130 
reset_position(struct pw_impl_node * this,struct spa_io_position * pos)1131 static void reset_position(struct pw_impl_node *this, struct spa_io_position *pos)
1132 {
1133 	uint32_t i;
1134 	struct settings *s = &this->context->settings;
1135 	uint32_t quantum = s->clock_force_quantum == 0 ? s->clock_quantum : s->clock_force_quantum;
1136 	uint32_t rate = s->clock_force_rate == 0 ? s->clock_rate : s->clock_force_rate;
1137 
1138 	this->current_rate = SPA_FRACTION(1, rate);
1139 	this->current_quantum = quantum;
1140 
1141 	pos->clock.rate = this->current_rate;
1142 	pos->clock.duration = this->current_quantum;
1143 	pos->video.flags = SPA_IO_VIDEO_SIZE_VALID;
1144 	pos->video.size = s->video_size;
1145 	pos->video.stride = pos->video.size.width * 16;
1146 	pos->video.framerate = s->video_rate;
1147 	pos->offset = INT64_MIN;
1148 
1149 	pos->n_segments = 1;
1150 	for (i = 0; i < SPA_IO_POSITION_MAX_SEGMENTS; i++)
1151 		reset_segment(&pos->segments[i]);
1152 }
1153 
1154 SPA_EXPORT
pw_context_create_node(struct pw_context * context,struct pw_properties * properties,size_t user_data_size)1155 struct pw_impl_node *pw_context_create_node(struct pw_context *context,
1156 			    struct pw_properties *properties,
1157 			    size_t user_data_size)
1158 {
1159 	struct impl *impl;
1160 	struct pw_impl_node *this;
1161 	size_t size;
1162 	struct spa_system *data_system = context->data_system;
1163 	int res;
1164 
1165 	impl = calloc(1, sizeof(struct impl) + user_data_size);
1166 	if (impl == NULL) {
1167 		res = -errno;
1168 		goto error_exit;
1169 	}
1170 
1171 	spa_list_init(&impl->param_list);
1172 	spa_list_init(&impl->pending_list);
1173 
1174 	this = &impl->this;
1175 	this->context = context;
1176 	this->name = strdup("node");
1177 
1178 	if (user_data_size > 0)
1179                 this->user_data = SPA_PTROFF(impl, sizeof(struct impl), void);
1180 
1181 	if (properties == NULL)
1182 		properties = pw_properties_new(NULL, NULL);
1183 	if (properties == NULL) {
1184 		res = -errno;
1185 		goto error_clean;
1186 	}
1187 
1188 	this->properties = properties;
1189 
1190 	if ((res = spa_system_eventfd_create(data_system, SPA_FD_CLOEXEC | SPA_FD_NONBLOCK)) < 0)
1191 		goto error_clean;
1192 
1193 	pw_log_debug("%p: new fd:%d", this, res);
1194 
1195 	this->source.fd = res;
1196 	this->source.func = node_on_fd_events;
1197 	this->source.data = this;
1198 	this->source.mask = SPA_IO_IN | SPA_IO_ERR | SPA_IO_HUP;
1199 	this->source.rmask = 0;
1200 
1201 	size = sizeof(struct pw_node_activation);
1202 
1203 	this->activation = pw_mempool_alloc(this->context->pool,
1204 			PW_MEMBLOCK_FLAG_READWRITE |
1205 			PW_MEMBLOCK_FLAG_SEAL |
1206 			PW_MEMBLOCK_FLAG_MAP,
1207 			SPA_DATA_MemFd, size);
1208 	if (this->activation == NULL) {
1209 		res = -errno;
1210                 goto error_clean;
1211 	}
1212 
1213 	impl->work = pw_context_get_work_queue(this->context);
1214 	if (impl->work == NULL) {
1215 		res = -errno;
1216 		goto error_clean;
1217 	}
1218 	impl->pending_id = SPA_ID_INVALID;
1219 
1220 	this->data_loop = context->data_loop;
1221 
1222 	spa_list_init(&this->follower_list);
1223 
1224 	spa_hook_list_init(&this->listener_list);
1225 
1226 	this->info.state = PW_NODE_STATE_CREATING;
1227 	this->info.props = &this->properties->dict;
1228 	this->info.params = this->params;
1229 
1230 	spa_list_init(&this->input_ports);
1231 	pw_map_init(&this->input_port_map, 64, 64);
1232 	spa_list_init(&this->output_ports);
1233 	pw_map_init(&this->output_port_map, 64, 64);
1234 
1235 	spa_list_init(&this->rt.input_mix);
1236 	spa_list_init(&this->rt.output_mix);
1237 	spa_list_init(&this->rt.target_list);
1238 
1239 	this->rt.activation = this->activation->map->ptr;
1240 	this->rt.target.activation = this->rt.activation;
1241 	this->rt.target.node = this;
1242 	this->rt.target.signal = process_node;
1243 	this->rt.target.data = this;
1244 	this->rt.driver_target.signal = process_node;
1245 
1246 	reset_position(this, &this->rt.activation->position);
1247 	this->rt.activation->sync_timeout = DEFAULT_SYNC_TIMEOUT;
1248 	this->rt.activation->sync_left = 0;
1249 
1250 	this->rt.rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
1251 	this->rt.rate_limit.burst = 1;
1252 
1253 	check_properties(this);
1254 
1255 	this->driver_node = this;
1256 	spa_list_append(&this->follower_list, &this->follower_link);
1257 	this->driving = true;
1258 
1259 	return this;
1260 
1261 error_clean:
1262 	if (this->activation)
1263 		pw_memblock_unref(this->activation);
1264 	if (this->source.fd != -1)
1265 		spa_system_close(this->context->data_system, this->source.fd);
1266 	free(impl);
1267 error_exit:
1268 	pw_properties_free(properties);
1269 	errno = -res;
1270 	return NULL;
1271 }
1272 
1273 SPA_EXPORT
pw_impl_node_get_info(struct pw_impl_node * node)1274 const struct pw_node_info *pw_impl_node_get_info(struct pw_impl_node *node)
1275 {
1276 	return &node->info;
1277 }
1278 
1279 SPA_EXPORT
pw_impl_node_get_user_data(struct pw_impl_node * node)1280 void * pw_impl_node_get_user_data(struct pw_impl_node *node)
1281 {
1282 	return node->user_data;
1283 }
1284 
1285 SPA_EXPORT
pw_impl_node_get_context(struct pw_impl_node * node)1286 struct pw_context * pw_impl_node_get_context(struct pw_impl_node *node)
1287 {
1288 	return node->context;
1289 }
1290 
1291 SPA_EXPORT
pw_impl_node_get_global(struct pw_impl_node * node)1292 struct pw_global *pw_impl_node_get_global(struct pw_impl_node *node)
1293 {
1294 	return node->global;
1295 }
1296 
1297 SPA_EXPORT
pw_impl_node_get_properties(struct pw_impl_node * node)1298 const struct pw_properties *pw_impl_node_get_properties(struct pw_impl_node *node)
1299 {
1300 	return node->properties;
1301 }
1302 
update_properties(struct pw_impl_node * node,const struct spa_dict * dict,bool filter)1303 static int update_properties(struct pw_impl_node *node, const struct spa_dict *dict, bool filter)
1304 {
1305 	static const char * const ignored[] = {
1306 		PW_KEY_OBJECT_ID,
1307 		PW_KEY_MODULE_ID,
1308 		PW_KEY_FACTORY_ID,
1309 		PW_KEY_CLIENT_ID,
1310 		PW_KEY_DEVICE_ID,
1311 		NULL
1312 	};
1313 
1314 	int changed;
1315 
1316 	changed = pw_properties_update_ignore(node->properties, dict, filter ? ignored : NULL);
1317 	node->info.props = &node->properties->dict;
1318 
1319 	pw_log_debug("%p: updated %d properties", node, changed);
1320 
1321 	if (changed) {
1322 		check_properties(node);
1323 		node->info.change_mask |= PW_NODE_CHANGE_MASK_PROPS;
1324 	}
1325 	return changed;
1326 }
1327 
1328 SPA_EXPORT
pw_impl_node_update_properties(struct pw_impl_node * node,const struct spa_dict * dict)1329 int pw_impl_node_update_properties(struct pw_impl_node *node, const struct spa_dict *dict)
1330 {
1331 	int changed = update_properties(node, dict, false);
1332 	emit_info_changed(node, false);
1333 	return changed;
1334 }
1335 
node_info(void * data,const struct spa_node_info * info)1336 static void node_info(void *data, const struct spa_node_info *info)
1337 {
1338 	struct pw_impl_node *node = data;
1339 	uint32_t changed_ids[MAX_PARAMS], n_changed_ids = 0;
1340 	bool flags_changed = false;
1341 
1342 	node->info.max_input_ports = info->max_input_ports;
1343 	node->info.max_output_ports = info->max_output_ports;
1344 
1345 	pw_log_debug("%p: flags:%08"PRIx64" change_mask:%08"PRIx64" max_in:%u max_out:%u",
1346 			node, info->flags, info->change_mask, info->max_input_ports,
1347 			info->max_output_ports);
1348 
1349 	if (info->change_mask & SPA_NODE_CHANGE_MASK_FLAGS) {
1350 		if (node->spa_flags != info->flags) {
1351 			flags_changed = node->spa_flags != 0;
1352 			pw_log_debug("%p: flags %"PRIu64"->%"PRIu64, node, node->spa_flags, info->flags);
1353 			node->spa_flags = info->flags;
1354 		}
1355 	}
1356 	if (info->change_mask & SPA_NODE_CHANGE_MASK_PROPS) {
1357 		update_properties(node, info->props, true);
1358 	}
1359 	if (info->change_mask & SPA_NODE_CHANGE_MASK_PARAMS) {
1360 		uint32_t i;
1361 
1362 		node->info.change_mask |= PW_NODE_CHANGE_MASK_PARAMS;
1363 		node->info.n_params = SPA_MIN(info->n_params, SPA_N_ELEMENTS(node->params));
1364 
1365 		for (i = 0; i < node->info.n_params; i++) {
1366 			uint32_t id = info->params[i].id;
1367 
1368 			pw_log_debug("%p: param %d id:%d (%s) %08x:%08x", node, i,
1369 					id, spa_debug_type_find_name(spa_type_param, id),
1370 					node->info.params[i].flags, info->params[i].flags);
1371 
1372 			node->info.params[i].id = info->params[i].id;
1373 			if (node->info.params[i].flags == info->params[i].flags)
1374 				continue;
1375 
1376 			pw_log_debug("%p: update param %d", node, id);
1377 			node->info.params[i] = info->params[i];
1378 			node->info.params[i].user = 0;
1379 
1380 			if (info->params[i].flags & SPA_PARAM_INFO_READ)
1381 				changed_ids[n_changed_ids++] = id;
1382 		}
1383 	}
1384 	emit_info_changed(node, flags_changed);
1385 
1386 	if (n_changed_ids > 0)
1387 		emit_params(node, changed_ids, n_changed_ids);
1388 
1389 	if (flags_changed)
1390 		pw_context_recalc_graph(node->context, "node flags changed");
1391 }
1392 
node_port_info(void * data,enum spa_direction direction,uint32_t port_id,const struct spa_port_info * info)1393 static void node_port_info(void *data, enum spa_direction direction, uint32_t port_id,
1394 		const struct spa_port_info *info)
1395 {
1396 	struct pw_impl_node *node = data;
1397 	struct pw_impl_port *port = pw_impl_node_find_port(node, direction, port_id);
1398 
1399 	if (info == NULL) {
1400 		if (port) {
1401 			pw_log_debug("%p: %s port %d removed", node,
1402 					pw_direction_as_string(direction), port_id);
1403 			pw_impl_port_destroy(port);
1404 		} else {
1405 			pw_log_warn("%p: %s port %d unknown", node,
1406 					pw_direction_as_string(direction), port_id);
1407 		}
1408 	} else if (port) {
1409 		pw_log_debug("%p: %s port %d changed", node,
1410 				pw_direction_as_string(direction), port_id);
1411 		pw_impl_port_update_info(port, info);
1412 	} else {
1413 		int res;
1414 
1415 		pw_log_debug("%p: %s port %d added", node,
1416 				pw_direction_as_string(direction), port_id);
1417 
1418 		if ((port = pw_context_create_port(node->context, direction, port_id, info,
1419 					node->port_user_data_size))) {
1420 			if ((res = pw_impl_port_add(port, node)) < 0) {
1421 				pw_log_error("%p: can't add port %p: %d, %s",
1422 						node, port, res, spa_strerror(res));
1423 				pw_impl_port_destroy(port);
1424 			}
1425 		}
1426 	}
1427 }
1428 
node_result(void * data,int seq,int res,uint32_t type,const void * result)1429 static void node_result(void *data, int seq, int res, uint32_t type, const void *result)
1430 {
1431 	struct pw_impl_node *node = data;
1432 	struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
1433 
1434 	pw_log_trace("%p: result seq:%d res:%d type:%u", node, seq, res, type);
1435 	if (res < 0)
1436 		impl->last_error = res;
1437 
1438 	if (SPA_RESULT_IS_ASYNC(seq))
1439 	        pw_work_queue_complete(impl->work, &impl->this, SPA_RESULT_ASYNC_SEQ(seq), res);
1440 
1441 	pw_impl_node_emit_result(node, seq, res, type, result);
1442 }
1443 
node_event(void * data,const struct spa_event * event)1444 static void node_event(void *data, const struct spa_event *event)
1445 {
1446 	struct pw_impl_node *node = data;
1447 	struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
1448 
1449 	pw_log_trace("%p: event %d", node, SPA_EVENT_TYPE(event));
1450 
1451 	switch (SPA_NODE_EVENT_ID(event)) {
1452 	case SPA_NODE_EVENT_Error:
1453 		impl->last_error = -EFAULT;
1454 		node_update_state(node, PW_NODE_STATE_ERROR,
1455 				-EFAULT, strdup("Received error event"));
1456 		break;
1457 	case SPA_NODE_EVENT_RequestProcess:
1458 		pw_log_debug("request process");
1459 		if (!node->driving) {
1460 			pw_impl_node_send_command(node->driver_node,
1461 				    &SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_RequestProcess));
1462 		}
1463 		break;
1464 	default:
1465 		pw_log_debug("unhandled event %d", SPA_NODE_EVENT_ID(event));
1466 		break;
1467 	}
1468 	pw_impl_node_emit_event(node, event);
1469 }
1470 
/* Event handlers registered on the spa node with spa_node_add_listener()
 * in pw_impl_node_set_implementation(). */
static const struct spa_node_events node_events = {
	SPA_VERSION_NODE_EVENTS,
	.info = node_info,
	.port_info = node_port_info,
	.result = node_result,
	.event = node_event,
};
1478 
/* Results of check_updates(): no command was pending (SYNC_CHECK), a START
 * command was consumed (SYNC_START) or a STOP command was consumed
 * (SYNC_STOP). */
#define SYNC_CHECK	0
#define SYNC_START	1
#define SYNC_STOP	2
1482 
check_updates(struct pw_impl_node * node,uint32_t * reposition_owner)1483 static int check_updates(struct pw_impl_node *node, uint32_t *reposition_owner)
1484 {
1485 	int res = SYNC_CHECK;
1486 	struct pw_node_activation *a = node->rt.activation;
1487 	uint32_t command;
1488 
1489 	if (SPA_UNLIKELY(a->position.offset == INT64_MIN))
1490 		a->position.offset = a->position.clock.position;
1491 
1492 	command = ATOMIC_XCHG(a->command, PW_NODE_ACTIVATION_COMMAND_NONE);
1493 	*reposition_owner = ATOMIC_XCHG(a->reposition_owner, 0);
1494 
1495 	if (SPA_UNLIKELY(command != PW_NODE_ACTIVATION_COMMAND_NONE)) {
1496 		pw_log_debug("%p: update command:%u", node, command);
1497 		switch (command) {
1498 		case PW_NODE_ACTIVATION_COMMAND_STOP:
1499 			a->position.state = SPA_IO_POSITION_STATE_STOPPED;
1500 			res = SYNC_STOP;
1501 			break;
1502 		case PW_NODE_ACTIVATION_COMMAND_START:
1503 			a->position.state = SPA_IO_POSITION_STATE_STARTING;
1504 			a->sync_left = a->sync_timeout /
1505 				((a->position.clock.duration * SPA_NSEC_PER_SEC) /
1506 				 a->position.clock.rate.denom);
1507 			res = SYNC_START;
1508 			break;
1509 		}
1510 	}
1511 	return res;
1512 }
1513 
do_reposition(struct pw_impl_node * driver,struct pw_impl_node * node)1514 static void do_reposition(struct pw_impl_node *driver, struct pw_impl_node *node)
1515 {
1516 	struct pw_node_activation *a = driver->rt.activation;
1517 	struct spa_io_segment *dst, *src;
1518 
1519 	src = &node->rt.activation->reposition;
1520 	dst = &a->position.segments[0];
1521 
1522 	pw_log_info("%p: update position:%"PRIu64, node, src->position);
1523 
1524 	dst->version = src->version;
1525 	dst->flags = src->flags;
1526 	dst->start = src->start;
1527 	dst->duration = src->duration;
1528 	dst->rate = src->rate;
1529 	dst->position = src->position;
1530 	if (src->bar.flags & SPA_IO_SEGMENT_BAR_FLAG_VALID)
1531 		dst->bar = src->bar;
1532 	if (src->video.flags & SPA_IO_SEGMENT_VIDEO_FLAG_VALID)
1533 		dst->video = src->video;
1534 
1535 	if (dst->start == 0)
1536 		dst->start = a->position.clock.position - a->position.offset;
1537 
1538 	switch (a->position.state) {
1539 	case SPA_IO_POSITION_STATE_RUNNING:
1540 		a->position.state = SPA_IO_POSITION_STATE_STARTING;
1541 		a->sync_left = a->sync_timeout /
1542 			((a->position.clock.duration * SPA_NSEC_PER_SEC) /
1543 			 a->position.clock.rate.denom);
1544 		break;
1545 	}
1546 }
1547 
update_position(struct pw_impl_node * node,int all_ready)1548 static void update_position(struct pw_impl_node *node, int all_ready)
1549 {
1550 	struct pw_node_activation *a = node->rt.activation;
1551 
1552 	if (a->position.state == SPA_IO_POSITION_STATE_STARTING) {
1553 		if (!all_ready && --a->sync_left == 0) {
1554 			pw_log_warn("(%s-%u) sync timeout, going to RUNNING",
1555 					node->name, node->info.id);
1556 			pw_context_driver_emit_timeout(node->context, node);
1557 			dump_states(node);
1558 			all_ready = true;
1559 		}
1560 		if (all_ready)
1561 			a->position.state = SPA_IO_POSITION_STATE_RUNNING;
1562 	}
1563 	if (a->position.state != SPA_IO_POSITION_STATE_RUNNING)
1564 		a->position.offset += a->position.clock.duration;
1565 }
1566 
/* Ready callback of the spa node implementation.
 *
 * When the node is its own driver, a new cycle is prepared: an unfinished
 * previous cycle is reported, pending quantum/rate changes are applied,
 * transport commands and reposition requests are handled and the state of
 * all targets is reset. Afterwards the output mixers are processed when
 * \a status has SPA_STATUS_HAVE_DATA and the targets are triggered with
 * resume_node().
 *
 * Returns 0 without triggering anything for a driver node that is not
 * driving, otherwise the result of resume_node(). */
static int node_ready(void *data, int status)
{
	struct pw_impl_node *node = data, *reposition_node = NULL;
	struct pw_impl_node *driver = node->driver_node;
	struct pw_node_target *t;
	struct pw_impl_port *p;

	pw_log_trace_fp("%p: ready driver:%d exported:%d %p status:%d", node,
			node->driver, node->exported, driver, status);

	if (SPA_UNLIKELY(node == driver)) {
		struct pw_node_activation *a = node->rt.activation;
		struct pw_node_activation_state *state = &a->state[0];
		int sync_type, all_ready, update_sync, target_sync;
		uint32_t owner[2], reposition_owner;
		uint64_t min_timeout = UINT64_MAX;

		/* the previous cycle still has pending triggers */
		if (SPA_UNLIKELY(state->pending > 0)) {
			pw_context_driver_emit_incomplete(node->context, node);
			if (ratelimit_test(&node->rt.rate_limit, a->signal_time, SPA_LOG_LEVEL_DEBUG)) {
				pw_log_debug("(%s-%u) graph not finished: state:%p quantum:%"PRIu64
						" pending %d/%d", node->name, node->info.id,
						state, a->position.clock.duration,
						state->pending, state->required);
				dump_states(node);
			}
			/* signal our own target */
			node->rt.target.signal(node->rt.target.data);
		}

		/* apply a pending quantum/rate change to the position clock */
		if (node->current_pending) {
			node->rt.position->clock.duration = node->current_quantum;
			node->rt.position->clock.rate = node->current_rate;
			node->current_pending = false;
		}

		sync_type = check_updates(node, &reposition_owner);
		owner[0] = ATOMIC_LOAD(a->segment_owner[0]);
		owner[1] = ATOMIC_LOAD(a->segment_owner[1]);
again:
		/* without a command we only check if all targets are ready,
		 * with a command the sync flags of all targets are rewritten */
		all_ready = sync_type == SYNC_CHECK;
		update_sync = !all_ready;
		target_sync = sync_type == SYNC_START ? true : false;

		spa_list_for_each(t, &driver->rt.target_list, link) {
			struct pw_node_activation *ta = t->activation;

			/* reset the target for the new cycle */
			ta->status = PW_NODE_ACTIVATION_NOT_TRIGGERED;
			pw_node_activation_state_reset(&ta->state[0]);

			if (SPA_LIKELY(t->node)) {
				uint32_t id = t->node->info.id;

				/* this is the node with reposition info */
				if (SPA_UNLIKELY(id == reposition_owner))
					reposition_node = t->node;

				/* update extra segment info if it is the owner */
				if (SPA_UNLIKELY(id == owner[0]))
					a->position.segments[0].bar = ta->segment.bar;
				if (SPA_UNLIKELY(id == owner[1]))
					a->position.segments[0].video = ta->segment.video;

				/* track the smallest sync timeout of all targets */
				min_timeout = SPA_MIN(min_timeout, ta->sync_timeout);
			}

			if (SPA_UNLIKELY(update_sync)) {
				ta->pending_sync = target_sync;
				ta->pending_new_pos = target_sync;
			} else {
				all_ready &= ta->pending_sync == false;
			}
		}
		a->prev_signal_time = a->signal_time;
		/* never wait longer than the default timeout */
		a->sync_timeout = SPA_MIN(min_timeout, DEFAULT_SYNC_TIMEOUT);

		if (SPA_UNLIKELY(reposition_node)) {
			/* apply the new position and walk the targets again as
			 * if a START command was received; the owner is cleared
			 * so that the second pass does not repeat this */
			do_reposition(node, reposition_node);
			sync_type = SYNC_START;
			reposition_owner = 0;
			reposition_node = NULL;
			goto again;
		}

		update_position(node, all_ready);

		pw_context_driver_emit_start(node->context, node);
	}
	/* a driver node that is not driving does not trigger anything */
	if (SPA_UNLIKELY(node->driver && !node->driving))
		return 0;

	if (status & SPA_STATUS_HAVE_DATA) {
		spa_list_for_each(p, &node->rt.output_mix, rt.node_link)
			spa_node_process(p->mix);
	}
	return resume_node(node, status);
}
1663 
node_reuse_buffer(void * data,uint32_t port_id,uint32_t buffer_id)1664 static int node_reuse_buffer(void *data, uint32_t port_id, uint32_t buffer_id)
1665 {
1666 	struct pw_impl_node *node = data;
1667 	struct pw_impl_port *p;
1668 
1669 	spa_list_for_each(p, &node->rt.input_mix, rt.node_link) {
1670 		if (p->port_id != port_id)
1671 			continue;
1672 		spa_node_port_reuse_buffer(p->mix, 0, buffer_id);
1673 		break;
1674 	}
1675 	return 0;
1676 }
1677 
update_xrun_stats(struct pw_node_activation * a,uint64_t trigger,uint64_t delay)1678 static void update_xrun_stats(struct pw_node_activation *a, uint64_t trigger, uint64_t delay)
1679 {
1680 	a->xrun_count++;
1681 	a->xrun_time = trigger;
1682 	a->xrun_delay = delay;
1683 	a->max_delay = SPA_MAX(a->max_delay, delay);
1684 }
1685 
node_xrun(void * data,uint64_t trigger,uint64_t delay,struct spa_pod * info)1686 static int node_xrun(void *data, uint64_t trigger, uint64_t delay, struct spa_pod *info)
1687 {
1688 	struct pw_impl_node *this = data;
1689 	struct pw_node_activation *a = this->rt.activation;
1690 	struct pw_node_activation *da = this->rt.driver_target.activation;
1691 
1692 	update_xrun_stats(a, trigger, delay);
1693 	if (da && da != a)
1694 		update_xrun_stats(da, trigger, delay);
1695 
1696 	if (ratelimit_test(&this->rt.rate_limit, a->signal_time, SPA_LOG_LEVEL_INFO)) {
1697 		struct spa_fraction rate;
1698 		if (da) {
1699 			struct spa_io_clock *cl = &da->position.clock;
1700 			rate.num = cl->rate.num * cl->duration;
1701 			rate.denom = cl->rate.denom;
1702 		} else {
1703 			rate = SPA_FRACTION(0,0);
1704 		}
1705 		pw_log_info("(%s-%d) XRun! rate:%u/%u count:%u time:%"PRIu64
1706 				" delay:%"PRIu64" max:%"PRIu64,
1707 				this->name, this->info.id,
1708 				rate.num, rate.denom, a->xrun_count,
1709 				trigger, delay, a->max_delay);
1710 	}
1711 
1712 	pw_context_driver_emit_xrun(this->context, this);
1713 
1714 	return 0;
1715 }
1716 
/* Callbacks installed on the spa node with spa_node_set_callbacks() in
 * pw_impl_node_set_implementation(). */
static const struct spa_node_callbacks node_callbacks = {
	SPA_VERSION_NODE_CALLBACKS,
	.ready = node_ready,
	.reuse_buffer = node_reuse_buffer,
	.xrun = node_xrun,
};
1723 
1724 SPA_EXPORT
pw_impl_node_set_implementation(struct pw_impl_node * node,struct spa_node * spa_node)1725 int pw_impl_node_set_implementation(struct pw_impl_node *node,
1726 			struct spa_node *spa_node)
1727 {
1728 	int res;
1729 
1730 	pw_log_debug("%p: implementation %p", node, spa_node);
1731 
1732 	if (node->node) {
1733 		pw_log_error("%p: implementation existed %p", node, node->node);
1734 		return -EEXIST;
1735 	}
1736 
1737 	node->node = spa_node;
1738 	spa_node_set_callbacks(node->node, &node_callbacks, node);
1739 	res = spa_node_add_listener(node->node, &node->listener, &node_events, node);
1740 
1741 	if (node->registered)
1742 		update_io(node);
1743 
1744 	return res;
1745 }
1746 
1747 SPA_EXPORT
pw_impl_node_get_implementation(struct pw_impl_node * node)1748 struct spa_node *pw_impl_node_get_implementation(struct pw_impl_node *node)
1749 {
1750 	return node->node;
1751 }
1752 
1753 SPA_EXPORT
pw_impl_node_add_listener(struct pw_impl_node * node,struct spa_hook * listener,const struct pw_impl_node_events * events,void * data)1754 void pw_impl_node_add_listener(struct pw_impl_node *node,
1755 			   struct spa_hook *listener,
1756 			   const struct pw_impl_node_events *events,
1757 			   void *data)
1758 {
1759 	spa_hook_list_append(&node->listener_list, listener, events, data);
1760 }
1761 
/** Destroy a node
 * \param node a node to destroy
 *
 * Remove \a node. This will stop the transfer on the node and
 * free the resources allocated by \a node.
 *
 * The node is deactivated and suspended, detached from its driver and
 * followers, its ports and global are destroyed and finally all memory
 * is released. \a node must not be used after this call.
 */
SPA_EXPORT
void pw_impl_node_destroy(struct pw_impl_node *node)
{
	struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
	struct pw_impl_port *port;
	struct pw_impl_node *follower;
	bool active, had_driver;

	/* remember the active state for the recalc decision below and clear
	 * it before tearing the node down */
	active = node->active;
	node->active = false;

	pw_log_debug("%p: destroy", impl);
	pw_log_info("(%s-%u) destroy", node->name, node->info.id);

	node_deactivate(node);

	suspend_node(node);

	pw_impl_node_emit_destroy(node);

	pw_log_debug("%p: driver node %p", impl, node->driver_node);
	/* true when the node was driven by another node */
	had_driver = node != node->driver_node;

	/* remove ourself as a follower from the driver node */
	spa_list_remove(&node->follower_link);
	remove_segment_owner(node->driver_node, node->info.id);

	/* detach every follower; each gets a NULL driver */
	spa_list_consume(follower, &node->follower_list, follower_link) {
		pw_log_debug("%p: reassign follower %p", impl, follower);
		pw_impl_node_set_driver(follower, NULL);
	}

	if (node->registered) {
		spa_list_remove(&node->link);
		if (node->driver)
			spa_list_remove(&node->driver_link);
	}

	/* stop receiving events and callbacks from the implementation */
	if (node->node) {
		spa_hook_remove(&node->listener);
		spa_node_set_callbacks(node->node, NULL, NULL);
	}

	pw_log_debug("%p: destroy ports", node);
	spa_list_consume(port, &node->input_ports, link)
		pw_impl_port_destroy(port);
	spa_list_consume(port, &node->output_ports, link)
		pw_impl_port_destroy(port);

	if (node->global) {
		spa_hook_remove(&node->global_listener);
		pw_global_destroy(node->global);
	}

	if (active || had_driver)
		pw_context_recalc_graph(node->context,
				"active node destroy");

	pw_log_debug("%p: free", node);
	pw_impl_node_emit_free(node);

	spa_hook_list_clean(&node->listener_list);

	pw_memblock_unref(node->activation);

	pw_param_clear(&impl->param_list, SPA_ID_INVALID);
	pw_param_clear(&impl->pending_list, SPA_ID_INVALID);

	pw_map_clear(&node->input_port_map);
	pw_map_clear(&node->output_port_map);

	/* cancel the work items queued for this node */
	pw_work_queue_cancel(impl->work, node, SPA_ID_INVALID);

	pw_properties_free(node->properties);

	clear_info(node);

	spa_system_close(node->context->data_system, node->source.fd);
	free(impl);
}
1848 
1849 SPA_EXPORT
pw_impl_node_for_each_port(struct pw_impl_node * node,enum pw_direction direction,int (* callback)(void * data,struct pw_impl_port * port),void * data)1850 int pw_impl_node_for_each_port(struct pw_impl_node *node,
1851 			  enum pw_direction direction,
1852 			  int (*callback) (void *data, struct pw_impl_port *port),
1853 			  void *data)
1854 {
1855 	struct spa_list *ports;
1856 	struct pw_impl_port *p, *t;
1857 	int res;
1858 
1859 	if (direction == PW_DIRECTION_INPUT)
1860 		ports = &node->input_ports;
1861 	else
1862 		ports = &node->output_ports;
1863 
1864 	spa_list_for_each_safe(p, t, ports, link)
1865 		if ((res = callback(data, p)) != 0)
1866 			return res;
1867 	return 0;
1868 }
1869 
/* State shared between pw_impl_node_for_each_param() and the
 * result_node_params() handler. The field order matters: the struct is
 * filled with a positional initializer. */
struct result_node_params_data {
	struct impl *impl;	/* node whose pending_list receives cached params */
	void *data;		/* user data handed to callback */
	/* user callback invoked for every matching param result */
	int (*callback) (void *data, int seq,
			uint32_t id, uint32_t index, uint32_t next,
			struct spa_pod *param);
	int seq;		/* only results carrying this seq are handled */
	uint32_t count;		/* number of results stored while caching */
	unsigned int cache:1;	/* set when results must be added to pending_list */
};
1880 
result_node_params(void * data,int seq,int res,uint32_t type,const void * result)1881 static void result_node_params(void *data, int seq, int res, uint32_t type, const void *result)
1882 {
1883 	struct result_node_params_data *d = data;
1884 	struct impl *impl = d->impl;
1885 	switch (type) {
1886 	case SPA_RESULT_TYPE_NODE_PARAMS:
1887 	{
1888 		const struct spa_result_node_params *r = result;
1889 		if (d->seq == seq) {
1890 			d->callback(d->data, seq, r->id, r->index, r->next, r->param);
1891 			if (d->cache) {
1892 				if (d->count++ == 0)
1893 					pw_param_add(&impl->pending_list, r->id, NULL);
1894 				pw_param_add(&impl->pending_list, r->id, r->param);
1895 			}
1896 		}
1897 		break;
1898 	}
1899 	default:
1900 		break;
1901 	}
1902 }
1903 
1904 SPA_EXPORT
pw_impl_node_for_each_param(struct pw_impl_node * node,int seq,uint32_t param_id,uint32_t index,uint32_t max,const struct spa_pod * filter,int (* callback)(void * data,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param),void * data)1905 int pw_impl_node_for_each_param(struct pw_impl_node *node,
1906 			   int seq, uint32_t param_id,
1907 			   uint32_t index, uint32_t max,
1908 			   const struct spa_pod *filter,
1909 			   int (*callback) (void *data, int seq,
1910 					    uint32_t id, uint32_t index, uint32_t next,
1911 					    struct spa_pod *param),
1912 			   void *data)
1913 {
1914 	int res;
1915 	struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
1916 	struct result_node_params_data user_data = { impl, data, callback, seq, 0, false };
1917 	struct spa_hook listener;
1918 	struct spa_param_info *pi;
1919 	static const struct spa_node_events node_events = {
1920 		SPA_VERSION_NODE_EVENTS,
1921 		.result = result_node_params,
1922 	};
1923 
1924 	pi = pw_param_info_find(node->info.params, node->info.n_params, param_id);
1925 	if (pi == NULL)
1926 		return -ENOENT;
1927 
1928 	if (max == 0)
1929 		max = UINT32_MAX;
1930 
1931 	pw_log_debug("%p: params id:%d (%s) index:%u max:%u cached:%d", node, param_id,
1932 			spa_debug_type_find_name(spa_type_param, param_id),
1933 			index, max, pi->user);
1934 
1935 	if (pi->user == 1) {
1936 		struct pw_param *p;
1937 		uint8_t buffer[4096];
1938 		struct spa_pod_builder b = { 0 };
1939 	        struct spa_result_node_params result;
1940 		uint32_t count = 0;
1941 
1942 		result.id = param_id;
1943 		result.next = 0;
1944 
1945 		spa_list_for_each(p, &impl->param_list, link) {
1946 			result.index = result.next++;
1947 			if (p->id != param_id)
1948 				continue;
1949 
1950 			if (result.index < index)
1951 				continue;
1952 
1953 			spa_pod_builder_init(&b, buffer, sizeof(buffer));
1954 			if (spa_pod_filter(&b, &result.param, p->param, filter) != 0)
1955 				continue;
1956 
1957 			pw_log_debug("%p: %d param %u", node, seq, result.index);
1958 			result_node_params(&user_data, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
1959 
1960 			if (++count == max)
1961 				break;
1962 		}
1963 		res = 0;
1964 	} else {
1965 		user_data.cache = impl->cache_params &&
1966 			(filter == NULL && index == 0 && max == UINT32_MAX);
1967 
1968 		spa_zero(listener);
1969 		spa_node_add_listener(node->node, &listener, &node_events, &user_data);
1970 		res = spa_node_enum_params(node->node, seq,
1971 						param_id, index, max,
1972 						filter);
1973 		spa_hook_remove(&listener);
1974 
1975 		if (user_data.cache) {
1976 			pw_param_update(&impl->param_list, &impl->pending_list);
1977 			pi->user = 1;
1978 		}
1979 	}
1980 	return res;
1981 }
1982 
1983 SPA_EXPORT
pw_impl_node_set_param(struct pw_impl_node * node,uint32_t id,uint32_t flags,const struct spa_pod * param)1984 int pw_impl_node_set_param(struct pw_impl_node *node,
1985 		uint32_t id, uint32_t flags, const struct spa_pod *param)
1986 {
1987 	pw_log_debug("%p: set_param id:%d (%s) flags:%08x param:%p", node, id,
1988 			spa_debug_type_find_name(spa_type_param, id), flags, param);
1989 	return spa_node_set_param(node->node, id, flags, param);
1990 }
1991 
1992 SPA_EXPORT
1993 struct pw_impl_port *
pw_impl_node_find_port(struct pw_impl_node * node,enum pw_direction direction,uint32_t port_id)1994 pw_impl_node_find_port(struct pw_impl_node *node, enum pw_direction direction, uint32_t port_id)
1995 {
1996 	struct pw_impl_port *port, *p;
1997 	struct pw_map *portmap;
1998 	struct spa_list *ports;
1999 
2000 	if (direction == PW_DIRECTION_INPUT) {
2001 		portmap = &node->input_port_map;
2002 		ports = &node->input_ports;
2003 	} else {
2004 		portmap = &node->output_port_map;
2005 		ports = &node->output_ports;
2006 	}
2007 
2008 	if (port_id != PW_ID_ANY)
2009 		port = pw_map_lookup(portmap, port_id);
2010 	else {
2011 		port = NULL;
2012 		/* try to find an unlinked port */
2013 		spa_list_for_each(p, ports, link) {
2014 			if (spa_list_is_empty(&p->links)) {
2015 				port = p;
2016 				break;
2017 			}
2018 			/* We can use this port if it can multiplex */
2019 			if (SPA_FLAG_IS_SET(p->mix_flags, PW_IMPL_PORT_MIX_FLAG_MULTI))
2020 				port = p;
2021 		}
2022 	}
2023 	pw_log_debug("%p: return %s port %d: %p", node,
2024 			pw_direction_as_string(direction), port_id, port);
2025 	return port;
2026 }
2027 
2028 SPA_EXPORT
pw_impl_node_get_free_port_id(struct pw_impl_node * node,enum pw_direction direction)2029 uint32_t pw_impl_node_get_free_port_id(struct pw_impl_node *node, enum pw_direction direction)
2030 {
2031 	uint32_t n_ports, max_ports;
2032 	struct pw_map *portmap;
2033 	uint32_t port_id;
2034 	bool dynamic;
2035 	int res;
2036 
2037 	if (direction == PW_DIRECTION_INPUT) {
2038 		max_ports = node->info.max_input_ports;
2039 		n_ports = node->info.n_input_ports;
2040 		portmap = &node->input_port_map;
2041 		dynamic = SPA_FLAG_IS_SET(node->spa_flags, SPA_NODE_FLAG_IN_DYNAMIC_PORTS);
2042 	} else {
2043 		max_ports = node->info.max_output_ports;
2044 		n_ports = node->info.n_output_ports;
2045 		portmap = &node->output_port_map;
2046 		dynamic = SPA_FLAG_IS_SET(node->spa_flags, SPA_NODE_FLAG_OUT_DYNAMIC_PORTS);
2047 	}
2048 	pw_log_debug("%p: direction %s n_ports:%u max_ports:%u",
2049 			node, pw_direction_as_string(direction), n_ports, max_ports);
2050 
2051 	if (!dynamic || n_ports >= max_ports) {
2052 		res = -ENOSPC;
2053 		goto error;
2054 	}
2055 
2056 	port_id = pw_map_insert_new(portmap, NULL);
2057 	if (port_id == SPA_ID_INVALID) {
2058 		res = -errno;
2059 		goto error;
2060 	}
2061 
2062 	pw_log_debug("%p: free port %d", node, port_id);
2063 
2064 	return port_id;
2065 
2066 error:
2067 	pw_log_warn("%p: no more port available: %s", node, spa_strerror(res));
2068 	errno = -res;
2069 	return SPA_ID_INVALID;
2070 }
2071 
on_state_complete(void * obj,void * data,int res,uint32_t seq)2072 static void on_state_complete(void *obj, void *data, int res, uint32_t seq)
2073 {
2074 	struct pw_impl_node *node = obj;
2075 	struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
2076 	enum pw_node_state state = SPA_PTR_TO_INT(data);
2077 	char *error = NULL;
2078 
2079 	impl->pending_id = SPA_ID_INVALID;
2080 	impl->pending_play = false;
2081 
2082 	pw_log_debug("%p: state complete res:%d seq:%d", node, res, seq);
2083 	if (impl->last_error < 0) {
2084 		res = impl->last_error;
2085 		impl->last_error = 0;
2086 	}
2087 	if (SPA_RESULT_IS_ERROR(res)) {
2088 		if (node->info.state == PW_NODE_STATE_SUSPENDED) {
2089 			state = PW_NODE_STATE_SUSPENDED;
2090 			res = 0;
2091 		} else {
2092 			error = spa_aprintf("error changing node state: %s", spa_strerror(res));
2093 			state = PW_NODE_STATE_ERROR;
2094 		}
2095 	}
2096 	node_update_state(node, state, res, error);
2097 }
2098 
2099 /** Set the node state
2100  * \param node a \ref pw_impl_node
2101  * \param state a \ref pw_node_state
2102  * \return 0 on success < 0 on error
2103  *
2104  * Set the state of \a node to \a state.
2105  */
2106 SPA_EXPORT
pw_impl_node_set_state(struct pw_impl_node * node,enum pw_node_state state)2107 int pw_impl_node_set_state(struct pw_impl_node *node, enum pw_node_state state)
2108 {
2109 	int res = 0;
2110 	struct impl *impl = SPA_CONTAINER_OF(node, struct impl, this);
2111 	enum pw_node_state old = impl->pending_state;
2112 
2113 	pw_log_debug("%p: set state (%s) %s -> %s, active %d pause_on_idle:%d", node,
2114 			pw_node_state_as_string(node->info.state),
2115 			pw_node_state_as_string(old),
2116 			pw_node_state_as_string(state),
2117 			node->active,
2118 			impl->pause_on_idle);
2119 
2120 	if (old != state)
2121 		pw_impl_node_emit_state_request(node, state);
2122 
2123 	switch (state) {
2124 	case PW_NODE_STATE_CREATING:
2125 		return -EIO;
2126 
2127 	case PW_NODE_STATE_SUSPENDED:
2128 		res = suspend_node(node);
2129 		break;
2130 
2131 	case PW_NODE_STATE_IDLE:
2132 		if (impl->pause_on_idle)
2133 			res = pause_node(node);
2134 		break;
2135 
2136 	case PW_NODE_STATE_RUNNING:
2137 		if (node->active)
2138 			res = start_node(node);
2139 		break;
2140 
2141 	case PW_NODE_STATE_ERROR:
2142 		break;
2143 	}
2144 	if (SPA_RESULT_IS_ERROR(res))
2145 		return res;
2146 
2147 	if (SPA_RESULT_IS_ASYNC(res)) {
2148 		res = spa_node_sync(node->node, res);
2149 	}
2150 
2151 	if (old != state) {
2152 		if (impl->pending_id != SPA_ID_INVALID) {
2153 			pw_log_debug("cancel state from %s to %s to %s",
2154 				pw_node_state_as_string(node->info.state),
2155 				pw_node_state_as_string(impl->pending_state),
2156 				pw_node_state_as_string(state));
2157 
2158 			if (impl->pending_state == PW_NODE_STATE_RUNNING &&
2159 			    state < PW_NODE_STATE_RUNNING &&
2160 			    impl->pending_play) {
2161 				impl->pending_play = false;
2162 				spa_node_send_command(node->node,
2163 					&SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Pause));
2164 			}
2165 			pw_work_queue_cancel(impl->work, node, impl->pending_id);
2166 			node->info.state = impl->pending_state;
2167 		}
2168 		impl->pending_state = state;
2169 		impl->pending_id = pw_work_queue_add(impl->work,
2170 				node, res, on_state_complete, SPA_INT_TO_PTR(state));
2171 	}
2172 	return res;
2173 }
2174 
2175 SPA_EXPORT
pw_impl_node_set_active(struct pw_impl_node * node,bool active)2176 int pw_impl_node_set_active(struct pw_impl_node *node, bool active)
2177 {
2178 	bool old = node->active;
2179 
2180 	if (old != active) {
2181 		pw_log_debug("%p: %s", node, active ? "activate" : "deactivate");
2182 
2183 		node->active = active;
2184 		pw_impl_node_emit_active_changed(node, active);
2185 
2186 		if (node->registered)
2187 			pw_context_recalc_graph(node->context,
2188 					active ? "node activate" : "node deactivate");
2189 	}
2190 	return 0;
2191 }
2192 
2193 SPA_EXPORT
pw_impl_node_is_active(struct pw_impl_node * node)2194 bool pw_impl_node_is_active(struct pw_impl_node *node)
2195 {
2196 	return node->active;
2197 }
2198 
2199 SPA_EXPORT
pw_impl_node_send_command(struct pw_impl_node * node,const struct spa_command * command)2200 int pw_impl_node_send_command(struct pw_impl_node *node, const struct spa_command *command)
2201 {
2202 	return spa_node_send_command(node->node, command);
2203 }
2204