1 /* PipeWire
2  *
3  * Copyright © 2020 Wim Taymans
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice (including the next
13  * paragraph) shall be included in all copies or substantial portions of the
14  * Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
19  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include "pipewire/core.h"
26 
27 #include "config.h"
28 
29 #include <errno.h>
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <string.h>
33 #include <time.h>
34 #include <sys/time.h>
35 
36 #include <pipewire/log.h>
37 
38 #include "log.h"
39 #define spa_debug pw_log_debug
40 
41 #include <spa/support/cpu.h>
42 #include <spa/utils/result.h>
43 #include <spa/utils/string.h>
44 #include <spa/debug/dict.h>
45 #include <spa/debug/mem.h>
46 #include <spa/debug/types.h>
47 #include <spa/param/audio/raw.h>
48 #include <spa/pod/pod.h>
49 #include <spa/param/audio/format-utils.h>
50 #include <spa/param/props.h>
51 #include <spa/utils/ringbuffer.h>
52 #include <spa/utils/json.h>
53 
54 #include <pipewire/pipewire.h>
55 #include <pipewire/private.h>
56 #include <pipewire/extensions/metadata.h>
57 
58 #include "pulse-server.h"
59 #include "client.h"
60 #include "collect.h"
61 #include "commands.h"
62 #include "dbus-name.h"
63 #include "defs.h"
64 #include "extension.h"
65 #include "format.h"
66 #include "internal.h"
67 #include "manager.h"
68 #include "message.h"
69 #include "message-handler.h"
70 #include "module.h"
71 #include "operation.h"
72 #include "pending-sample.h"
73 #include "quirks.h"
74 #include "reply.h"
75 #include "sample.h"
76 #include "sample-play.h"
77 #include "server.h"
78 #include "stream.h"
79 #include "utils.h"
80 #include "volume.h"
81 
82 #define DEFAULT_MIN_REQ		"256/48000"
83 #define DEFAULT_DEFAULT_REQ	"960/48000"
84 #define DEFAULT_MIN_FRAG	"256/48000"
85 #define DEFAULT_DEFAULT_FRAG	"96000/48000"
86 #define DEFAULT_DEFAULT_TLENGTH	"96000/48000"
87 #define DEFAULT_MIN_QUANTUM	"256/48000"
88 #define DEFAULT_FORMAT		"F32"
89 #define DEFAULT_POSITION	"[ FL FR ]"
90 
91 #define MAX_FORMATS	32
92 
93 bool debug_messages = false;
94 
95 struct latency_offset_data {
96 	int64_t prev_latency_offset;
97 	unsigned int initialized:1;
98 };
99 
find_sample(struct impl * impl,uint32_t idx,const char * name)100 static struct sample *find_sample(struct impl *impl, uint32_t idx, const char *name)
101 {
102 	union pw_map_item *item;
103 
104 	if (idx != SPA_ID_INVALID)
105 		return pw_map_lookup(&impl->samples, idx);
106 
107 	pw_array_for_each(item, &impl->samples.items) {
108 		struct sample *s = item->data;
109 		if (!pw_map_item_is_free(item) &&
110 		    spa_streq(s->name, name))
111 			return s;
112 	}
113 	return NULL;
114 }
115 
broadcast_subscribe_event(struct impl * impl,uint32_t mask,uint32_t event,uint32_t id)116 void broadcast_subscribe_event(struct impl *impl, uint32_t mask, uint32_t event, uint32_t id)
117 {
118 	struct server *s;
119 	spa_list_for_each(s, &impl->servers, link) {
120 		struct client *c;
121 		spa_list_for_each(c, &s->clients, link)
122 			client_queue_subscribe_event(c, mask, event, id);
123 	}
124 }
125 
do_command_auth(struct client * client,uint32_t command,uint32_t tag,struct message * m)126 static int do_command_auth(struct client *client, uint32_t command, uint32_t tag, struct message *m)
127 {
128 	struct message *reply;
129 	uint32_t version;
130 	const void *cookie;
131 	size_t len;
132 
133 	if (message_get(m,
134 			TAG_U32, &version,
135 			TAG_ARBITRARY, &cookie, &len,
136 			TAG_INVALID) < 0) {
137 		return -EPROTO;
138 	}
139 	if (version < 8)
140 		return -EPROTO;
141 	if (len != NATIVE_COOKIE_LENGTH)
142 		return -EINVAL;
143 
144 	if ((version & PROTOCOL_VERSION_MASK) >= 13)
145 		version &= PROTOCOL_VERSION_MASK;
146 
147 	client->version = version;
148 	client->authenticated = true;
149 
150 	pw_log_info("client:%p AUTH tag:%u version:%d", client, tag, version);
151 
152 	reply = reply_new(client, tag);
153 	message_put(reply,
154 			TAG_U32, PROTOCOL_VERSION,
155 			TAG_INVALID);
156 
157 	return client_queue_message(client, reply);
158 }
159 
reply_set_client_name(struct client * client,uint32_t tag)160 static int reply_set_client_name(struct client *client, uint32_t tag)
161 {
162 	struct message *reply;
163 	struct pw_client *c;
164 	uint32_t id;
165 
166 	c = pw_core_get_client(client->core);
167 	if (c == NULL)
168 		return -ENOENT;
169 
170 	id = pw_proxy_get_bound_id((struct pw_proxy*)c);
171 
172 	pw_log_info("[%s] reply tag:%u id:%u", client->name, tag, id);
173 
174 	reply = reply_new(client, tag);
175 
176 	if (client->version >= 13) {
177 		message_put(reply,
178 			TAG_U32, id,		/* client index */
179 			TAG_INVALID);
180 	}
181 	return client_queue_message(client, reply);
182 }
183 
manager_sync(void * data)184 static void manager_sync(void *data)
185 {
186 	struct client *client = data;
187 	struct operation *o;
188 
189 	pw_log_debug("%p: manager sync", client);
190 
191 	if (client->connect_tag != SPA_ID_INVALID) {
192 		reply_set_client_name(client, client->connect_tag);
193 		client->connect_tag = SPA_ID_INVALID;
194 	}
195 	spa_list_consume(o, &client->operations, link)
196 		operation_complete(o);
197 }
198 
find_stream(struct client * client,uint32_t id)199 static struct stream *find_stream(struct client *client, uint32_t id)
200 {
201 	union pw_map_item *item;
202 	pw_array_for_each(item, &client->streams.items) {
203 		struct stream *s = item->data;
204 		if (!pw_map_item_is_free(item) &&
205 		    s->id == id)
206 			return s;
207 	}
208 	return NULL;
209 }
210 
send_object_event(struct client * client,struct pw_manager_object * o,uint32_t facility)211 static int send_object_event(struct client *client, struct pw_manager_object *o,
212 		uint32_t facility)
213 {
214 	uint32_t event = 0, mask = 0, res_id = o->id;
215 
216 	if (pw_manager_object_is_sink(o)) {
217 		client_queue_subscribe_event(client,
218 				SUBSCRIPTION_MASK_SINK,
219 				SUBSCRIPTION_EVENT_SINK | facility,
220 				res_id);
221 	}
222 	if (pw_manager_object_is_source_or_monitor(o)) {
223 		if (!pw_manager_object_is_source(o))
224 			res_id |= MONITOR_FLAG;
225 		mask = SUBSCRIPTION_MASK_SOURCE;
226 		event = SUBSCRIPTION_EVENT_SOURCE;
227 	}
228 	else if (pw_manager_object_is_sink_input(o)) {
229 		mask = SUBSCRIPTION_MASK_SINK_INPUT;
230 		event = SUBSCRIPTION_EVENT_SINK_INPUT;
231 	}
232 	else if (pw_manager_object_is_source_output(o)) {
233 		mask = SUBSCRIPTION_MASK_SOURCE_OUTPUT;
234 		event = SUBSCRIPTION_EVENT_SOURCE_OUTPUT;
235 	}
236 	else if (pw_manager_object_is_module(o)) {
237 		mask = SUBSCRIPTION_MASK_MODULE;
238 		event = SUBSCRIPTION_EVENT_MODULE;
239 	}
240 	else if (pw_manager_object_is_client(o)) {
241 		mask = SUBSCRIPTION_MASK_CLIENT;
242 		event = SUBSCRIPTION_EVENT_CLIENT;
243 	}
244 	else if (pw_manager_object_is_card(o)) {
245 		mask = SUBSCRIPTION_MASK_CARD;
246 		event = SUBSCRIPTION_EVENT_CARD;
247 	} else
248 		event = SPA_ID_INVALID;
249 
250 	if (event != SPA_ID_INVALID)
251 		client_queue_subscribe_event(client,
252 				mask,
253 				event | facility,
254 				res_id);
255 	return 0;
256 }
257 
258 static struct pw_manager_object *find_device(struct client *client,
259 		uint32_t id, const char *name, bool sink, bool *is_monitor);
260 
get_node_latency_offset(struct pw_manager_object * o)261 static int64_t get_node_latency_offset(struct pw_manager_object *o)
262 {
263 	int64_t latency_offset = 0LL;
264 	struct pw_manager_param *p;
265 
266 	spa_list_for_each(p, &o->param_list, link) {
267 		if (p->id != SPA_PARAM_Props)
268 			continue;
269 		if (spa_pod_parse_object(p->param,
270 					SPA_TYPE_OBJECT_Props, NULL,
271 					SPA_PROP_latencyOffsetNsec, SPA_POD_Long(&latency_offset)) == 1)
272 			break;
273 	}
274 	return latency_offset;
275 }
276 
send_latency_offset_subscribe_event(struct client * client,struct pw_manager_object * o)277 static void send_latency_offset_subscribe_event(struct client *client, struct pw_manager_object *o)
278 {
279 	struct latency_offset_data *d;
280 	struct pw_node_info *info;
281 	const char *str;
282 	uint32_t card_id = SPA_ID_INVALID;
283 	int64_t latency_offset = 0LL;
284 	bool changed = false;
285 
286 	if (!pw_manager_object_is_sink(o) && !pw_manager_object_is_source_or_monitor(o))
287 		return;
288 
289 	/*
290 	 * Pulseaudio sends card change events on latency offset change.
291 	 */
292 	if ((info = o->info) == NULL || info->props == NULL)
293 		return;
294 	if ((str = spa_dict_lookup(info->props, PW_KEY_DEVICE_ID)) != NULL)
295 		card_id = (uint32_t)atoi(str);
296 	if (card_id == SPA_ID_INVALID)
297 		return;
298 
299 	d = pw_manager_object_add_data(o, "latency_offset_data", sizeof(struct latency_offset_data));
300 	if (d == NULL)
301 		return;
302 
303 	latency_offset = get_node_latency_offset(o);
304 	changed = (!d->initialized || latency_offset != d->prev_latency_offset);
305 
306 	d->prev_latency_offset = latency_offset;
307 	d->initialized = true;
308 
309 	if (changed)
310 		client_queue_subscribe_event(client,
311 				SUBSCRIPTION_MASK_CARD,
312 				SUBSCRIPTION_EVENT_CARD | SUBSCRIPTION_EVENT_CHANGE,
313 				card_id);
314 }
315 
send_default_change_subscribe_event(struct client * client,bool sink,bool source)316 static void send_default_change_subscribe_event(struct client *client, bool sink, bool source)
317 {
318 	struct pw_manager_object *def;
319 	bool changed = false;
320 
321 	if (sink) {
322 		def = find_device(client, SPA_ID_INVALID, NULL, true, NULL);
323 		if (client->prev_default_sink != def) {
324 			client->prev_default_sink = def;
325 			changed = true;
326 		}
327 	}
328 
329 	if (source) {
330 		def = find_device(client, SPA_ID_INVALID, NULL, false, NULL);
331 		if (client->prev_default_source != def) {
332 			client->prev_default_source = def;
333 			changed = true;
334 		}
335 	}
336 
337 	if (changed)
338 		client_queue_subscribe_event(client,
339 				SUBSCRIPTION_MASK_SERVER,
340 				SUBSCRIPTION_EVENT_CHANGE |
341 				SUBSCRIPTION_EVENT_SERVER,
342 				-1);
343 }
344 
handle_metadata(struct client * client,struct pw_manager_object * old,struct pw_manager_object * new,const char * name)345 static void handle_metadata(struct client *client, struct pw_manager_object *old,
346 		struct pw_manager_object *new, const char *name)
347 {
348 	if (spa_streq(name, "default")) {
349 		if (client->metadata_default == old)
350 			client->metadata_default = new;
351 	}
352 	else if (spa_streq(name, "route-settings")) {
353 		if (client->metadata_routes == old)
354 			client->metadata_routes = new;
355 	}
356 }
357 
frac_to_bytes_round_up(struct spa_fraction val,const struct sample_spec * ss)358 static uint32_t frac_to_bytes_round_up(struct spa_fraction val, const struct sample_spec *ss)
359 {
360 	uint64_t u;
361 	u = (uint64_t) (val.num * 1000000UL * (uint64_t) ss->rate) / val.denom;
362 	u = (u + 1000000UL - 1) / 1000000UL;
363 	u *= sample_spec_frame_size(ss);
364 	return (uint32_t) u;
365 }
366 
fix_playback_buffer_attr(struct stream * s,struct buffer_attr * attr)367 static uint32_t fix_playback_buffer_attr(struct stream *s, struct buffer_attr *attr)
368 {
369 	uint32_t frame_size, max_prebuf, minreq, latency, max_latency;
370 	struct defs *defs = &s->impl->defs;
371 
372 	frame_size = s->frame_size;
373 	minreq = frac_to_bytes_round_up(defs->min_req, &s->ss);
374 	max_latency = defs->max_quantum * frame_size;
375 
376 	if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH)
377 		attr->maxlength = MAXLENGTH;
378 	attr->maxlength -= attr->maxlength % frame_size;
379 	attr->maxlength = SPA_MAX(attr->maxlength, frame_size);
380 
381 	if (attr->tlength == (uint32_t) -1)
382 		attr->tlength = frac_to_bytes_round_up(defs->default_tlength, &s->ss);
383 	if (attr->tlength > attr->maxlength)
384 		attr->tlength = attr->maxlength;
385 	attr->tlength -= attr->tlength % frame_size;
386 	attr->tlength = SPA_MAX(attr->tlength, frame_size);
387 	attr->tlength = SPA_MAX(attr->tlength, minreq);
388 
389 	if (attr->minreq == (uint32_t) -1) {
390 		uint32_t process = frac_to_bytes_round_up(defs->default_req, &s->ss);
391 		/* With low-latency, tlength/4 gives a decent default in all of traditional,
392 		 * adjust latency and early request modes. */
393 		uint32_t m = attr->tlength / 4;
394 		m -= m % frame_size;
395 		attr->minreq = SPA_MIN(process, m);
396 	}
397 	attr->minreq = SPA_MAX(attr->minreq, minreq);
398 
399 	if (attr->tlength < attr->minreq+frame_size)
400 		attr->tlength = attr->minreq + frame_size;
401 
402 	if (s->early_requests) {
403 		latency = attr->minreq;
404 	} else if (s->adjust_latency) {
405 		if (attr->tlength > attr->minreq * 2)
406 			latency = SPA_MIN(max_latency, (attr->tlength - attr->minreq * 2) / 2);
407 		else
408 			latency = attr->minreq;
409 
410 		latency -= latency % frame_size;
411 
412 		if (attr->tlength >= latency)
413 			attr->tlength -= latency;
414 	} else {
415 		if (attr->tlength > attr->minreq * 2)
416 			latency = SPA_MIN(max_latency, attr->tlength - attr->minreq * 2);
417 		else
418 			latency = attr->minreq;
419 	}
420 
421 	if (attr->tlength < latency + 2 * attr->minreq)
422 		attr->tlength = latency + 2 * attr->minreq;
423 
424 	attr->minreq -= attr->minreq % frame_size;
425 	if (attr->minreq <= 0) {
426 		attr->minreq = frame_size;
427 		attr->tlength += frame_size*2;
428 	}
429 	if (attr->tlength <= attr->minreq)
430 		attr->tlength = attr->minreq*2 + frame_size;
431 
432 	max_prebuf = attr->tlength + frame_size - attr->minreq;
433 	if (attr->prebuf == (uint32_t) -1 || attr->prebuf > max_prebuf)
434 		attr->prebuf = max_prebuf;
435 	attr->prebuf -= attr->prebuf % frame_size;
436 
437 	s->missing = attr->tlength;
438 	attr->fragsize = 0;
439 
440 	pw_log_info("[%s] maxlength:%u tlength:%u minreq:%u/%u prebuf:%u latency:%u %u",
441 			s->client->name, attr->maxlength, attr->tlength,
442 			attr->minreq, minreq, attr->prebuf, latency, frame_size);
443 
444 	return latency / frame_size;
445 }
446 
reply_create_playback_stream(struct stream * stream,struct pw_manager_object * peer)447 static int reply_create_playback_stream(struct stream *stream, struct pw_manager_object *peer)
448 {
449 	struct client *client = stream->client;
450 	struct message *reply;
451 	uint32_t missing, peer_id;
452 	struct spa_dict_item items[5];
453 	char latency[32];
454 	char attr_maxlength[32];
455 	char attr_tlength[32];
456 	char attr_prebuf[32];
457 	char attr_minreq[32];
458 	const char *peer_name;
459 	struct spa_fraction lat;
460 	uint64_t lat_usec;
461 	struct defs *defs = &stream->impl->defs;
462 
463 	lat.denom = stream->ss.rate;
464 	lat.num = fix_playback_buffer_attr(stream, &stream->attr);
465 
466 	stream->buffer = calloc(1, stream->attr.maxlength);
467 	if (stream->buffer == NULL)
468 		return -errno;
469 
470 	if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num)
471 		lat.num = (defs->min_quantum.num * lat.denom +
472 				(defs->min_quantum.denom -1)) / defs->min_quantum.denom;
473 	lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom;
474 
475 	snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom);
476 	snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength);
477 	snprintf(attr_tlength, sizeof(attr_tlength), "%u", stream->attr.tlength);
478 	snprintf(attr_prebuf, sizeof(attr_prebuf), "%u", stream->attr.prebuf);
479 	snprintf(attr_minreq, sizeof(attr_minreq), "%u", stream->attr.minreq);
480 
481 	items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
482 	items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength);
483 	items[2] = SPA_DICT_ITEM_INIT("pulse.attr.tlength", attr_tlength);
484 	items[3] = SPA_DICT_ITEM_INIT("pulse.attr.prebuf", attr_prebuf);
485 	items[4] = SPA_DICT_ITEM_INIT("pulse.attr.minreq", attr_minreq);
486 	pw_stream_update_properties(stream->stream, &SPA_DICT_INIT(items, 5));
487 
488 	if (stream->attr.prebuf > 0)
489 		stream->in_prebuf = true;
490 
491 	missing = stream_pop_missing(stream);
492 
493 	pw_log_info("[%s] reply CREATE_PLAYBACK_STREAM tag:%u missing:%u latency:%s",
494 			client->name, stream->create_tag, missing, latency);
495 
496 	reply = reply_new(client, stream->create_tag);
497 	message_put(reply,
498 		TAG_U32, stream->channel,		/* stream index/channel */
499 		TAG_U32, stream->id,			/* sink_input/stream index */
500 		TAG_U32, missing,			/* missing/requested bytes */
501 		TAG_INVALID);
502 
503 	if (peer && pw_manager_object_is_sink(peer)) {
504 		peer_id = peer->id;
505 		peer_name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
506 	} else {
507 		peer_id = SPA_ID_INVALID;
508 		peer_name = NULL;
509 	}
510 
511 	if (client->version >= 9) {
512 		message_put(reply,
513 			TAG_U32, stream->attr.maxlength,
514 			TAG_U32, stream->attr.tlength,
515 			TAG_U32, stream->attr.prebuf,
516 			TAG_U32, stream->attr.minreq,
517 			TAG_INVALID);
518 	}
519 	if (client->version >= 12) {
520 		message_put(reply,
521 			TAG_SAMPLE_SPEC, &stream->ss,
522 			TAG_CHANNEL_MAP, &stream->map,
523 			TAG_U32, peer_id,		/* sink index */
524 			TAG_STRING, peer_name,		/* sink name */
525 			TAG_BOOLEAN, false,		/* sink suspended state */
526 			TAG_INVALID);
527 	}
528 	if (client->version >= 13) {
529 		message_put(reply,
530 			TAG_USEC, lat_usec,		/* sink configured latency */
531 			TAG_INVALID);
532 	}
533 	if (client->version >= 21) {
534 		struct format_info info;
535 		spa_zero(info);
536 		info.encoding = ENCODING_PCM;
537 		message_put(reply,
538 			TAG_FORMAT_INFO, &info,		/* sink_input format */
539 			TAG_INVALID);
540 	}
541 
542 	stream->create_tag = SPA_ID_INVALID;
543 
544 	return client_queue_message(client, reply);
545 }
546 
fix_record_buffer_attr(struct stream * s,struct buffer_attr * attr)547 static uint32_t fix_record_buffer_attr(struct stream *s, struct buffer_attr *attr)
548 {
549 	uint32_t frame_size, minfrag, latency;
550 	struct defs *defs = &s->impl->defs;
551 
552 	frame_size = s->frame_size;
553 
554 	if (attr->maxlength == (uint32_t) -1 || attr->maxlength > MAXLENGTH)
555 		attr->maxlength = MAXLENGTH;
556 	attr->maxlength -= attr->maxlength % frame_size;
557 	attr->maxlength = SPA_MAX(attr->maxlength, frame_size);
558 
559 	minfrag = frac_to_bytes_round_up(defs->min_frag, &s->ss);
560 
561 	if (attr->fragsize == (uint32_t) -1 || attr->fragsize == 0)
562 		attr->fragsize = frac_to_bytes_round_up(defs->default_frag, &s->ss);
563 	attr->fragsize -= attr->fragsize % frame_size;
564 	attr->fragsize = SPA_MAX(attr->fragsize, minfrag);
565 	attr->fragsize = SPA_MAX(attr->fragsize, frame_size);
566 
567 	if (attr->fragsize > attr->maxlength)
568 		attr->fragsize = attr->maxlength;
569 
570 	attr->tlength = attr->minreq = attr->prebuf = 0;
571 
572 	if (s->early_requests) {
573 		latency = attr->fragsize;
574 	} else if (s->adjust_latency) {
575 		latency = attr->fragsize;
576 	} else {
577 		latency = attr->fragsize;
578 	}
579 
580 	pw_log_info("[%s] maxlength:%u fragsize:%u minfrag:%u latency:%u",
581 			s->client->name, attr->maxlength, attr->fragsize, minfrag,
582 			latency);
583 
584 	return latency / frame_size;
585 }
586 
reply_create_record_stream(struct stream * stream,struct pw_manager_object * peer)587 static int reply_create_record_stream(struct stream *stream, struct pw_manager_object *peer)
588 {
589 	struct client *client = stream->client;
590 	struct pw_manager *manager = client->manager;
591 	struct message *reply;
592 	struct spa_dict_item items[3];
593 	char latency[32], *tmp;
594 	char attr_maxlength[32];
595 	char attr_fragsize[32];
596 	const char *peer_name, *name;
597 	uint32_t peer_id;
598 	struct spa_fraction lat;
599 	uint64_t lat_usec;
600 	struct defs *defs = &stream->impl->defs;
601 
602 	lat.denom = stream->ss.rate;
603 	lat.num = fix_record_buffer_attr(stream, &stream->attr);
604 
605 	stream->buffer = calloc(1, stream->attr.maxlength);
606 	if (stream->buffer == NULL)
607 		return -errno;
608 
609 	if (lat.num * defs->min_quantum.denom / lat.denom < defs->min_quantum.num)
610 		lat.num = (defs->min_quantum.num * lat.denom +
611 				(defs->min_quantum.denom -1)) / defs->min_quantum.denom;
612 	lat_usec = lat.num * SPA_USEC_PER_SEC / lat.denom;
613 
614 	snprintf(latency, sizeof(latency), "%u/%u", lat.num, lat.denom);
615 
616 	snprintf(attr_maxlength, sizeof(attr_maxlength), "%u", stream->attr.maxlength);
617 	snprintf(attr_fragsize, sizeof(attr_fragsize), "%u", stream->attr.fragsize);
618 
619 	items[0] = SPA_DICT_ITEM_INIT(PW_KEY_NODE_LATENCY, latency);
620 	items[1] = SPA_DICT_ITEM_INIT("pulse.attr.maxlength", attr_maxlength);
621 	items[2] = SPA_DICT_ITEM_INIT("pulse.attr.fragsize", attr_fragsize);
622 	pw_stream_update_properties(stream->stream,
623 			&SPA_DICT_INIT(items, 3));
624 
625 	pw_log_info("[%s] reply CREATE_RECORD_STREAM tag:%u latency:%s",
626 			client->name, stream->create_tag, latency);
627 
628 	reply = reply_new(client, stream->create_tag);
629 	message_put(reply,
630 		TAG_U32, stream->channel,	/* stream index/channel */
631 		TAG_U32, stream->id,		/* source_output/stream index */
632 		TAG_INVALID);
633 
634 	if (peer && pw_manager_object_is_sink_input(peer))
635 		peer = find_linked(manager, peer->id, PW_DIRECTION_OUTPUT);
636 	if (peer && pw_manager_object_is_source_or_monitor(peer)) {
637 		name = pw_properties_get(peer->props, PW_KEY_NODE_NAME);
638 		if (!pw_manager_object_is_source(peer)) {
639 			size_t len = (name ? strlen(name) : 5) + 10;
640 			peer_id = peer->id | MONITOR_FLAG;
641 			peer_name = tmp = alloca(len);
642 			snprintf(tmp, len, "%s.monitor", name ? name : "sink");
643 		} else {
644 			peer_id = peer->id;
645 			peer_name = name;
646 		}
647 	} else {
648 		peer_id = SPA_ID_INVALID;
649 		peer_name = NULL;
650 	}
651 
652 	if (client->version >= 9) {
653 		message_put(reply,
654 			TAG_U32, stream->attr.maxlength,
655 			TAG_U32, stream->attr.fragsize,
656 			TAG_INVALID);
657 	}
658 	if (client->version >= 12) {
659 		message_put(reply,
660 			TAG_SAMPLE_SPEC, &stream->ss,
661 			TAG_CHANNEL_MAP, &stream->map,
662 			TAG_U32, peer_id,		/* source index */
663 			TAG_STRING, peer_name,		/* source name */
664 			TAG_BOOLEAN, false,		/* source suspended state */
665 			TAG_INVALID);
666 	}
667 	if (client->version >= 13) {
668 		message_put(reply,
669 			TAG_USEC, lat_usec,		/* source configured latency */
670 			TAG_INVALID);
671 	}
672 	if (client->version >= 22) {
673 		struct format_info info;
674 		spa_zero(info);
675 		info.encoding = ENCODING_PCM;
676 		message_put(reply,
677 			TAG_FORMAT_INFO, &info,		/* source_output format */
678 			TAG_INVALID);
679 	}
680 
681 	stream->create_tag = SPA_ID_INVALID;
682 
683 	return client_queue_message(client, reply);
684 }
685 
reply_create_stream(struct stream * stream,struct pw_manager_object * peer)686 static int reply_create_stream(struct stream *stream, struct pw_manager_object *peer)
687 {
688 	return stream->direction == PW_DIRECTION_OUTPUT ?
689 			reply_create_playback_stream(stream, peer) :
690 			reply_create_record_stream(stream, peer);
691 }
692 
manager_added(void * data,struct pw_manager_object * o)693 static void manager_added(void *data, struct pw_manager_object *o)
694 {
695 	struct client *client = data;
696 	struct pw_manager *manager = client->manager;
697 	const char *str;
698 
699 	register_object_message_handlers(o);
700 
701 	if (strcmp(o->type, PW_TYPE_INTERFACE_Core) == 0 && manager->info != NULL) {
702 		struct pw_core_info *info = manager->info;
703 		if (info->props) {
704 			if ((str = spa_dict_lookup(info->props, "default.clock.rate")) != NULL)
705 				client->impl->defs.sample_spec.rate = atoi(str);
706 			if ((str = spa_dict_lookup(info->props, "default.clock.max-quantum")) != NULL)
707 				client->impl->defs.max_quantum = atoi(str);
708 		}
709 	}
710 
711 	if (spa_streq(o->type, PW_TYPE_INTERFACE_Metadata)) {
712 		if (o->props != NULL &&
713 		    (str = pw_properties_get(o->props, PW_KEY_METADATA_NAME)) != NULL)
714 			handle_metadata(client, NULL, o, str);
715 	}
716 
717 	if (spa_streq(o->type, PW_TYPE_INTERFACE_Link)) {
718 		struct stream *s;
719 		struct pw_manager_object *peer = NULL;
720 		spa_list_for_each(s, &client->pending_streams, link) {
721 			peer = find_linked(manager, s->id, s->direction);
722 			if (peer)
723 				break;
724 		}
725 		if (peer) {
726 			reply_create_stream(s, peer);
727 			spa_list_remove(&s->link);
728 			s->pending = false;
729 		}
730 	}
731 
732 	send_object_event(client, o, SUBSCRIPTION_EVENT_NEW);
733 
734 	/* Adding sinks etc. may also change defaults */
735 	send_default_change_subscribe_event(client, pw_manager_object_is_sink(o), pw_manager_object_is_source_or_monitor(o));
736 }
737 
manager_updated(void * data,struct pw_manager_object * o)738 static void manager_updated(void *data, struct pw_manager_object *o)
739 {
740 	struct client *client = data;
741 
742 	send_object_event(client, o, SUBSCRIPTION_EVENT_CHANGE);
743 
744 	send_latency_offset_subscribe_event(client, o);
745 	send_default_change_subscribe_event(client, pw_manager_object_is_sink(o), pw_manager_object_is_source_or_monitor(o));
746 }
747 
manager_removed(void * data,struct pw_manager_object * o)748 static void manager_removed(void *data, struct pw_manager_object *o)
749 {
750 	struct client *client = data;
751 	const char *str;
752 
753 	send_object_event(client, o, SUBSCRIPTION_EVENT_REMOVE);
754 
755 	send_default_change_subscribe_event(client, pw_manager_object_is_sink(o), pw_manager_object_is_source_or_monitor(o));
756 
757 	if (spa_streq(o->type, PW_TYPE_INTERFACE_Metadata)) {
758 		if (o->props != NULL &&
759 		    (str = pw_properties_get(o->props, PW_KEY_METADATA_NAME)) != NULL)
760 			handle_metadata(client, o, NULL, str);
761 	}
762 }
763 
json_object_find(const char * obj,const char * key,char * value,size_t len)764 static int json_object_find(const char *obj, const char *key, char *value, size_t len)
765 {
766 	struct spa_json it[2];
767 	const char *v;
768 	char k[128];
769 
770 	spa_json_init(&it[0], obj, strlen(obj));
771 	if (spa_json_enter_object(&it[0], &it[1]) <= 0)
772 		return -EINVAL;
773 
774 	while (spa_json_get_string(&it[1], k, sizeof(k)) > 0) {
775 		if (spa_streq(k, key)) {
776 			if (spa_json_get_string(&it[1], value, len) <= 0)
777 				continue;
778 			return 0;
779 		} else {
780 			if (spa_json_next(&it[1], &v) <= 0)
781 				break;
782 		}
783 	}
784 	return -ENOENT;
785 }
786 
manager_metadata(void * data,struct pw_manager_object * o,uint32_t subject,const char * key,const char * type,const char * value)787 static void manager_metadata(void *data, struct pw_manager_object *o,
788 		uint32_t subject, const char *key, const char *type, const char *value)
789 {
790 	struct client *client = data;
791 	bool changed = false;
792 
793 	pw_log_debug("meta id:%d subject:%d key:%s type:%s value:%s",
794 			o->id, subject, key, type, value);
795 
796 	if (subject == PW_ID_CORE && o == client->metadata_default) {
797 		char name[1024];
798 
799 		if (key == NULL || spa_streq(key, "default.audio.sink")) {
800 			if (value != NULL) {
801 				if (json_object_find(value,
802 						"name", name, sizeof(name)) < 0)
803 					value = NULL;
804 				else
805 					value = name;
806 			}
807 			if ((changed = !spa_streq(client->default_sink, value))) {
808 				free(client->default_sink);
809 				client->default_sink = value ? strdup(value) : NULL;
810 			}
811 		}
812 		if (key == NULL || spa_streq(key, "default.audio.source")) {
813 			if (value != NULL) {
814 				if (json_object_find(value,
815 						"name", name, sizeof(name)) < 0)
816 					value = NULL;
817 				else
818 					value = name;
819 			}
820 			if ((changed = !spa_streq(client->default_source, value))) {
821 				free(client->default_source);
822 				client->default_source = value ? strdup(value) : NULL;
823 			}
824 		}
825 		if (changed)
826 			send_default_change_subscribe_event(client, true, true);
827 	}
828 	if (subject == PW_ID_CORE && o == client->metadata_routes) {
829 		if (key == NULL)
830 			pw_properties_clear(client->routes);
831 		else
832 			pw_properties_set(client->routes, key, value);
833 	}
834 }
835 
836 
do_free_client(void * obj,void * data,int res,uint32_t id)837 static void do_free_client(void *obj, void *data, int res, uint32_t id)
838 {
839 	struct client *client = data;
840 	client_free(client);
841 }
842 
manager_disconnect(void * data)843 static void manager_disconnect(void *data)
844 {
845 	struct client *client = data;
846 	pw_log_debug("manager_disconnect()");
847 	pw_work_queue_add(client->impl->work_queue, NULL, 0,
848 				do_free_client, client);
849 }
850 
851 static const struct pw_manager_events manager_events = {
852 	PW_VERSION_MANAGER_EVENTS,
853 	.sync = manager_sync,
854 	.added = manager_added,
855 	.updated = manager_updated,
856 	.removed = manager_removed,
857 	.metadata = manager_metadata,
858 	.disconnect = manager_disconnect
859 };
860 
do_set_client_name(struct client * client,uint32_t command,uint32_t tag,struct message * m)861 static int do_set_client_name(struct client *client, uint32_t command, uint32_t tag, struct message *m)
862 {
863 	struct impl *impl = client->impl;
864 	const char *name = NULL;
865 	int res = 0, changed = 0;
866 
867 	if (client->version < 13) {
868 		if (message_get(m,
869 				TAG_STRING, &name,
870 				TAG_INVALID) < 0)
871 			return -EPROTO;
872 		if (name)
873 			changed += pw_properties_set(client->props,
874 					PW_KEY_APP_NAME, name);
875 	} else {
876 		if (message_get(m,
877 				TAG_PROPLIST, client->props,
878 				TAG_INVALID) < 0)
879 			return -EPROTO;
880 		changed++;
881 	}
882 
883 	client->name = pw_properties_get(client->props, PW_KEY_APP_NAME);
884 	pw_log_info("[%s] %s tag:%d", client->name,
885 			commands[command].name, tag);
886 
887 	if (client->core == NULL) {
888 		client->core = pw_context_connect(impl->context,
889 				pw_properties_copy(client->props), 0);
890 		if (client->core == NULL) {
891 			res = -errno;
892 			goto error;
893 		}
894 		client->manager = pw_manager_new(client->core);
895 		if (client->manager == NULL) {
896 			res = -errno;
897 			goto error;
898 		}
899 		client->connect_tag = tag;
900 		pw_manager_add_listener(client->manager, &client->manager_listener,
901 				&manager_events, client);
902 	} else {
903 		if (changed)
904 			pw_core_update_properties(client->core, &client->props->dict);
905 
906 		if (client->connect_tag == SPA_ID_INVALID)
907 			res = reply_set_client_name(client, tag);
908 	}
909 
910 	client_update_quirks(client);
911 
912 	return res;
913 error:
914 	pw_log_error("%p: failed to connect client: %s", impl, spa_strerror(res));
915 	return res;
916 
917 }
918 
do_subscribe(struct client * client,uint32_t command,uint32_t tag,struct message * m)919 static int do_subscribe(struct client *client, uint32_t command, uint32_t tag, struct message *m)
920 {
921 	uint32_t mask;
922 
923 	if (message_get(m,
924 			TAG_U32, &mask,
925 			TAG_INVALID) < 0)
926 		return -EPROTO;
927 
928 	pw_log_info("[%s] SUBSCRIBE tag:%u mask:%08x",
929 			client->name, tag, mask);
930 
931 	client->subscribed = mask;
932 
933 	return reply_simple_ack(client, tag);
934 }
935 
stream_control_info(void * data,uint32_t id,const struct pw_stream_control * control)936 static void stream_control_info(void *data, uint32_t id,
937 		const struct pw_stream_control *control)
938 {
939 	struct stream *stream = data;
940 
941 	switch (id) {
942 	case SPA_PROP_channelVolumes:
943 		stream->volume.channels = control->n_values;
944 		memcpy(stream->volume.values, control->values, control->n_values * sizeof(float));
945 		pw_log_info("stream %p: volume changed %f", stream, stream->volume.values[0]);
946 		break;
947 	case SPA_PROP_mute:
948 		stream->muted = control->values[0] >= 0.5;
949 		pw_log_info("stream %p: mute changed %d", stream, stream->muted);
950 		break;
951 	}
952 }
953 
on_stream_cleanup(void * obj,void * data,int res,uint32_t id)954 static void on_stream_cleanup(void *obj, void *data, int res, uint32_t id)
955 {
956 	struct stream *stream = obj;
957 	struct client *client = stream->client;
958 	stream_free(stream);
959 	if (client->ref <= 0)
960 		client_free(client);
961 }
962 
stream_state_changed(void * data,enum pw_stream_state old,enum pw_stream_state state,const char * error)963 static void stream_state_changed(void *data, enum pw_stream_state old,
964 		enum pw_stream_state state, const char *error)
965 {
966 	struct stream *stream = data;
967 	struct client *client = stream->client;
968 	struct impl *impl = client->impl;
969 
970 	switch (state) {
971 	case PW_STREAM_STATE_ERROR:
972 		reply_error(client, -1, stream->create_tag, -EIO);
973 		stream->done = true;
974 		break;
975 	case PW_STREAM_STATE_UNCONNECTED:
976 		if (stream->create_tag != SPA_ID_INVALID)
977 			reply_error(client, -1, stream->create_tag, -ENOENT);
978 		else if (!client->disconnecting)
979 			stream->killed = true;
980 		stream->done = true;
981 		break;
982 	case PW_STREAM_STATE_CONNECTING:
983 	case PW_STREAM_STATE_PAUSED:
984 	case PW_STREAM_STATE_STREAMING:
985 		break;
986 	}
987 	if (stream->done) {
988 		pw_work_queue_add(impl->work_queue, stream, 0,
989 				on_stream_cleanup, client);
990 	}
991 }
992 
get_buffers_param(struct stream * s,struct buffer_attr * attr,struct spa_pod_builder * b)993 static const struct spa_pod *get_buffers_param(struct stream *s,
994 		struct buffer_attr *attr, struct spa_pod_builder *b)
995 {
996 	const struct spa_pod *param;
997 	uint32_t blocks, buffers, size, maxsize, stride;
998 
999 	blocks = 1;
1000 	stride = s->frame_size;
1001 
1002 	maxsize = 8192 * 32 * s->frame_size;
1003 	if (s->direction == PW_DIRECTION_OUTPUT) {
1004 		size = attr->minreq;
1005 	} else {
1006 		size = attr->fragsize;
1007 	}
1008 	buffers = SPA_CLAMP(maxsize / size, MIN_BUFFERS, MAX_BUFFERS);
1009 
1010 	pw_log_info("[%s] stride %d maxsize %d size %u buffers %d", s->client->name,
1011 			stride, maxsize, size, buffers);
1012 
1013 	param = spa_pod_builder_add_object(b,
1014 			SPA_TYPE_OBJECT_ParamBuffers, SPA_PARAM_Buffers,
1015 			SPA_PARAM_BUFFERS_buffers, SPA_POD_CHOICE_RANGE_Int(buffers,
1016 				MIN_BUFFERS, MAX_BUFFERS),
1017 			SPA_PARAM_BUFFERS_blocks,  SPA_POD_Int(blocks),
1018 			SPA_PARAM_BUFFERS_size,    SPA_POD_CHOICE_RANGE_Int(
1019 								size, size, maxsize),
1020 			SPA_PARAM_BUFFERS_stride,  SPA_POD_Int(stride));
1021 	return param;
1022 }
1023 
stream_param_changed(void * data,uint32_t id,const struct spa_pod * param)1024 static void stream_param_changed(void *data, uint32_t id, const struct spa_pod *param)
1025 {
1026 	struct stream *stream = data;
1027 	const struct spa_pod *params[4];
1028 	uint32_t n_params = 0;
1029 	uint8_t buffer[4096];
1030 	struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
1031 	int res;
1032 
1033 	if (id != SPA_PARAM_Format || param == NULL)
1034 		return;
1035 
1036 	if ((res = format_parse_param(param, &stream->ss, &stream->map, NULL, NULL)) < 0) {
1037 		pw_stream_set_error(stream->stream, res, "format not supported");
1038 		return;
1039 	}
1040 
1041 	pw_log_info("[%s] got format:%s rate:%u channels:%u", stream->client->name,
1042 			format_id2name(stream->ss.format),
1043 			stream->ss.rate, stream->ss.channels);
1044 
1045 	stream->frame_size = sample_spec_frame_size(&stream->ss);
1046 	if (stream->frame_size == 0) {
1047 		pw_stream_set_error(stream->stream, res, "format not supported");
1048 		return;
1049 	}
1050 	stream->rate = stream->ss.rate;
1051 
1052 	if (stream->create_tag != SPA_ID_INVALID) {
1053 		struct pw_manager_object *peer;
1054 		stream->id = pw_stream_get_node_id(stream->stream);
1055 
1056 		if (stream->volume_set) {
1057 			pw_stream_set_control(stream->stream,
1058 				SPA_PROP_channelVolumes, stream->volume.channels, stream->volume.values, 0);
1059 		}
1060 		if (stream->muted_set) {
1061 			float val = stream->muted ? 1.0f : 0.0f;
1062 			pw_stream_set_control(stream->stream,
1063 				SPA_PROP_mute, 1, &val, 0);
1064 		}
1065 		if (stream->corked)
1066 			pw_stream_set_active(stream->stream, false);
1067 
1068 		/* if peer exists, reply immediately, otherwise reply when the link is created */
1069 		peer = find_linked(stream->client->manager, stream->id, stream->direction);
1070 		if (peer) {
1071 			reply_create_stream(stream, peer);
1072 		} else {
1073 			spa_list_append(&stream->client->pending_streams, &stream->link);
1074 			stream->pending = true;
1075 		}
1076 	}
1077 
1078 	params[n_params++] = get_buffers_param(stream, &stream->attr, &b);
1079 	pw_stream_update_params(stream->stream, params, n_params);
1080 }
1081 
stream_io_changed(void * data,uint32_t id,void * area,uint32_t size)1082 static void stream_io_changed(void *data, uint32_t id, void *area, uint32_t size)
1083 {
1084 	struct stream *stream = data;
1085 	switch (id) {
1086 	case SPA_IO_RateMatch:
1087 		stream->rate_match = area;
1088 		break;
1089 	case SPA_IO_Position:
1090 		stream->position = area;
1091 		break;
1092 	}
1093 }
1094 
1095 struct process_data {
1096 	struct pw_time pwt;
1097 	uint32_t read_inc;
1098 	uint32_t write_inc;
1099 	uint32_t underrun_for;
1100 	uint32_t playing_for;
1101 	uint32_t missing;
1102 	uint32_t minreq;
1103 	uint32_t quantum;
1104 	unsigned int underrun:1;
1105 };
1106 
1107 static int
do_process_done(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)1108 do_process_done(struct spa_loop *loop,
1109 		bool async, uint32_t seq, const void *data, size_t size, void *user_data)
1110 {
1111 	struct stream *stream = user_data;
1112 	struct client *client = stream->client;
1113 	struct impl *impl = client->impl;
1114 	const struct process_data *pd = data;
1115 	uint32_t index, towrite;
1116 	int32_t avail;
1117 
1118 	stream->timestamp = pd->pwt.now;
1119 	if (pd->pwt.rate.denom > 0)
1120 		stream->delay = pd->pwt.delay * SPA_USEC_PER_SEC / pd->pwt.rate.denom;
1121 	else
1122 		stream->delay = 0;
1123 
1124 	if (stream->direction == PW_DIRECTION_OUTPUT) {
1125 		if (pd->quantum != stream->last_quantum)
1126 			stream_update_minreq(stream, pd->minreq);
1127 		stream->last_quantum = pd->quantum;
1128 
1129 		stream->read_index += pd->read_inc;
1130 		if (stream->corked) {
1131 			if (stream->underrun_for != (uint64_t)-1)
1132 				stream->underrun_for += pd->underrun_for;
1133 			stream->playing_for = 0;
1134 			return 0;
1135 		}
1136 		if (pd->underrun != stream->is_underrun) {
1137 			stream->is_underrun = pd->underrun;
1138 			stream->underrun_for = 0;
1139 			stream->playing_for = 0;
1140 			if (pd->underrun)
1141 				stream_send_underflow(stream, stream->read_index, pd->underrun_for);
1142 			else
1143 				stream_send_started(stream);
1144 		}
1145 		stream->missing += pd->missing;
1146 		stream->missing = SPA_MIN(stream->missing, (int64_t)stream->attr.tlength);
1147 		stream->playing_for += pd->playing_for;
1148 		if (stream->underrun_for != (uint64_t)-1)
1149 			stream->underrun_for += pd->underrun_for;
1150 
1151 		stream_send_request(stream);
1152 	} else {
1153 		struct message *msg;
1154 		stream->write_index += pd->write_inc;
1155 
1156 		avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
1157 
1158 		if (!spa_list_is_empty(&client->out_messages)) {
1159 			pw_log_debug("%p: [%s] pending read:%u avail:%d",
1160 					stream, client->name, index, avail);
1161 			return 0;
1162 		}
1163 
1164 		if (avail <= 0) {
1165 			/* underrun, can't really happen but if it does we
1166 			 * do nothing and wait for more data */
1167 			pw_log_warn("%p: [%s] underrun read:%u avail:%d",
1168 					stream, client->name, index, avail);
1169 		} else {
1170 			if (avail > (int32_t)stream->attr.maxlength) {
1171 				uint32_t skip = avail - stream->attr.fragsize;
1172 				/* overrun, catch up to latest fragment and send it */
1173 				pw_log_warn("%p: [%s] overrun recover read:%u avail:%d max:%u skip:%u",
1174 					stream, client->name, index, avail, stream->attr.maxlength, skip);
1175 				index += skip;
1176 				stream->read_index += skip;
1177 				avail = stream->attr.fragsize;
1178 			}
1179 
1180 			while (avail > 0) {
1181 				towrite = avail;
1182 				if (towrite > stream->attr.fragsize)
1183 					towrite = stream->attr.fragsize;
1184 
1185 				msg = message_alloc(impl, stream->channel, towrite);
1186 				if (msg == NULL)
1187 					return -errno;
1188 
1189 				spa_ringbuffer_read_data(&stream->ring,
1190 						stream->buffer, stream->attr.maxlength,
1191 						index % stream->attr.maxlength,
1192 						msg->data, towrite);
1193 
1194 				client_queue_message(client, msg);
1195 
1196 				index += towrite;
1197 				avail -= towrite;
1198 				stream->read_index += towrite;
1199 			}
1200 			spa_ringbuffer_read_update(&stream->ring, index);
1201 		}
1202 	}
1203 	return 0;
1204 }
1205 
1206 
stream_process(void * data)1207 static void stream_process(void *data)
1208 {
1209 	struct stream *stream = data;
1210 	struct client *client = stream->client;
1211 	struct impl *impl = stream->impl;
1212 	void *p;
1213 	struct pw_buffer *buffer;
1214 	struct spa_buffer *buf;
1215 	uint32_t size, minreq = 0, index;
1216 	struct process_data pd;
1217 	bool do_flush = false;
1218 
1219 	if (stream->create_tag != SPA_ID_INVALID)
1220 		return;
1221 
1222 	pw_log_trace_fp("%p: process", stream);
1223 	buffer = pw_stream_dequeue_buffer(stream->stream);
1224 	if (buffer == NULL)
1225 		return;
1226 
1227 	buf = buffer->buffer;
1228 	if ((p = buf->datas[0].data) == NULL)
1229 		return;
1230 
1231 	spa_zero(pd);
1232 
1233 	if (stream->direction == PW_DIRECTION_OUTPUT) {
1234 		int32_t avail = spa_ringbuffer_get_read_index(&stream->ring, &index);
1235 
1236 		if (stream->rate_match)
1237 			minreq = stream->rate_match->size * stream->frame_size;
1238 		if (minreq == 0)
1239 			minreq = stream->attr.minreq;
1240 
1241 		pd.minreq = minreq;
1242 		pd.quantum = stream->position ? stream->position->clock.duration : minreq;
1243 
1244 		if (avail < (int32_t)minreq || stream->corked) {
1245 			/* underrun, produce a silence buffer */
1246 			size = SPA_MIN(buf->datas[0].maxsize, minreq);
1247 			memset(p, 0, size);
1248 
1249 			if (stream->draining) {
1250 				stream->draining = false;
1251 				do_flush = true;
1252 			} else {
1253 				pd.underrun_for = size;
1254 				pd.underrun = true;
1255 			}
1256 			if ((stream->attr.prebuf == 0 || do_flush) && !stream->corked) {
1257 				pd.missing = size;
1258 				pd.playing_for = size;
1259 				if (avail > 0) {
1260 					spa_ringbuffer_read_data(&stream->ring,
1261 						stream->buffer, stream->attr.maxlength,
1262 						index % stream->attr.maxlength,
1263 						p, avail);
1264 					index += avail;
1265 					pd.read_inc = avail;
1266 				}
1267 				spa_ringbuffer_read_update(&stream->ring, index);
1268 			}
1269 			pw_log_debug("%p: [%s] underrun read:%u avail:%d max:%u",
1270 					stream, client->name, index, avail, minreq);
1271 		} else {
1272 			if (avail > (int32_t)stream->attr.maxlength) {
1273 				uint32_t skip = avail - stream->attr.maxlength;
1274 				/* overrun, reported by other side, here we skip
1275 				 * ahead to the oldest data. */
1276 				pw_log_debug("%p: [%s] overrun read:%u avail:%d max:%u skip:%u",
1277 						stream, client->name, index, avail,
1278 						stream->attr.maxlength, skip);
1279 				index += skip;
1280 				pd.read_inc = skip;
1281 				avail = stream->attr.maxlength;
1282 			}
1283 			size = SPA_MIN(buf->datas[0].maxsize, (uint32_t)avail);
1284 			size = SPA_MIN(size, minreq);
1285 
1286 			spa_ringbuffer_read_data(&stream->ring,
1287 					stream->buffer, stream->attr.maxlength,
1288 					index % stream->attr.maxlength,
1289 					p, size);
1290 
1291 			index += size;
1292 			pd.read_inc += size;
1293 			spa_ringbuffer_read_update(&stream->ring, index);
1294 
1295 			pd.playing_for = size;
1296 			pd.missing = size;
1297 			pd.underrun = false;
1298 		}
1299 		buf->datas[0].chunk->offset = 0;
1300 		buf->datas[0].chunk->stride = stream->frame_size;
1301 		buf->datas[0].chunk->size = size;
1302 		buffer->size = size / stream->frame_size;
1303 	} else  {
1304 		int32_t filled = spa_ringbuffer_get_write_index(&stream->ring, &index);
1305 		size = buf->datas[0].chunk->size;
1306 		if (filled < 0) {
1307 			/* underrun, can't really happen because we never read more
1308 			 * than what's available on the other side  */
1309 			pw_log_warn("%p: [%s] underrun write:%u filled:%d",
1310 					stream, client->name, index, filled);
1311 		} else if ((uint32_t)filled + size > stream->attr.maxlength) {
1312 			/* overrun, can happen when the other side is not
1313 			 * reading fast enough. We still write our data into the
1314 			 * ringbuffer and expect the other side to warn and catch up. */
1315 			pw_log_debug("%p: [%s] overrun write:%u filled:%d size:%u max:%u",
1316 					stream, client->name, index, filled,
1317 					size, stream->attr.maxlength);
1318 		}
1319 
1320 		spa_ringbuffer_write_data(&stream->ring,
1321 				stream->buffer, stream->attr.maxlength,
1322 				index % stream->attr.maxlength,
1323 				SPA_PTROFF(p, buf->datas[0].chunk->offset, void),
1324 				SPA_MIN(size, stream->attr.maxlength));
1325 
1326 		index += size;
1327 		pd.write_inc = size;
1328 		spa_ringbuffer_write_update(&stream->ring, index);
1329 	}
1330 	pw_stream_queue_buffer(stream->stream, buffer);
1331 
1332 	if (do_flush)
1333 		pw_stream_flush(stream->stream, true);
1334 
1335 	pw_stream_get_time(stream->stream, &pd.pwt);
1336 
1337 	pw_loop_invoke(impl->loop,
1338 			do_process_done, 1, &pd, sizeof(pd), false, stream);
1339 }
1340 
stream_drained(void * data)1341 static void stream_drained(void *data)
1342 {
1343 	struct stream *stream = data;
1344 	if (stream->drain_tag != 0) {
1345 		pw_log_info("[%s] drained channel:%u tag:%d",
1346 				stream->client->name, stream->channel,
1347 				stream->drain_tag);
1348 		reply_simple_ack(stream->client, stream->drain_tag);
1349 		stream->drain_tag = 0;
1350 
1351 		pw_stream_set_active(stream->stream, true);
1352 	}
1353 }
1354 
1355 static const struct pw_stream_events stream_events =
1356 {
1357 	PW_VERSION_STREAM_EVENTS,
1358 	.control_info = stream_control_info,
1359 	.state_changed = stream_state_changed,
1360 	.param_changed = stream_param_changed,
1361 	.io_changed = stream_io_changed,
1362 	.process = stream_process,
1363 	.drained = stream_drained,
1364 };
1365 
log_format_info(struct impl * impl,enum spa_log_level level,struct format_info * format)1366 static void log_format_info(struct impl *impl, enum spa_log_level level, struct format_info *format)
1367 {
1368 	const struct spa_dict_item *it;
1369 	pw_logt(level, mod_topic, "%p: format %s",
1370 			impl, format_encoding2name(format->encoding));
1371 	spa_dict_for_each(it, &format->props->dict)
1372 		pw_logt(level, mod_topic, "%p:  '%s': '%s'",
1373 				impl, it->key, it->value);
1374 }
1375 
do_create_playback_stream(struct client * client,uint32_t command,uint32_t tag,struct message * m)1376 static int do_create_playback_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
1377 {
1378 	struct impl *impl = client->impl;
1379 	const char *name = NULL;
1380 	int res;
1381 	struct sample_spec ss;
1382 	struct channel_map map;
1383 	uint32_t sink_index, syncid, rate = 0;
1384 	const char *sink_name;
1385 	struct buffer_attr attr = { 0 };
1386 	bool corked = false,
1387 		no_remap = false,
1388 		no_remix = false,
1389 		fix_format = false,
1390 		fix_rate = false,
1391 		fix_channels = false,
1392 		no_move = false,
1393 		variable_rate = false,
1394 		muted = false,
1395 		adjust_latency = false,
1396 		early_requests = false,
1397 		dont_inhibit_auto_suspend = false,
1398 		volume_set = true,
1399 		muted_set = false,
1400 		fail_on_suspend = false,
1401 		relative_volume = false,
1402 		passthrough = false;
1403 	struct volume volume;
1404 	struct pw_properties *props = NULL;
1405 	uint8_t n_formats = 0;
1406 	struct stream *stream = NULL;
1407 	uint32_t n_params = 0, n_valid_formats = 0, flags;
1408 	const struct spa_pod *params[MAX_FORMATS];
1409 	uint8_t buffer[4096];
1410 	struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
1411 
1412 	props = pw_properties_copy(client->props);
1413 	if (props == NULL)
1414 		goto error_errno;
1415 
1416 	if (client->version < 13) {
1417 		if ((res = message_get(m,
1418 				TAG_STRING, &name,
1419 				TAG_INVALID)) < 0)
1420 			goto error_protocol;
1421 		if (name == NULL)
1422 			goto error_protocol;
1423 	}
1424 	if (message_get(m,
1425 			TAG_SAMPLE_SPEC, &ss,
1426 			TAG_CHANNEL_MAP, &map,
1427 			TAG_U32, &sink_index,
1428 			TAG_STRING, &sink_name,
1429 			TAG_U32, &attr.maxlength,
1430 			TAG_BOOLEAN, &corked,
1431 			TAG_U32, &attr.tlength,
1432 			TAG_U32, &attr.prebuf,
1433 			TAG_U32, &attr.minreq,
1434 			TAG_U32, &syncid,
1435 			TAG_CVOLUME, &volume,
1436 			TAG_INVALID) < 0)
1437 		goto error_protocol;
1438 
1439 	pw_log_info("[%s] CREATE_PLAYBACK_STREAM tag:%u corked:%u sink-name:%s sink-idx:%u",
1440 			client->name, tag, corked, sink_name, sink_index);
1441 
1442 	if (sink_index != SPA_ID_INVALID && sink_name != NULL)
1443 		goto error_invalid;
1444 
1445 	if (client->version >= 12) {
1446 		if (message_get(m,
1447 				TAG_BOOLEAN, &no_remap,
1448 				TAG_BOOLEAN, &no_remix,
1449 				TAG_BOOLEAN, &fix_format,
1450 				TAG_BOOLEAN, &fix_rate,
1451 				TAG_BOOLEAN, &fix_channels,
1452 				TAG_BOOLEAN, &no_move,
1453 				TAG_BOOLEAN, &variable_rate,
1454 				TAG_INVALID) < 0)
1455 			goto error_protocol;
1456 	}
1457 	if (client->version >= 13) {
1458 		if (message_get(m,
1459 				TAG_BOOLEAN, &muted,
1460 				TAG_BOOLEAN, &adjust_latency,
1461 				TAG_PROPLIST, props,
1462 				TAG_INVALID) < 0)
1463 			goto error_protocol;
1464 	}
1465 	if (client->version >= 14) {
1466 		if (message_get(m,
1467 				TAG_BOOLEAN, &volume_set,
1468 				TAG_BOOLEAN, &early_requests,
1469 				TAG_INVALID) < 0)
1470 			goto error_protocol;
1471 	}
1472 	if (client->version >= 15) {
1473 		if (message_get(m,
1474 				TAG_BOOLEAN, &muted_set,
1475 				TAG_BOOLEAN, &dont_inhibit_auto_suspend,
1476 				TAG_BOOLEAN, &fail_on_suspend,
1477 				TAG_INVALID) < 0)
1478 			goto error_protocol;
1479 	}
1480 	if (client->version >= 17) {
1481 		if (message_get(m,
1482 				TAG_BOOLEAN, &relative_volume,
1483 				TAG_INVALID) < 0)
1484 			goto error_protocol;
1485 	}
1486 	if (client->version >= 18) {
1487 		if (message_get(m,
1488 				TAG_BOOLEAN, &passthrough,
1489 				TAG_INVALID) < 0)
1490 			goto error_protocol;
1491 	}
1492 
1493 	if (client->version >= 21) {
1494 		if (message_get(m,
1495 				TAG_U8, &n_formats,
1496 				TAG_INVALID) < 0)
1497 			goto error_protocol;
1498 
1499 		if (n_formats) {
1500 			uint8_t i;
1501 			for (i = 0; i < n_formats; i++) {
1502 				struct format_info format;
1503 				uint32_t r;
1504 
1505 				if (message_get(m,
1506 						TAG_FORMAT_INFO, &format,
1507 						TAG_INVALID) < 0)
1508 					goto error_protocol;
1509 
1510 				if (n_params < MAX_FORMATS &&
1511 				    (params[n_params] = format_info_build_param(&b,
1512 						SPA_PARAM_EnumFormat, &format, &r)) != NULL) {
1513 					n_params++;
1514 					n_valid_formats++;
1515 					if (r > rate)
1516 						rate = r;
1517 				} else {
1518 					log_format_info(impl, SPA_LOG_LEVEL_WARN, &format);
1519 				}
1520 				format_info_clear(&format);
1521 			}
1522 		}
1523 	}
1524 	if (sample_spec_valid(&ss)) {
1525 		if (fix_format || fix_rate || fix_channels) {
1526 			struct sample_spec sfix = ss;
1527 			if (fix_format)
1528 				sfix.format = SPA_AUDIO_FORMAT_UNKNOWN;
1529 			if (fix_rate)
1530 				sfix.rate = 0;
1531 			if (fix_channels)
1532 				sfix.channels = 0;
1533 			if (n_params < MAX_FORMATS &&
1534 			    (params[n_params] = format_build_param(&b,
1535 					SPA_PARAM_EnumFormat, &sfix,
1536 					sfix.channels > 0 ? &map : NULL)) != NULL) {
1537 				n_params++;
1538 				n_valid_formats++;
1539 			}
1540 		}
1541 		if (n_params < MAX_FORMATS &&
1542 		    (params[n_params] = format_build_param(&b,
1543 				SPA_PARAM_EnumFormat, &ss,
1544 				ss.channels > 0 ? &map : NULL)) != NULL) {
1545 			n_params++;
1546 			n_valid_formats++;
1547 		} else {
1548 			pw_log_warn("%p: unsupported format:%s rate:%d channels:%u",
1549 					impl, format_id2name(ss.format), ss.rate,
1550 					ss.channels);
1551 		}
1552 		rate = ss.rate;
1553 	}
1554 
1555 	if (m->offset != m->length)
1556 		goto error_protocol;
1557 
1558 	if (n_valid_formats == 0)
1559 		goto error_no_formats;
1560 
1561 	stream = stream_new(client, STREAM_TYPE_PLAYBACK, tag, &ss, &map, &attr);
1562 	if (stream == NULL)
1563 		goto error_errno;
1564 
1565 	stream->corked = corked;
1566 	stream->adjust_latency = adjust_latency;
1567 	stream->early_requests = early_requests;
1568 	stream->volume = volume;
1569 	stream->volume_set = volume_set;
1570 	stream->muted = muted;
1571 	stream->muted_set = muted_set;
1572 	stream->is_underrun = true;
1573 	stream->underrun_for = -1;
1574 
1575 	if (rate != 0)
1576 		pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%u", rate);
1577 	if (no_remix)
1578 		pw_properties_set(props, PW_KEY_STREAM_DONT_REMIX, "true");
1579 	flags = 0;
1580 	if (no_move)
1581 		flags |= PW_STREAM_FLAG_DONT_RECONNECT;
1582 
1583 	if (sink_name != NULL)
1584 		pw_properties_set(props,
1585 				PW_KEY_NODE_TARGET, sink_name);
1586 	else if (sink_index != SPA_ID_INVALID && sink_index != 0)
1587 		pw_properties_setf(props,
1588 				PW_KEY_NODE_TARGET, "%u", sink_index);
1589 
1590 	stream->stream = pw_stream_new(client->core, name, props);
1591 	props = NULL;
1592 	if (stream->stream == NULL)
1593 		goto error_errno;
1594 
1595 	pw_log_debug("%p: new stream %p channel:%d passthrough:%d",
1596 			impl, stream, stream->channel, passthrough);
1597 
1598 	pw_stream_add_listener(stream->stream,
1599 			&stream->stream_listener,
1600 			&stream_events, stream);
1601 
1602 	pw_stream_connect(stream->stream,
1603 			PW_DIRECTION_OUTPUT,
1604 			SPA_ID_INVALID,
1605 			flags |
1606 			PW_STREAM_FLAG_AUTOCONNECT |
1607 			PW_STREAM_FLAG_RT_PROCESS |
1608 			PW_STREAM_FLAG_MAP_BUFFERS,
1609 			params, n_params);
1610 
1611 	return 0;
1612 
1613 error_errno:
1614 	res = -errno;
1615 	goto error;
1616 error_protocol:
1617 	res = -EPROTO;
1618 	goto error;
1619 error_no_formats:
1620 	res = -ENOTSUP;
1621 	goto error;
1622 error_invalid:
1623 	res = -EINVAL;
1624 	goto error;
1625 error:
1626 	pw_properties_free(props);
1627 	if (stream)
1628 		stream_free(stream);
1629 	return res;
1630 }
1631 
do_create_record_stream(struct client * client,uint32_t command,uint32_t tag,struct message * m)1632 static int do_create_record_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
1633 {
1634 	struct impl *impl = client->impl;
1635 	const char *name = NULL;
1636 	int res;
1637 	struct sample_spec ss;
1638 	struct channel_map map;
1639 	uint32_t source_index;
1640 	const char *source_name;
1641 	struct buffer_attr attr;
1642 	bool corked = false,
1643 		no_remap = false,
1644 		no_remix = false,
1645 		fix_format = false,
1646 		fix_rate = false,
1647 		fix_channels = false,
1648 		no_move = false,
1649 		variable_rate = false,
1650 		peak_detect = false,
1651 		adjust_latency = false,
1652 		early_requests = false,
1653 		dont_inhibit_auto_suspend = false,
1654 		volume_set = true,
1655 		muted = false,
1656 		muted_set = false,
1657 		fail_on_suspend = false,
1658 		relative_volume = false,
1659 		passthrough = false;
1660 	uint32_t direct_on_input_idx = SPA_ID_INVALID;
1661 	struct volume volume = VOLUME_INIT;
1662 	struct pw_properties *props = NULL;
1663 	uint8_t n_formats = 0;
1664 	struct stream *stream = NULL;
1665 	uint32_t n_params = 0, n_valid_formats = 0, flags, id, rate = 0;
1666 	const struct spa_pod *params[MAX_FORMATS];
1667 	uint8_t buffer[4096];
1668 	struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buffer, sizeof(buffer));
1669 
1670 	props = pw_properties_copy(client->props);
1671 	if (props == NULL)
1672 		goto error_errno;
1673 
1674 	if (client->version < 13) {
1675 		if (message_get(m,
1676 				TAG_STRING, &name,
1677 				TAG_INVALID) < 0)
1678 			goto error_protocol;
1679 		if (name == NULL)
1680 			goto error_protocol;
1681 	}
1682 	if (message_get(m,
1683 			TAG_SAMPLE_SPEC, &ss,
1684 			TAG_CHANNEL_MAP, &map,
1685 			TAG_U32, &source_index,
1686 			TAG_STRING, &source_name,
1687 			TAG_U32, &attr.maxlength,
1688 			TAG_BOOLEAN, &corked,
1689 			TAG_U32, &attr.fragsize,
1690 			TAG_INVALID) < 0)
1691 		goto error_protocol;
1692 
1693 	pw_log_info("[%s] CREATE_RECORD_STREAM tag:%u corked:%u source-name:%s source-index:%u",
1694 			client->name, tag, corked, source_name, source_index);
1695 
1696 	if (source_index != SPA_ID_INVALID && source_name != NULL)
1697 		goto error_invalid;
1698 
1699 	if (client->version >= 12) {
1700 		if (message_get(m,
1701 				TAG_BOOLEAN, &no_remap,
1702 				TAG_BOOLEAN, &no_remix,
1703 				TAG_BOOLEAN, &fix_format,
1704 				TAG_BOOLEAN, &fix_rate,
1705 				TAG_BOOLEAN, &fix_channels,
1706 				TAG_BOOLEAN, &no_move,
1707 				TAG_BOOLEAN, &variable_rate,
1708 				TAG_INVALID) < 0)
1709 			goto error_protocol;
1710 	}
1711 	if (client->version >= 13) {
1712 		if (message_get(m,
1713 				TAG_BOOLEAN, &peak_detect,
1714 				TAG_BOOLEAN, &adjust_latency,
1715 				TAG_PROPLIST, props,
1716 				TAG_U32, &direct_on_input_idx,
1717 				TAG_INVALID) < 0)
1718 			goto error_protocol;
1719 	}
1720 	if (client->version >= 14) {
1721 		if (message_get(m,
1722 				TAG_BOOLEAN, &early_requests,
1723 				TAG_INVALID) < 0)
1724 			goto error_protocol;
1725 	}
1726 	if (client->version >= 15) {
1727 		if (message_get(m,
1728 				TAG_BOOLEAN, &dont_inhibit_auto_suspend,
1729 				TAG_BOOLEAN, &fail_on_suspend,
1730 				TAG_INVALID) < 0)
1731 			goto error_protocol;
1732 	}
1733 	if (client->version >= 22) {
1734 		if (message_get(m,
1735 				TAG_U8, &n_formats,
1736 				TAG_INVALID) < 0)
1737 			goto error_protocol;
1738 
1739 		if (n_formats) {
1740 			uint8_t i;
1741 			for (i = 0; i < n_formats; i++) {
1742 				struct format_info format;
1743 				uint32_t r;
1744 
1745 				if (message_get(m,
1746 						TAG_FORMAT_INFO, &format,
1747 						TAG_INVALID) < 0)
1748 					goto error_protocol;
1749 
1750 				if (n_params < MAX_FORMATS &&
1751 				    (params[n_params] = format_info_build_param(&b,
1752 						SPA_PARAM_EnumFormat, &format, &r)) != NULL) {
1753 					n_params++;
1754 					n_valid_formats++;
1755 					if (r > rate)
1756 						rate = r;
1757 				} else {
1758 					log_format_info(impl, SPA_LOG_LEVEL_WARN, &format);
1759 				}
1760 				format_info_clear(&format);
1761 			}
1762 		}
1763 		if (message_get(m,
1764 				TAG_CVOLUME, &volume,
1765 				TAG_BOOLEAN, &muted,
1766 				TAG_BOOLEAN, &volume_set,
1767 				TAG_BOOLEAN, &muted_set,
1768 				TAG_BOOLEAN, &relative_volume,
1769 				TAG_BOOLEAN, &passthrough,
1770 				TAG_INVALID) < 0)
1771 			goto error_protocol;
1772 	} else {
1773 		volume_set = false;
1774 	}
1775 	if (sample_spec_valid(&ss)) {
1776 		if (fix_format || fix_rate || fix_channels) {
1777 			struct sample_spec sfix = ss;
1778 			if (fix_format)
1779 				sfix.format = SPA_AUDIO_FORMAT_UNKNOWN;
1780 			if (fix_rate)
1781 				sfix.rate = 0;
1782 			if (fix_channels)
1783 				sfix.channels = 0;
1784 			if (n_params < MAX_FORMATS &&
1785 			    (params[n_params] = format_build_param(&b,
1786 					SPA_PARAM_EnumFormat, &sfix,
1787 					sfix.channels > 0 ? &map : NULL)) != NULL) {
1788 				n_params++;
1789 				n_valid_formats++;
1790 			}
1791 		}
1792 		if (n_params < MAX_FORMATS &&
1793 		    (params[n_params] = format_build_param(&b,
1794 				SPA_PARAM_EnumFormat, &ss,
1795 				ss.channels > 0 ? &map : NULL)) != NULL) {
1796 			n_params++;
1797 			n_valid_formats++;
1798 		} else {
1799 			pw_log_warn("%p: unsupported format:%s rate:%d channels:%u",
1800 					impl, format_id2name(ss.format), ss.rate,
1801 					ss.channels);
1802 		}
1803 		rate = ss.rate;
1804 	}
1805 	if (m->offset != m->length)
1806 		goto error_protocol;
1807 
1808 	if (n_valid_formats == 0)
1809 		goto error_no_formats;
1810 
1811 	stream = stream_new(client, STREAM_TYPE_RECORD, tag, &ss, &map, &attr);
1812 	if (stream == NULL)
1813 		goto error_errno;
1814 
1815 	stream->corked = corked;
1816 	stream->adjust_latency = adjust_latency;
1817 	stream->early_requests = early_requests;
1818 	stream->volume = volume;
1819 	stream->volume_set = volume_set;
1820 	stream->muted = muted;
1821 	stream->muted_set = muted_set;
1822 
1823 	if (client->quirks & QUIRK_REMOVE_CAPTURE_DONT_MOVE)
1824 		no_move = false;
1825 
1826 	if (rate != 0)
1827 		pw_properties_setf(props, PW_KEY_NODE_RATE, "1/%u", rate);
1828 	if (peak_detect)
1829 		pw_properties_set(props, PW_KEY_STREAM_MONITOR, "true");
1830 	if (no_remix)
1831 		pw_properties_set(props, PW_KEY_STREAM_DONT_REMIX, "true");
1832 	flags = 0;
1833 	if (no_move)
1834 		flags |= PW_STREAM_FLAG_DONT_RECONNECT;
1835 
1836 	if (direct_on_input_idx != SPA_ID_INVALID) {
1837 		source_index = direct_on_input_idx;
1838 	} else if (source_name != NULL) {
1839 		if ((id = atoi(source_name)) != 0)
1840 			source_index = id;
1841 	}
1842 	if (source_index != SPA_ID_INVALID && source_index != 0) {
1843 		if (source_index & MONITOR_FLAG)
1844 			source_index &= INDEX_MASK;
1845 		pw_properties_setf(props,
1846 				PW_KEY_NODE_TARGET, "%u", source_index);
1847 	} else if (source_name != NULL) {
1848 		if (spa_strendswith(source_name, ".monitor")) {
1849 			pw_properties_setf(props,
1850 					PW_KEY_NODE_TARGET,
1851 					"%.*s", (int)strlen(source_name)-8, source_name);
1852 			pw_properties_set(props,
1853 					PW_KEY_STREAM_CAPTURE_SINK, "true");
1854 		} else {
1855 			pw_properties_set(props,
1856 					PW_KEY_NODE_TARGET, source_name);
1857 		}
1858 	}
1859 
1860 	stream->stream = pw_stream_new(client->core, name, props);
1861 	props = NULL;
1862 	if (stream->stream == NULL)
1863 		goto error_errno;
1864 
1865 	pw_stream_add_listener(stream->stream,
1866 			&stream->stream_listener,
1867 			&stream_events, stream);
1868 
1869 	pw_stream_connect(stream->stream,
1870 			PW_DIRECTION_INPUT,
1871 			SPA_ID_INVALID,
1872 			flags |
1873 			PW_STREAM_FLAG_AUTOCONNECT |
1874 			PW_STREAM_FLAG_RT_PROCESS |
1875 			PW_STREAM_FLAG_MAP_BUFFERS,
1876 			params, n_params);
1877 
1878 	return 0;
1879 
1880 error_errno:
1881 	res = -errno;
1882 	goto error;
1883 error_protocol:
1884 	res = -EPROTO;
1885 	goto error;
1886 error_no_formats:
1887 	res = -ENOTSUP;
1888 	goto error;
1889 error_invalid:
1890 	res = -EINVAL;
1891 	goto error;
1892 error:
1893 	pw_properties_free(props);
1894 	if (stream)
1895 		stream_free(stream);
1896 	return res;
1897 }
1898 
do_delete_stream(struct client * client,uint32_t command,uint32_t tag,struct message * m)1899 static int do_delete_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
1900 {
1901 	uint32_t channel;
1902 	struct stream *stream;
1903 	int res;
1904 
1905 	if ((res = message_get(m,
1906 			TAG_U32, &channel,
1907 			TAG_INVALID)) < 0)
1908 		return -EPROTO;
1909 
1910 	pw_log_info("[%s] DELETE_STREAM tag:%u channel:%u",
1911 			client->name, tag, channel);
1912 
1913 	stream = pw_map_lookup(&client->streams, channel);
1914 	if (stream == NULL)
1915 		return -ENOENT;
1916 	if (command == COMMAND_DELETE_PLAYBACK_STREAM &&
1917 	    stream->type != STREAM_TYPE_PLAYBACK)
1918 		return -ENOENT;
1919 	if (command == COMMAND_DELETE_RECORD_STREAM &&
1920 	    stream->type != STREAM_TYPE_RECORD)
1921 		return -ENOENT;
1922 	if (command == COMMAND_DELETE_UPLOAD_STREAM &&
1923 	    stream->type != STREAM_TYPE_UPLOAD)
1924 		return -ENOENT;
1925 
1926 	stream_free(stream);
1927 
1928 	return reply_simple_ack(client, tag);
1929 }
1930 
do_get_playback_latency(struct client * client,uint32_t command,uint32_t tag,struct message * m)1931 static int do_get_playback_latency(struct client *client, uint32_t command, uint32_t tag, struct message *m)
1932 {
1933 	struct impl *impl = client->impl;
1934 	struct message *reply;
1935 	uint32_t channel;
1936 	struct timeval tv, now;
1937 	struct stream *stream;
1938 	int res;
1939 
1940 	if ((res = message_get(m,
1941 			TAG_U32, &channel,
1942 			TAG_TIMEVAL, &tv,
1943 			TAG_INVALID)) < 0)
1944 		return -EPROTO;
1945 
1946 	pw_log_debug("%p: %s tag:%u channel:%u", impl, commands[command].name, tag, channel);
1947 	stream = pw_map_lookup(&client->streams, channel);
1948 	if (stream == NULL || stream->type != STREAM_TYPE_PLAYBACK)
1949 		return -ENOENT;
1950 
1951 	pw_log_debug("read:%"PRIx64" write:%"PRIx64" queued:%"PRIi64" delay:%"PRIi64
1952 			" playing:%"PRIu64,
1953 			stream->read_index, stream->write_index,
1954 			stream->write_index - stream->read_index, stream->delay,
1955 			stream->playing_for);
1956 
1957 	gettimeofday(&now, NULL);
1958 
1959 	reply = reply_new(client, tag);
1960 	message_put(reply,
1961 		TAG_USEC, stream->delay,	/* sink latency + queued samples */
1962 		TAG_USEC, 0LL,			/* always 0 */
1963 		TAG_BOOLEAN, stream->playing_for > 0 &&
1964 				!stream->corked,	/* playing state */
1965 		TAG_TIMEVAL, &tv,
1966 		TAG_TIMEVAL, &now,
1967 		TAG_S64, stream->write_index,
1968 		TAG_S64, stream->read_index,
1969 		TAG_INVALID);
1970 
1971 	if (client->version >= 13) {
1972 		message_put(reply,
1973 			TAG_U64, stream->underrun_for,
1974 			TAG_U64, stream->playing_for,
1975 			TAG_INVALID);
1976 	}
1977 	return client_queue_message(client, reply);
1978 }
1979 
do_get_record_latency(struct client * client,uint32_t command,uint32_t tag,struct message * m)1980 static int do_get_record_latency(struct client *client, uint32_t command, uint32_t tag, struct message *m)
1981 {
1982 	struct impl *impl = client->impl;
1983 	struct message *reply;
1984 	uint32_t channel;
1985 	struct timeval tv, now;
1986 	struct stream *stream;
1987 	int res;
1988 
1989 	if ((res = message_get(m,
1990 			TAG_U32, &channel,
1991 			TAG_TIMEVAL, &tv,
1992 			TAG_INVALID)) < 0)
1993 		return -EPROTO;
1994 
1995 	pw_log_debug("%p: %s channel:%u", impl, commands[command].name, channel);
1996 	stream = pw_map_lookup(&client->streams, channel);
1997 	if (stream == NULL || stream->type != STREAM_TYPE_RECORD)
1998 		return -ENOENT;
1999 
2000 	gettimeofday(&now, NULL);
2001 	reply = reply_new(client, tag);
2002 	message_put(reply,
2003 		TAG_USEC, 0LL,			/* monitor latency */
2004 		TAG_USEC, stream->delay,	/* source latency + queued */
2005 		TAG_BOOLEAN, !stream->corked,	/* playing state */
2006 		TAG_TIMEVAL, &tv,
2007 		TAG_TIMEVAL, &now,
2008 		TAG_S64, stream->write_index,
2009 		TAG_S64, stream->read_index,
2010 		TAG_INVALID);
2011 
2012 	return client_queue_message(client, reply);
2013 }
2014 
do_create_upload_stream(struct client * client,uint32_t command,uint32_t tag,struct message * m)2015 static int do_create_upload_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2016 {
2017 	const char *name;
2018 	struct sample_spec ss;
2019 	struct channel_map map;
2020 	struct pw_properties *props = NULL;
2021 	uint32_t length;
2022 	struct stream *stream = NULL;
2023 	struct message *reply;
2024 	int res;
2025 
2026 	if ((props = pw_properties_copy(client->props)) == NULL)
2027 		goto error_errno;
2028 
2029 	if ((res = message_get(m,
2030 			TAG_STRING, &name,
2031 			TAG_SAMPLE_SPEC, &ss,
2032 			TAG_CHANNEL_MAP, &map,
2033 			TAG_U32, &length,
2034 			TAG_INVALID)) < 0)
2035 		goto error_proto;
2036 
2037 	if (client->version >= 13) {
2038 		if ((res = message_get(m,
2039 				TAG_PROPLIST, props,
2040 				TAG_INVALID)) < 0)
2041 			goto error_proto;
2042 
2043 	} else {
2044 		pw_properties_set(props, PW_KEY_MEDIA_NAME, name);
2045 	}
2046 	if (name == NULL)
2047 		name = pw_properties_get(props, "event.id");
2048 	if (name == NULL)
2049 		name = pw_properties_get(props, PW_KEY_MEDIA_NAME);
2050 
2051 	if (name == NULL ||
2052 	    !sample_spec_valid(&ss) ||
2053 	    !channel_map_valid(&map) ||
2054 	    ss.channels != map.channels ||
2055 	    length == 0 || (length % sample_spec_frame_size(&ss) != 0))
2056 		goto error_invalid;
2057 	if (length >= SCACHE_ENTRY_SIZE_MAX)
2058 		goto error_toolarge;
2059 
2060 	pw_log_info("[%s] %s tag:%u name:%s length:%d",
2061 			client->name, commands[command].name, tag,
2062 			name, length);
2063 
2064 	stream = stream_new(client, STREAM_TYPE_UPLOAD, tag, &ss, &map, &(struct buffer_attr) {
2065 		.maxlength = length,
2066 	});
2067 	if (stream == NULL)
2068 		goto error_errno;
2069 
2070 	stream->props = props;
2071 
2072 	stream->buffer = calloc(1, stream->attr.maxlength);
2073 	if (stream->buffer == NULL)
2074 		goto error_errno;
2075 
2076 	reply = reply_new(client, tag);
2077 	message_put(reply,
2078 		TAG_U32, stream->channel,
2079 		TAG_U32, length,
2080 		TAG_INVALID);
2081 	return client_queue_message(client, reply);
2082 
2083 error_errno:
2084 	res = -errno;
2085 	goto error;
2086 error_proto:
2087 	res = -EPROTO;
2088 	goto error;
2089 error_invalid:
2090 	res = -EINVAL;
2091 	goto error;
2092 error_toolarge:
2093 	res = -EOVERFLOW;
2094 	goto error;
2095 error:
2096 	pw_properties_free(props);
2097 	if (stream)
2098 		stream_free(stream);
2099 	return res;
2100 }
2101 
do_finish_upload_stream(struct client * client,uint32_t command,uint32_t tag,struct message * m)2102 static int do_finish_upload_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2103 {
2104 	struct impl *impl = client->impl;
2105 	uint32_t channel, event;
2106 	struct stream *stream;
2107 	struct sample *sample;
2108 	const char *name;
2109 	int res;
2110 
2111 	if (message_get(m,
2112 			TAG_U32, &channel,
2113 			TAG_INVALID) < 0)
2114 		return -EPROTO;
2115 
2116 	stream = pw_map_lookup(&client->streams, channel);
2117 	if (stream == NULL || stream->type != STREAM_TYPE_UPLOAD)
2118 		return -ENOENT;
2119 
2120 	name = pw_properties_get(stream->props, "event.id");
2121 	if (name == NULL)
2122 		name = pw_properties_get(stream->props, PW_KEY_MEDIA_NAME);
2123 	if (name == NULL)
2124 		goto error_invalid;
2125 
2126 	pw_log_info("[%s] %s tag:%u channel:%u name:%s",
2127 			client->name, commands[command].name, tag,
2128 			channel, name);
2129 
2130 	struct sample *old = find_sample(impl, SPA_ID_INVALID, name);
2131 	if (old == NULL || (old != NULL && old->ref > 1)) {
2132 		sample = calloc(1, sizeof(*sample));
2133 		if (sample == NULL)
2134 			goto error_errno;
2135 
2136 		if (old != NULL) {
2137 			sample->index = old->index;
2138 			spa_assert_se(pw_map_insert_at(&impl->samples, sample->index, sample) == 0);
2139 
2140 			old->index = SPA_ID_INVALID;
2141 			sample_unref(old);
2142 		} else {
2143 			sample->index = pw_map_insert_new(&impl->samples, sample);
2144 			if (sample->index == SPA_ID_INVALID)
2145 				goto error_errno;
2146 		}
2147 	} else {
2148 		pw_properties_free(old->props);
2149 		free(old->buffer);
2150 		impl->stat.sample_cache -= old->length;
2151 
2152 		sample = old;
2153 	}
2154 
2155 	if (old != NULL)
2156 		event = SUBSCRIPTION_EVENT_CHANGE;
2157 	else
2158 		event = SUBSCRIPTION_EVENT_NEW;
2159 
2160 	sample->ref = 1;
2161 	sample->impl = impl;
2162 	sample->name = name;
2163 	sample->props = stream->props;
2164 	sample->ss = stream->ss;
2165 	sample->map = stream->map;
2166 	sample->buffer = stream->buffer;
2167 	sample->length = stream->attr.maxlength;
2168 
2169 	impl->stat.sample_cache += sample->length;
2170 
2171 	stream->props = NULL;
2172 	stream->buffer = NULL;
2173 	stream_free(stream);
2174 
2175 	broadcast_subscribe_event(impl,
2176 			SUBSCRIPTION_MASK_SAMPLE_CACHE,
2177 			event | SUBSCRIPTION_EVENT_SAMPLE_CACHE,
2178 			sample->index);
2179 
2180 	return reply_simple_ack(client, tag);
2181 
2182 error_errno:
2183 	res = -errno;
2184 	free(sample);
2185 	goto error;
2186 error_invalid:
2187 	res = -EINVAL;
2188 	goto error;
2189 error:
2190 	stream_free(stream);
2191 	return res;
2192 }
2193 
get_default(struct client * client,bool sink)2194 static const char *get_default(struct client *client, bool sink)
2195 {
2196 	struct selector sel;
2197 	struct pw_manager *manager = client->manager;
2198 	struct pw_manager_object *o;
2199 	const char *def, *str, *mon;
2200 
2201 	spa_zero(sel);
2202 	if (sink) {
2203 		sel.type = pw_manager_object_is_sink;
2204 		sel.key = PW_KEY_NODE_NAME;
2205 		sel.value = client->default_sink;
2206 		def = DEFAULT_SINK;
2207 	} else {
2208 		sel.type = pw_manager_object_is_source_or_monitor;
2209 		sel.key = PW_KEY_NODE_NAME;
2210 		sel.value = client->default_source;
2211 		def = DEFAULT_SOURCE;
2212 	}
2213 	sel.accumulate = select_best;
2214 
2215 	o = select_object(manager, &sel);
2216 	if (o == NULL || o->props == NULL)
2217 		return def;
2218 	str = pw_properties_get(o->props, PW_KEY_NODE_NAME);
2219 
2220 	if (!sink && pw_manager_object_is_monitor(o)) {
2221 		def = DEFAULT_MONITOR;
2222 		if (str != NULL &&
2223 		    (mon = pw_properties_get(o->props, PW_KEY_NODE_NAME".monitor")) == NULL) {
2224 			pw_properties_setf(o->props,
2225 					PW_KEY_NODE_NAME".monitor",
2226 					"%s.monitor", str);
2227 		}
2228 		str = pw_properties_get(o->props, PW_KEY_NODE_NAME".monitor");
2229 	}
2230 	if (str == NULL)
2231 		str = def;
2232 	return str;
2233 }
2234 
find_device(struct client * client,uint32_t id,const char * name,bool sink,bool * is_monitor)2235 static struct pw_manager_object *find_device(struct client *client,
2236 		uint32_t id, const char *name, bool sink, bool *is_monitor)
2237 {
2238 	struct selector sel;
2239 	bool monitor = false, find_default = false;
2240 
2241 	if (name != NULL) {
2242 		if (spa_streq(name, DEFAULT_MONITOR)) {
2243 			if (sink)
2244 				return NULL;
2245 			sink = true;
2246 			find_default = true;
2247 		} else if (spa_streq(name, DEFAULT_SOURCE)) {
2248 			if (sink)
2249 				return NULL;
2250 			find_default = true;
2251 		} else if (spa_streq(name, DEFAULT_SINK)) {
2252 			if (!sink)
2253 				return NULL;
2254 			find_default = true;
2255 		} else if (spa_atou32(name, &id, 0)) {
2256 			name = NULL;
2257 		}
2258 	}
2259 	if (name == NULL && (id == SPA_ID_INVALID || id == 0))
2260 		find_default = true;
2261 
2262 	if (find_default) {
2263 		name = get_default(client, sink);
2264 		id = SPA_ID_INVALID;
2265 	}
2266 
2267 	if (id != SPA_ID_INVALID) {
2268 		if (id & MONITOR_FLAG) {
2269 			if (sink)
2270 				return NULL;
2271 			monitor = true;
2272 			id &= ~MONITOR_FLAG;
2273 		}
2274 	} else if (name != NULL) {
2275 		if (spa_strendswith(name, ".monitor")) {
2276 			name = strndupa(name, strlen(name)-8);
2277 			monitor = true;
2278 		}
2279 	} else
2280 		return NULL;
2281 
2282 	if (is_monitor)
2283 		*is_monitor = monitor;
2284 
2285 	spa_zero(sel);
2286 	sel.type = sink ?
2287 		pw_manager_object_is_sink :
2288 		pw_manager_object_is_source_or_monitor;
2289 	sel.id = id;
2290 	sel.key = PW_KEY_NODE_NAME;
2291 	sel.value = name;
2292 
2293 	return select_object(client->manager, &sel);
2294 }
2295 
sample_play_ready(void * data,uint32_t index)2296 static void sample_play_ready(void *data, uint32_t index)
2297 {
2298 	struct pending_sample *ps = data;
2299 	struct client *client = ps->client;
2300 	struct message *reply;
2301 
2302 	pw_log_info("[%s] PLAY_SAMPLE tag:%u index:%u",
2303 			client->name, ps->tag, index);
2304 
2305 	reply = reply_new(client, ps->tag);
2306 	if (client->version >= 13)
2307 		message_put(reply,
2308 			TAG_U32, index,
2309 			TAG_INVALID);
2310 
2311 	client_queue_message(client, reply);
2312 }
2313 
on_sample_done(void * obj,void * data,int res,uint32_t id)2314 static void on_sample_done(void *obj, void *data, int res, uint32_t id)
2315 {
2316 	struct pending_sample *ps = obj;
2317 	struct client *client = ps->client;
2318 	pending_sample_free(ps);
2319 	if (client->ref <= 0)
2320 		client_free(client);
2321 }
2322 
sample_play_done(void * data,int res)2323 static void sample_play_done(void *data, int res)
2324 {
2325 	struct pending_sample *ps = data;
2326 	struct client *client = ps->client;
2327 	struct impl *impl = client->impl;
2328 
2329 	if (res < 0)
2330 		reply_error(client, COMMAND_PLAY_SAMPLE, ps->tag, res);
2331 	else
2332 		pw_log_info("[%s] PLAY_SAMPLE done tag:%u", client->name, ps->tag);
2333 
2334 	ps->done = true;
2335 	pw_work_queue_add(impl->work_queue, ps, 0,
2336 				on_sample_done, client);
2337 }
2338 
2339 static const struct sample_play_events sample_play_events = {
2340 	VERSION_SAMPLE_PLAY_EVENTS,
2341 	.ready = sample_play_ready,
2342 	.done = sample_play_done,
2343 };
2344 
do_play_sample(struct client * client,uint32_t command,uint32_t tag,struct message * m)2345 static int do_play_sample(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2346 {
2347 	struct impl *impl = client->impl;
2348 	uint32_t sink_index, volume;
2349 	struct sample *sample;
2350 	struct sample_play *play;
2351 	const char *sink_name, *name;
2352 	struct pw_properties *props = NULL;
2353 	struct pending_sample *ps;
2354 	struct pw_manager_object *o;
2355 	int res;
2356 
2357 	if ((props = pw_properties_new(NULL, NULL)) == NULL)
2358 		goto error_errno;
2359 
2360 	if ((res = message_get(m,
2361 			TAG_U32, &sink_index,
2362 			TAG_STRING, &sink_name,
2363 			TAG_U32, &volume,
2364 			TAG_STRING, &name,
2365 			TAG_INVALID)) < 0)
2366 		goto error_proto;
2367 
2368 	if (client->version >= 13) {
2369 		if ((res = message_get(m,
2370 				TAG_PROPLIST, props,
2371 				TAG_INVALID)) < 0)
2372 			goto error_proto;
2373 
2374 	}
2375 	pw_log_info("[%s] %s tag:%u sink_index:%u sink_name:%s name:%s",
2376 			client->name, commands[command].name, tag,
2377 			sink_index, sink_name, name);
2378 
2379 	pw_properties_update(props, &client->props->dict);
2380 
2381 	if (sink_index != SPA_ID_INVALID && sink_name != NULL)
2382 		goto error_inval;
2383 
2384 	o = find_device(client, sink_index, sink_name, PW_DIRECTION_OUTPUT, NULL);
2385 	if (o == NULL)
2386 		goto error_noent;
2387 
2388 	sample = find_sample(impl, SPA_ID_INVALID, name);
2389 	if (sample == NULL)
2390 		goto error_noent;
2391 
2392 	pw_properties_setf(props, PW_KEY_NODE_TARGET, "%u", o->id);
2393 
2394 	play = sample_play_new(client->core, sample, props, sizeof(struct pending_sample));
2395 	props = NULL;
2396 	if (play == NULL)
2397 		goto error_errno;
2398 
2399 	ps = play->user_data;
2400 	ps->client = client;
2401 	ps->play = play;
2402 	ps->tag = tag;
2403 	sample_play_add_listener(play, &ps->listener, &sample_play_events, ps);
2404 	spa_list_append(&client->pending_samples, &ps->link);
2405 	client->ref++;
2406 
2407 	return 0;
2408 
2409 error_errno:
2410 	res = -errno;
2411 	goto error;
2412 error_proto:
2413 	res = -EPROTO;
2414 	goto error;
2415 error_inval:
2416 	res = -EINVAL;
2417 	goto error;
2418 error_noent:
2419 	res = -ENOENT;
2420 	goto error;
2421 error:
2422 	pw_properties_free(props);
2423 	return res;
2424 }
2425 
do_remove_sample(struct client * client,uint32_t command,uint32_t tag,struct message * m)2426 static int do_remove_sample(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2427 {
2428 	struct impl *impl = client->impl;
2429 	const char *name;
2430 	struct sample *sample;
2431 	int res;
2432 
2433 	if ((res = message_get(m,
2434 			TAG_STRING, &name,
2435 			TAG_INVALID)) < 0)
2436 		return -EPROTO;
2437 
2438 	pw_log_info("[%s] %s tag:%u name:%s",
2439 			client->name, commands[command].name, tag,
2440 			name);
2441 	if (name == NULL)
2442 		return -EINVAL;
2443 	if ((sample = find_sample(impl, SPA_ID_INVALID, name)) == NULL)
2444 		return -ENOENT;
2445 
2446 	broadcast_subscribe_event(impl,
2447 			SUBSCRIPTION_MASK_SAMPLE_CACHE,
2448 			SUBSCRIPTION_EVENT_REMOVE |
2449 			SUBSCRIPTION_EVENT_SAMPLE_CACHE,
2450 			sample->index);
2451 
2452 	pw_map_remove(&impl->samples, sample->index);
2453 	sample->index = SPA_ID_INVALID;
2454 
2455 	sample_unref(sample);
2456 
2457 	return reply_simple_ack(client, tag);
2458 }
2459 
do_cork_stream(struct client * client,uint32_t command,uint32_t tag,struct message * m)2460 static int do_cork_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2461 {
2462 	uint32_t channel;
2463 	bool cork;
2464 	struct stream *stream;
2465 	int res;
2466 
2467 	if ((res = message_get(m,
2468 			TAG_U32, &channel,
2469 			TAG_BOOLEAN, &cork,
2470 			TAG_INVALID)) < 0)
2471 		return -EPROTO;
2472 
2473 	pw_log_info("[%s] %s tag:%u channel:%u cork:%s",
2474 			client->name, commands[command].name, tag,
2475 			channel, cork ? "yes" : "no");
2476 
2477 	stream = pw_map_lookup(&client->streams, channel);
2478 	if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD)
2479 		return -ENOENT;
2480 
2481 	stream->corked = cork;
2482 	pw_stream_set_active(stream->stream, !cork);
2483 	if (cork) {
2484 		stream->is_underrun = true;
2485 	} else {
2486 		stream->playing_for = 0;
2487 		stream->underrun_for = -1;
2488 	}
2489 
2490 	return reply_simple_ack(client, tag);
2491 }
2492 
do_flush_trigger_prebuf_stream(struct client * client,uint32_t command,uint32_t tag,struct message * m)2493 static int do_flush_trigger_prebuf_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2494 {
2495 	uint32_t channel;
2496 	struct stream *stream;
2497 	int res;
2498 
2499 	if ((res = message_get(m,
2500 			TAG_U32, &channel,
2501 			TAG_INVALID)) < 0)
2502 		return -EPROTO;
2503 
2504 	pw_log_info("[%s] %s tag:%u channel:%u",
2505 			client->name, commands[command].name, tag, channel);
2506 
2507 	stream = pw_map_lookup(&client->streams, channel);
2508 	if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD)
2509 		return -ENOENT;
2510 
2511 	switch (command) {
2512 	case COMMAND_FLUSH_PLAYBACK_STREAM:
2513 	case COMMAND_FLUSH_RECORD_STREAM:
2514 		stream_flush(stream);
2515 		break;
2516 	case COMMAND_TRIGGER_PLAYBACK_STREAM:
2517 	case COMMAND_PREBUF_PLAYBACK_STREAM:
2518 		break;
2519 	default:
2520 		return -EINVAL;
2521 	}
2522 
2523 	return reply_simple_ack(client, tag);
2524 }
2525 
set_node_volume_mute(struct pw_manager_object * o,struct volume * vol,bool * mute,bool is_monitor)2526 static int set_node_volume_mute(struct pw_manager_object *o,
2527 		struct volume *vol, bool *mute, bool is_monitor)
2528 {
2529 	char buf[1024];
2530 	struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf));
2531 	struct spa_pod_frame f[1];
2532 	struct spa_pod *param;
2533 	uint32_t volprop, muteprop;
2534 
2535 	if (!SPA_FLAG_IS_SET(o->permissions, PW_PERM_W | PW_PERM_X))
2536 		return -EACCES;
2537 	if (o->proxy == NULL)
2538 		return -ENOENT;
2539 
2540 	if (is_monitor) {
2541 		volprop = SPA_PROP_monitorVolumes;
2542 		muteprop = SPA_PROP_monitorMute;
2543 	} else {
2544 		volprop = SPA_PROP_channelVolumes;
2545 		muteprop = SPA_PROP_mute;
2546 	}
2547 
2548 	spa_pod_builder_push_object(&b, &f[0],
2549 			SPA_TYPE_OBJECT_Props,  SPA_PARAM_Props);
2550 	if (vol)
2551 		spa_pod_builder_add(&b,
2552 				volprop, SPA_POD_Array(sizeof(float),
2553 							SPA_TYPE_Float,
2554 							vol->channels,
2555 							vol->values), 0);
2556 	if (mute)
2557 		spa_pod_builder_add(&b,
2558 				muteprop, SPA_POD_Bool(*mute), 0);
2559 	param = spa_pod_builder_pop(&b, &f[0]);
2560 
2561 	pw_node_set_param((struct pw_node*)o->proxy,
2562 		SPA_PARAM_Props, 0, param);
2563 	return 0;
2564 }
2565 
set_card_volume_mute_delay(struct pw_manager_object * o,uint32_t id,uint32_t device_id,struct volume * vol,bool * mute,int64_t * latency_offset)2566 static int set_card_volume_mute_delay(struct pw_manager_object *o, uint32_t id,
2567 		uint32_t device_id, struct volume *vol, bool *mute, int64_t *latency_offset)
2568 {
2569 	char buf[1024];
2570 	struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf));
2571 	struct spa_pod_frame f[2];
2572 	struct spa_pod *param;
2573 
2574 	if (!SPA_FLAG_IS_SET(o->permissions, PW_PERM_W | PW_PERM_X))
2575 		return -EACCES;
2576 
2577 	if (o->proxy == NULL)
2578 		return -ENOENT;
2579 
2580 	spa_pod_builder_push_object(&b, &f[0],
2581 			SPA_TYPE_OBJECT_ParamRoute, SPA_PARAM_Route);
2582 	spa_pod_builder_add(&b,
2583 			SPA_PARAM_ROUTE_index, SPA_POD_Int(id),
2584 			SPA_PARAM_ROUTE_device, SPA_POD_Int(device_id),
2585 			0);
2586 	spa_pod_builder_prop(&b, SPA_PARAM_ROUTE_props, 0);
2587 	spa_pod_builder_push_object(&b, &f[1],
2588 			SPA_TYPE_OBJECT_Props,  SPA_PARAM_Props);
2589 	if (vol)
2590 		spa_pod_builder_add(&b,
2591 				SPA_PROP_channelVolumes, SPA_POD_Array(sizeof(float),
2592 								SPA_TYPE_Float,
2593 								vol->channels,
2594 								vol->values), 0);
2595 	if (mute)
2596 		spa_pod_builder_add(&b,
2597 				SPA_PROP_mute, SPA_POD_Bool(*mute), 0);
2598 	if (latency_offset)
2599 		spa_pod_builder_add(&b,
2600 				SPA_PROP_latencyOffsetNsec, SPA_POD_Long(*latency_offset), 0);
2601 	spa_pod_builder_pop(&b, &f[1]);
2602 	spa_pod_builder_prop(&b, SPA_PARAM_ROUTE_save, 0);
2603 	spa_pod_builder_bool(&b, true);
2604 	param = spa_pod_builder_pop(&b, &f[0]);
2605 
2606 	pw_device_set_param((struct pw_device*)o->proxy,
2607 			SPA_PARAM_Route, 0, param);
2608 	return 0;
2609 }
2610 
set_card_port(struct pw_manager_object * o,uint32_t device_id,uint32_t port_id)2611 static int set_card_port(struct pw_manager_object *o, uint32_t device_id,
2612 		uint32_t port_id)
2613 {
2614 	char buf[1024];
2615 	struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf));
2616 
2617 	if (!SPA_FLAG_IS_SET(o->permissions, PW_PERM_W | PW_PERM_X))
2618 		return -EACCES;
2619 
2620 	if (o->proxy == NULL)
2621 		return -ENOENT;
2622 
2623 	pw_device_set_param((struct pw_device*)o->proxy,
2624 			SPA_PARAM_Route, 0,
2625 			spa_pod_builder_add_object(&b,
2626 				SPA_TYPE_OBJECT_ParamRoute, SPA_PARAM_Route,
2627 				SPA_PARAM_ROUTE_index, SPA_POD_Int(port_id),
2628 				SPA_PARAM_ROUTE_device, SPA_POD_Int(device_id),
2629 				SPA_PARAM_ROUTE_save, SPA_POD_Bool(true)));
2630 
2631 	return 0;
2632 }
2633 
do_set_stream_volume(struct client * client,uint32_t command,uint32_t tag,struct message * m)2634 static int do_set_stream_volume(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2635 {
2636 	struct pw_manager *manager = client->manager;
2637 	uint32_t id;
2638 	struct stream *stream;
2639 	struct volume volume;
2640 	int res;
2641 
2642 	if ((res = message_get(m,
2643 			TAG_U32, &id,
2644 			TAG_CVOLUME, &volume,
2645 			TAG_INVALID)) < 0)
2646 		return -EPROTO;
2647 
2648 	pw_log_info("[%s] %s tag:%u index:%u",
2649 			client->name, commands[command].name, tag, id);
2650 
2651 	stream = find_stream(client, id);
2652 	if (stream != NULL) {
2653 
2654 		if (volume_compare(&stream->volume, &volume) == 0)
2655 			goto done;
2656 
2657 		pw_stream_set_control(stream->stream,
2658 				SPA_PROP_channelVolumes, volume.channels, volume.values,
2659 				0);
2660 	} else {
2661 		struct selector sel;
2662 		struct pw_manager_object *o;
2663 
2664 		spa_zero(sel);
2665 		sel.id = id;
2666 		if (command == COMMAND_SET_SINK_INPUT_VOLUME)
2667 			sel.type = pw_manager_object_is_sink_input;
2668 		else
2669 			sel.type = pw_manager_object_is_source_output;
2670 
2671 		o = select_object(manager, &sel);
2672 		if (o == NULL)
2673 			return -ENOENT;
2674 
2675 		if ((res = set_node_volume_mute(o, &volume, NULL, false)) < 0)
2676 			return res;
2677 	}
2678 done:
2679 	return operation_new(client, tag);
2680 }
2681 
do_set_stream_mute(struct client * client,uint32_t command,uint32_t tag,struct message * m)2682 static int do_set_stream_mute(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2683 {
2684 	struct pw_manager *manager = client->manager;
2685 	uint32_t id;
2686 	struct stream *stream;
2687 	int res;
2688 	bool mute;
2689 
2690 	if ((res = message_get(m,
2691 			TAG_U32, &id,
2692 			TAG_BOOLEAN, &mute,
2693 			TAG_INVALID)) < 0)
2694 		return -EPROTO;
2695 
2696 	pw_log_info("[%s] DO_SET_STREAM_MUTE tag:%u id:%u mute:%u",
2697 			client->name, tag, id, mute);
2698 
2699 	stream = find_stream(client, id);
2700 	if (stream != NULL) {
2701 		float val;
2702 
2703 		if (stream->muted == mute)
2704 			goto done;
2705 
2706 		val = mute ? 1.0f : 0.0f;
2707 		pw_stream_set_control(stream->stream,
2708 				SPA_PROP_mute, 1, &val,
2709 				0);
2710 	} else {
2711 		struct selector sel;
2712 		struct pw_manager_object *o;
2713 
2714 		spa_zero(sel);
2715 		sel.id = id;
2716 		if (command == COMMAND_SET_SINK_INPUT_MUTE)
2717 			sel.type = pw_manager_object_is_sink_input;
2718 		else
2719 			sel.type = pw_manager_object_is_source_output;
2720 
2721 		o = select_object(manager, &sel);
2722 		if (o == NULL)
2723 			return -ENOENT;
2724 
2725 		if ((res = set_node_volume_mute(o, NULL, &mute, false)) < 0)
2726 			return res;
2727 	}
2728 done:
2729 	return operation_new(client, tag);
2730 }
2731 
do_set_volume(struct client * client,uint32_t command,uint32_t tag,struct message * m)2732 static int do_set_volume(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2733 {
2734 	struct impl *impl = client->impl;
2735 	struct pw_manager *manager = client->manager;
2736 	struct pw_node_info *info;
2737 	uint32_t id, card_id = SPA_ID_INVALID;
2738 	const char *name, *str;
2739 	struct volume volume;
2740 	struct pw_manager_object *o, *card = NULL;
2741 	int res;
2742 	struct device_info dev_info;
2743 	enum pw_direction direction;
2744 	bool is_monitor;
2745 
2746 	if ((res = message_get(m,
2747 			TAG_U32, &id,
2748 			TAG_STRING, &name,
2749 			TAG_CVOLUME, &volume,
2750 			TAG_INVALID)) < 0)
2751 		return -EPROTO;
2752 
2753 	pw_log_info("[%s] %s tag:%u index:%u name:%s",
2754 			client->name, commands[command].name, tag, id, name);
2755 
2756 	if ((id == SPA_ID_INVALID && name == NULL) ||
2757 	    (id != SPA_ID_INVALID && name != NULL))
2758 		return -EINVAL;
2759 
2760 	if (command == COMMAND_SET_SINK_VOLUME)
2761 		direction = PW_DIRECTION_OUTPUT;
2762 	else
2763 		direction = PW_DIRECTION_INPUT;
2764 
2765 	o = find_device(client, id, name, direction == PW_DIRECTION_OUTPUT, &is_monitor);
2766 	if (o == NULL || (info = o->info) == NULL || info->props == NULL)
2767 		return -ENOENT;
2768 
2769 	dev_info = DEVICE_INFO_INIT(direction);
2770 
2771 	if ((str = spa_dict_lookup(info->props, PW_KEY_DEVICE_ID)) != NULL)
2772 		card_id = (uint32_t)atoi(str);
2773 	if ((str = spa_dict_lookup(info->props, "card.profile.device")) != NULL)
2774 		dev_info.device = (uint32_t)atoi(str);
2775 	if (card_id != SPA_ID_INVALID) {
2776 		struct selector sel = { .id = card_id, .type = pw_manager_object_is_card, };
2777 		card = select_object(manager, &sel);
2778 	}
2779 	collect_device_info(o, card, &dev_info, is_monitor, &impl->defs);
2780 
2781 	if (dev_info.have_volume &&
2782 	    volume_compare(&dev_info.volume_info.volume, &volume) == 0)
2783 		goto done;
2784 
2785 	if (card != NULL && !is_monitor && dev_info.active_port != SPA_ID_INVALID)
2786 		res = set_card_volume_mute_delay(card, dev_info.active_port,
2787 				dev_info.device, &volume, NULL, NULL);
2788 	else
2789 		res = set_node_volume_mute(o, &volume, NULL, is_monitor);
2790 
2791 	if (res < 0)
2792 		return res;
2793 
2794 done:
2795 	return operation_new(client, tag);
2796 }
2797 
do_set_mute(struct client * client,uint32_t command,uint32_t tag,struct message * m)2798 static int do_set_mute(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2799 {
2800 	struct impl *impl = client->impl;
2801 	struct pw_manager *manager = client->manager;
2802 	struct pw_node_info *info;
2803 	uint32_t id, card_id = SPA_ID_INVALID;
2804 	const char *name, *str;
2805 	bool mute;
2806 	struct pw_manager_object *o, *card = NULL;
2807 	int res;
2808 	struct device_info dev_info;
2809 	enum pw_direction direction;
2810 	bool is_monitor;
2811 
2812 	if ((res = message_get(m,
2813 			TAG_U32, &id,
2814 			TAG_STRING, &name,
2815 			TAG_BOOLEAN, &mute,
2816 			TAG_INVALID)) < 0)
2817 		return -EPROTO;
2818 
2819 	pw_log_info("[%s] %s tag:%u index:%u name:%s mute:%d",
2820 			client->name, commands[command].name, tag, id, name, mute);
2821 
2822 	if ((id == SPA_ID_INVALID && name == NULL) ||
2823 	    (id != SPA_ID_INVALID && name != NULL))
2824 		return -EINVAL;
2825 
2826 	if (command == COMMAND_SET_SINK_MUTE)
2827 		direction = PW_DIRECTION_OUTPUT;
2828 	else
2829 		direction = PW_DIRECTION_INPUT;
2830 
2831 	o = find_device(client, id, name, direction == PW_DIRECTION_OUTPUT, &is_monitor);
2832 	if (o == NULL || (info = o->info) == NULL || info->props == NULL)
2833 		return -ENOENT;
2834 
2835 	dev_info = DEVICE_INFO_INIT(direction);
2836 
2837 	if ((str = spa_dict_lookup(info->props, PW_KEY_DEVICE_ID)) != NULL)
2838 		card_id = (uint32_t)atoi(str);
2839 	if ((str = spa_dict_lookup(info->props, "card.profile.device")) != NULL)
2840 		dev_info.device = (uint32_t)atoi(str);
2841 	if (card_id != SPA_ID_INVALID) {
2842 		struct selector sel = { .id = card_id, .type = pw_manager_object_is_card, };
2843 		card = select_object(manager, &sel);
2844 	}
2845 	collect_device_info(o, card, &dev_info, is_monitor, &impl->defs);
2846 
2847 	if (dev_info.have_volume &&
2848 	    dev_info.volume_info.mute == mute)
2849 		goto done;
2850 
2851 	if (card != NULL && !is_monitor && dev_info.active_port != SPA_ID_INVALID)
2852 		res = set_card_volume_mute_delay(card, dev_info.active_port,
2853 				dev_info.device, NULL, &mute, NULL);
2854 	else
2855 		res = set_node_volume_mute(o, NULL, &mute, is_monitor);
2856 
2857 	if (res < 0)
2858 		return res;
2859 done:
2860 	return operation_new(client, tag);
2861 }
2862 
do_set_port(struct client * client,uint32_t command,uint32_t tag,struct message * m)2863 static int do_set_port(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2864 {
2865 	struct pw_manager *manager = client->manager;
2866 	struct pw_node_info *info;
2867 	uint32_t id, card_id = SPA_ID_INVALID, device_id = SPA_ID_INVALID;
2868 	uint32_t port_id = SPA_ID_INVALID;
2869 	const char *name, *str, *port_name;
2870 	struct pw_manager_object *o, *card = NULL;
2871 	int res;
2872 	enum pw_direction direction;
2873 
2874 	if ((res = message_get(m,
2875 			TAG_U32, &id,
2876 			TAG_STRING, &name,
2877 			TAG_STRING, &port_name,
2878 			TAG_INVALID)) < 0)
2879 		return -EPROTO;
2880 
2881 	pw_log_info("[%s] %s tag:%u index:%u name:%s port:%s",
2882 			client->name, commands[command].name, tag, id, name, port_name);
2883 
2884 	if ((id == SPA_ID_INVALID && name == NULL) ||
2885 	    (id != SPA_ID_INVALID && name != NULL))
2886 		return -EINVAL;
2887 
2888 	if (command == COMMAND_SET_SINK_PORT)
2889 		direction = PW_DIRECTION_OUTPUT;
2890 	else
2891 		direction = PW_DIRECTION_INPUT;
2892 
2893 	o = find_device(client, id, name, direction == PW_DIRECTION_OUTPUT, NULL);
2894 	if (o == NULL || (info = o->info) == NULL || info->props == NULL)
2895 		return -ENOENT;
2896 
2897 	if ((str = spa_dict_lookup(info->props, PW_KEY_DEVICE_ID)) != NULL)
2898 		card_id = (uint32_t)atoi(str);
2899 	if ((str = spa_dict_lookup(info->props, "card.profile.device")) != NULL)
2900 		device_id = (uint32_t)atoi(str);
2901 	if (card_id != SPA_ID_INVALID) {
2902 		struct selector sel = { .id = card_id, .type = pw_manager_object_is_card, };
2903 		card = select_object(manager, &sel);
2904 	}
2905 	if (card == NULL || device_id == SPA_ID_INVALID)
2906 		return -ENOENT;
2907 
2908 	port_id = find_port_id(card, direction, port_name);
2909 	if (port_id == SPA_ID_INVALID)
2910 		return -ENOENT;
2911 
2912 	if ((res = set_card_port(card, device_id, port_id)) < 0)
2913 		return res;
2914 
2915 	return operation_new(client, tag);
2916 }
2917 
do_set_port_latency_offset(struct client * client,uint32_t command,uint32_t tag,struct message * m)2918 static int do_set_port_latency_offset(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2919 {
2920 	struct pw_manager *manager = client->manager;
2921 	const char *port_name = NULL;
2922 	struct pw_manager_object *card;
2923 	struct selector sel;
2924 	struct card_info card_info = CARD_INFO_INIT;
2925 	struct port_info *port_info;
2926 	int64_t offset;
2927 	int64_t value;
2928 	int res;
2929 	uint32_t n_ports;
2930 	size_t i;
2931 
2932 	spa_zero(sel);
2933 	sel.key = PW_KEY_DEVICE_NAME;
2934 	sel.type = pw_manager_object_is_card;
2935 
2936 	if ((res = message_get(m,
2937 			TAG_U32, &sel.id,
2938 			TAG_STRING, &sel.value,
2939 			TAG_STRING, &port_name,
2940 			TAG_S64, &offset,
2941 			TAG_INVALID)) < 0)
2942 		return -EPROTO;
2943 
2944 	pw_log_info("[%s] %s tag:%u index:%u card_name:%s port_name:%s offset:%"PRIi64,
2945 			client->name, commands[command].name, tag, sel.id, sel.value, port_name, offset);
2946 
2947 	if ((sel.id == SPA_ID_INVALID && sel.value == NULL) ||
2948 	    (sel.id != SPA_ID_INVALID && sel.value != NULL))
2949 		return -EINVAL;
2950 	if (port_name == NULL)
2951 		return -EINVAL;
2952 
2953 	value = offset * 1000;  /* to nsec */
2954 
2955 	if ((card = select_object(manager, &sel)) == NULL)
2956 		return -ENOENT;
2957 
2958 	collect_card_info(card, &card_info);
2959 	port_info = alloca(card_info.n_ports * sizeof(*port_info));
2960 	card_info.active_profile = SPA_ID_INVALID;
2961 	n_ports = collect_port_info(card, &card_info, NULL, port_info);
2962 
2963 	/* Set offset on all devices of the port */
2964 	res = -ENOENT;
2965 	for (i = 0; i < n_ports; i++) {
2966 		struct port_info *pi = &port_info[i];
2967 		size_t j;
2968 
2969 		if (!spa_streq(pi->name, port_name))
2970 			continue;
2971 
2972 		res = 0;
2973 		for (j = 0; j < pi->n_devices; ++j) {
2974 			res = set_card_volume_mute_delay(card, pi->id, pi->devices[j], NULL, NULL, &value);
2975 			if (res < 0)
2976 				break;
2977 		}
2978 
2979 		if (res < 0)
2980 			break;
2981 
2982 		return operation_new(client, tag);
2983 	}
2984 
2985 	return res;
2986 }
2987 
do_set_stream_name(struct client * client,uint32_t command,uint32_t tag,struct message * m)2988 static int do_set_stream_name(struct client *client, uint32_t command, uint32_t tag, struct message *m)
2989 {
2990 	uint32_t channel;
2991 	struct stream *stream;
2992 	const char *name = NULL;
2993 	struct spa_dict_item items[1];
2994 	int res;
2995 
2996 	if ((res = message_get(m,
2997 			TAG_U32, &channel,
2998 			TAG_STRING, &name,
2999 			TAG_INVALID)) < 0)
3000 		return -EPROTO;
3001 
3002 	if (name == NULL)
3003 		return -EINVAL;
3004 
3005 	pw_log_info("[%s] SET_STREAM_NAME tag:%u channel:%d name:%s",
3006 			client->name, tag, channel, name);
3007 
3008 	stream = pw_map_lookup(&client->streams, channel);
3009 	if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD)
3010 		return -ENOENT;
3011 
3012 	items[0] = SPA_DICT_ITEM_INIT(PW_KEY_MEDIA_NAME, name);
3013 	pw_stream_update_properties(stream->stream,
3014 			&SPA_DICT_INIT(items, 1));
3015 
3016 	return reply_simple_ack(client, tag);
3017 }
3018 
do_update_proplist(struct client * client,uint32_t command,uint32_t tag,struct message * m)3019 static int do_update_proplist(struct client *client, uint32_t command, uint32_t tag, struct message *m)
3020 {
3021 	uint32_t channel, mode;
3022 	struct stream *stream;
3023 	struct pw_properties *props;
3024 	int res;
3025 
3026 	props = pw_properties_new(NULL, NULL);
3027 	if (props == NULL)
3028 		return -errno;
3029 
3030 	if (command != COMMAND_UPDATE_CLIENT_PROPLIST) {
3031 		if (message_get(m,
3032 				TAG_U32, &channel,
3033 				TAG_INVALID) < 0)
3034 			goto error_protocol;
3035 	} else {
3036 		channel = SPA_ID_INVALID;
3037 	}
3038 
3039 	pw_log_info("[%s] %s tag:%u channel:%d",
3040 			client->name, commands[command].name, tag, channel);
3041 
3042 	if (message_get(m,
3043 			TAG_U32, &mode,
3044 			TAG_PROPLIST, props,
3045 			TAG_INVALID) < 0)
3046 		goto error_protocol;
3047 
3048 	if (command != COMMAND_UPDATE_CLIENT_PROPLIST) {
3049 		stream = pw_map_lookup(&client->streams, channel);
3050 		if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD)
3051 			goto error_noentity;
3052 
3053 		pw_stream_update_properties(stream->stream, &props->dict);
3054 	} else {
3055 		if (pw_properties_update(client->props, &props->dict) > 0) {
3056 			client_update_quirks(client);
3057 			pw_core_update_properties(client->core, &client->props->dict);
3058 		}
3059 	}
3060 	res = reply_simple_ack(client, tag);
3061 exit:
3062 	pw_properties_free(props);
3063 	return res;
3064 
3065 error_protocol:
3066 	res = -EPROTO;
3067 	goto exit;
3068 error_noentity:
3069 	res = -ENOENT;
3070 	goto exit;
3071 }
3072 
do_remove_proplist(struct client * client,uint32_t command,uint32_t tag,struct message * m)3073 static int do_remove_proplist(struct client *client, uint32_t command, uint32_t tag, struct message *m)
3074 {
3075 	uint32_t i, channel;
3076 	struct stream *stream;
3077 	struct pw_properties *props;
3078 	struct spa_dict dict;
3079 	struct spa_dict_item *items;
3080 	int res;
3081 
3082 	props = pw_properties_new(NULL, NULL);
3083 	if (props == NULL)
3084 		return -errno;
3085 
3086 	if (command != COMMAND_REMOVE_CLIENT_PROPLIST) {
3087 		if (message_get(m,
3088 				TAG_U32, &channel,
3089 				TAG_INVALID) < 0)
3090 			goto error_protocol;
3091 	} else {
3092 		channel = SPA_ID_INVALID;
3093 	}
3094 
3095 	pw_log_info("[%s] %s tag:%u channel:%d",
3096 			client->name, commands[command].name, tag, channel);
3097 
3098 	while (true) {
3099 		const char *key;
3100 
3101 		if (message_get(m,
3102 				TAG_STRING, &key,
3103 				TAG_INVALID) < 0)
3104 			goto error_protocol;
3105 		if (key == NULL)
3106 			break;
3107 		pw_properties_set(props, key, key);
3108 	}
3109 
3110 	dict.n_items = props->dict.n_items;
3111 	dict.items = items = alloca(sizeof(struct spa_dict_item) * dict.n_items);
3112 	for (i = 0; i < dict.n_items; i++) {
3113 		items[i].key = props->dict.items[i].key;
3114 		items[i].value = NULL;
3115 	}
3116 
3117 	if (command != COMMAND_UPDATE_CLIENT_PROPLIST) {
3118 		stream = pw_map_lookup(&client->streams, channel);
3119 		if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD)
3120 			goto error_noentity;
3121 
3122 		pw_stream_update_properties(stream->stream, &dict);
3123 	} else {
3124 		pw_core_update_properties(client->core, &dict);
3125 	}
3126 	res = reply_simple_ack(client, tag);
3127 exit:
3128 	pw_properties_free(props);
3129 	return res;
3130 
3131 error_protocol:
3132 	res = -EPROTO;
3133 	goto exit;
3134 error_noentity:
3135 	res = -ENOENT;
3136 	goto exit;
3137 }
3138 
3139 
do_get_server_info(struct client * client,uint32_t command,uint32_t tag,struct message * m)3140 static int do_get_server_info(struct client *client, uint32_t command, uint32_t tag, struct message *m)
3141 {
3142 	struct impl *impl = client->impl;
3143 	struct pw_manager *manager = client->manager;
3144 	struct pw_core_info *info = manager ? manager->info : NULL;
3145 	char name[256];
3146 	struct message *reply;
3147 
3148 	pw_log_info("[%s] GET_SERVER_INFO tag:%u", client->name, tag);
3149 
3150 	snprintf(name, sizeof(name), "PulseAudio (on PipeWire %s)", pw_get_library_version());
3151 
3152 	reply = reply_new(client, tag);
3153 	message_put(reply,
3154 		TAG_STRING, name,
3155 		TAG_STRING, "15.0.0",
3156 		TAG_STRING, pw_get_user_name(),
3157 		TAG_STRING, pw_get_host_name(),
3158 		TAG_SAMPLE_SPEC, &impl->defs.sample_spec,
3159 		TAG_STRING, manager ? get_default(client, true) : "",	/* default sink name */
3160 		TAG_STRING, manager ? get_default(client, false) : "",	/* default source name */
3161 		TAG_U32, info ? info->cookie : 0,			/* cookie */
3162 		TAG_INVALID);
3163 
3164 	if (client->version >= 15) {
3165 		message_put(reply,
3166 			TAG_CHANNEL_MAP, &impl->defs.channel_map,
3167 			TAG_INVALID);
3168 	}
3169 	return client_queue_message(client, reply);
3170 }
3171 
do_stat(struct client * client,uint32_t command,uint32_t tag,struct message * m)3172 static int do_stat(struct client *client, uint32_t command, uint32_t tag, struct message *m)
3173 {
3174 	struct impl *impl = client->impl;
3175 	struct message *reply;
3176 
3177 	pw_log_info("[%s] STAT tag:%u", client->name, tag);
3178 
3179 	reply = reply_new(client, tag);
3180 	message_put(reply,
3181 		TAG_U32, impl->stat.n_allocated,	/* n_allocated */
3182 		TAG_U32, impl->stat.allocated,		/* allocated size */
3183 		TAG_U32, impl->stat.n_accumulated,	/* n_accumulated */
3184 		TAG_U32, impl->stat.accumulated,	/* accumulated_size */
3185 		TAG_U32, impl->stat.sample_cache,	/* sample cache size */
3186 		TAG_INVALID);
3187 
3188 	return client_queue_message(client, reply);
3189 }
3190 
do_lookup(struct client * client,uint32_t command,uint32_t tag,struct message * m)3191 static int do_lookup(struct client *client, uint32_t command, uint32_t tag, struct message *m)
3192 {
3193 	struct message *reply;
3194 	struct pw_manager_object *o;
3195 	const char *name;
3196 	bool is_sink = command == COMMAND_LOOKUP_SINK;
3197 	bool is_monitor;
3198 
3199 	if (message_get(m,
3200 			TAG_STRING, &name,
3201 			TAG_INVALID) < 0)
3202 		return -EPROTO;
3203 
3204 	pw_log_info("[%s] LOOKUP tag:%u name:'%s'", client->name, tag, name);
3205 
3206 	if ((o = find_device(client, SPA_ID_INVALID, name, is_sink, &is_monitor)) == NULL)
3207 		return -ENOENT;
3208 
3209 	reply = reply_new(client, tag);
3210 	message_put(reply,
3211 		TAG_U32, is_monitor ? o->id | MONITOR_FLAG : o->id,
3212 		TAG_INVALID);
3213 
3214 	return client_queue_message(client, reply);
3215 }
3216 
do_drain_stream(struct client * client,uint32_t command,uint32_t tag,struct message * m)3217 static int do_drain_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
3218 {
3219 	uint32_t channel;
3220 	struct stream *stream;
3221 
3222 	if (message_get(m,
3223 			TAG_U32, &channel,
3224 			TAG_INVALID) < 0)
3225 		return -EPROTO;
3226 
3227 	pw_log_info("[%s] DRAIN tag:%u channel:%d", client->name, tag, channel);
3228 	stream = pw_map_lookup(&client->streams, channel);
3229 	if (stream == NULL || stream->type != STREAM_TYPE_PLAYBACK)
3230 		return -ENOENT;
3231 
3232 	stream->drain_tag = tag;
3233 	stream->draining = true;
3234 	pw_stream_set_active(stream->stream, true);
3235 
3236 	return 0;
3237 }
3238 
fill_client_info(struct client * client,struct message * m,struct pw_manager_object * o)3239 static int fill_client_info(struct client *client, struct message *m,
3240 		struct pw_manager_object *o)
3241 {
3242 	struct pw_client_info *info = o->info;
3243 	const char *str;
3244 	uint32_t module_id = SPA_ID_INVALID;
3245 
3246 	if (!pw_manager_object_is_client(o) || info == NULL || info->props == NULL)
3247 		return -ENOENT;
3248 
3249 	if ((str = spa_dict_lookup(info->props, PW_KEY_MODULE_ID)) != NULL)
3250 		module_id = (uint32_t)atoi(str);
3251 
3252 	message_put(m,
3253 		TAG_U32, o->id,				/* client index */
3254 		TAG_STRING, pw_properties_get(o->props, PW_KEY_APP_NAME),
3255 		TAG_U32, module_id,			/* module */
3256 		TAG_STRING, "PipeWire",			/* driver */
3257 		TAG_INVALID);
3258 	if (client->version >= 13) {
3259 		message_put(m,
3260 			TAG_PROPLIST, info->props,
3261 			TAG_INVALID);
3262 	}
3263 	return 0;
3264 }
3265 
fill_module_info(struct client * client,struct message * m,struct pw_manager_object * o)3266 static int fill_module_info(struct client *client, struct message *m,
3267 		struct pw_manager_object *o)
3268 {
3269 	struct pw_module_info *info = o->info;
3270 
3271 	if (!pw_manager_object_is_module(o) || info == NULL || info->props == NULL)
3272 		return -ENOENT;
3273 
3274 	message_put(m,
3275 		TAG_U32, o->id,				/* module index */
3276 		TAG_STRING, info->name,
3277 		TAG_STRING, info->args,
3278 		TAG_U32, -1,				/* n_used */
3279 		TAG_INVALID);
3280 
3281 	if (client->version < 15) {
3282 		message_put(m,
3283 			TAG_BOOLEAN, false,		/* auto unload deprecated */
3284 			TAG_INVALID);
3285 	}
3286 	if (client->version >= 15) {
3287 		message_put(m,
3288 			TAG_PROPLIST, info->props,
3289 			TAG_INVALID);
3290 	}
3291 	return 0;
3292 }
3293 
fill_ext_module_info(struct client * client,struct message * m,struct module * module)3294 static int fill_ext_module_info(struct client *client, struct message *m,
3295 		struct module *module)
3296 {
3297 	message_put(m,
3298 		TAG_U32, module->idx,			/* module index */
3299 		TAG_STRING, module->name,
3300 		TAG_STRING, module->args,
3301 		TAG_U32, -1,				/* n_used */
3302 		TAG_INVALID);
3303 
3304 	if (client->version < 15) {
3305 		message_put(m,
3306 			TAG_BOOLEAN, false,		/* auto unload deprecated */
3307 			TAG_INVALID);
3308 	}
3309 	if (client->version >= 15) {
3310 		message_put(m,
3311 			TAG_PROPLIST, module->props,
3312 			TAG_INVALID);
3313 	}
3314 	return 0;
3315 }
3316 
get_port_latency_offset(struct client * client,struct pw_manager_object * card,struct port_info * pi)3317 static int64_t get_port_latency_offset(struct client *client, struct pw_manager_object *card, struct port_info *pi)
3318 {
3319 	struct pw_manager *m = client->manager;
3320 	struct pw_manager_object *o;
3321 	size_t j;
3322 
3323 	/*
3324 	 * The latency offset is a property of nodes in PipeWire, so we look it up on the
3325 	 * nodes. We'll return the latency offset of the first node in the port.
3326 	 *
3327 	 * This is also because we need to be consistent with
3328 	 * send_latency_offset_subscribe_event, which sends events on node changes. The
3329 	 * route data might not be updated yet when these events arrive.
3330 	 */
3331 	for (j = 0; j < pi->n_devices; ++j) {
3332 		spa_list_for_each(o, &m->object_list, link) {
3333 			const char *str;
3334 			uint32_t card_id = SPA_ID_INVALID;
3335 			uint32_t device_id = SPA_ID_INVALID;
3336 			struct pw_node_info *info;
3337 
3338 			if (o->creating || o->removing)
3339 				continue;
3340 			if (!pw_manager_object_is_sink(o) && !pw_manager_object_is_source_or_monitor(o))
3341 				continue;
3342 			if ((info = o->info) == NULL || info->props == NULL)
3343 				continue;
3344 			if ((str = spa_dict_lookup(info->props, PW_KEY_DEVICE_ID)) != NULL)
3345 				card_id = (uint32_t)atoi(str);
3346 			if (card_id != card->id)
3347 				continue;
3348 
3349 			if ((str = spa_dict_lookup(info->props, "card.profile.device")) != NULL)
3350 				device_id = (uint32_t)atoi(str);
3351 
3352 			if (device_id == pi->devices[j])
3353 				return get_node_latency_offset(o);
3354 		}
3355 	}
3356 
3357 	return 0LL;
3358 }
3359 
fill_card_info(struct client * client,struct message * m,struct pw_manager_object * o)3360 static int fill_card_info(struct client *client, struct message *m,
3361 		struct pw_manager_object *o)
3362 {
3363 	struct pw_device_info *info = o->info;
3364 	const char *str, *drv_name;
3365 	uint32_t module_id = SPA_ID_INVALID, n_profiles, n;
3366 	struct card_info card_info = CARD_INFO_INIT;
3367 	struct profile_info *profile_info;
3368 
3369 	if (!pw_manager_object_is_card(o) || info == NULL || info->props == NULL)
3370 		return -ENOENT;
3371 
3372 	if ((str = spa_dict_lookup(info->props, PW_KEY_MODULE_ID)) != NULL)
3373 		module_id = (uint32_t)atoi(str);
3374 
3375 	drv_name = spa_dict_lookup(info->props, PW_KEY_DEVICE_API);
3376 	if (drv_name && spa_streq("bluez5", drv_name))
3377 		drv_name = "module-bluez5-device.c"; /* blueman needs this */
3378 
3379 	message_put(m,
3380 		TAG_U32, o->id,				/* card index */
3381 		TAG_STRING, spa_dict_lookup(info->props, PW_KEY_DEVICE_NAME),
3382 		TAG_U32, module_id,
3383 		TAG_STRING, drv_name,
3384 		TAG_INVALID);
3385 
3386 	collect_card_info(o, &card_info);
3387 
3388 	message_put(m,
3389 		TAG_U32, card_info.n_profiles,			/* n_profiles */
3390 		TAG_INVALID);
3391 
3392 	profile_info = alloca(card_info.n_profiles * sizeof(*profile_info));
3393 	n_profiles = collect_profile_info(o, &card_info, profile_info);
3394 
3395 	for (n = 0; n < n_profiles; n++) {
3396 		struct profile_info *pi = &profile_info[n];
3397 
3398 		message_put(m,
3399 			TAG_STRING, pi->name,			/* profile name */
3400 			TAG_STRING, pi->description,		/* profile description */
3401 			TAG_U32, pi->n_sinks,			/* n_sinks */
3402 			TAG_U32, pi->n_sources,			/* n_sources */
3403 			TAG_U32, pi->priority,			/* priority */
3404 			TAG_INVALID);
3405 
3406 		if (client->version >= 29) {
3407 			message_put(m,
3408 				TAG_U32, pi->available != SPA_PARAM_AVAILABILITY_no,		/* available */
3409 				TAG_INVALID);
3410 		}
3411 	}
3412 	message_put(m,
3413 		TAG_STRING, card_info.active_profile_name,	/* active profile name */
3414 		TAG_PROPLIST, info->props,
3415 		TAG_INVALID);
3416 
3417 	if (client->version >= 26) {
3418 		uint32_t n_ports;
3419 		struct port_info *port_info, *pi;
3420 
3421 		port_info = alloca(card_info.n_ports * sizeof(*port_info));
3422 		card_info.active_profile = SPA_ID_INVALID;
3423 		n_ports = collect_port_info(o, &card_info, NULL, port_info);
3424 
3425 		message_put(m,
3426 			TAG_U32, n_ports,				/* n_ports */
3427 			TAG_INVALID);
3428 
3429 		for (n = 0; n < n_ports; n++) {
3430 			struct spa_dict_item *items;
3431 			struct spa_dict *pdict = NULL, dict;
3432 			uint32_t i, pi_n_profiles;
3433 
3434 			pi = &port_info[n];
3435 
3436 			if (pi->info && pi->n_props > 0) {
3437 				items = alloca(pi->n_props * sizeof(*items));
3438 				dict.items = items;
3439 				pdict = collect_props(pi->info, &dict);
3440 			}
3441 
3442 			message_put(m,
3443 				TAG_STRING, pi->name,			/* port name */
3444 				TAG_STRING, pi->description,		/* port description */
3445 				TAG_U32, pi->priority,			/* port priority */
3446 				TAG_U32, pi->available,			/* port available */
3447 				TAG_U8, pi->direction == SPA_DIRECTION_INPUT ? 2 : 1,	/* port direction */
3448 				TAG_PROPLIST, pdict,			/* port proplist */
3449 				TAG_INVALID);
3450 
3451 			pi_n_profiles = SPA_MIN(pi->n_profiles, n_profiles);
3452 			if (pi->n_profiles != pi_n_profiles) {
3453 				/* libpulse assumes port profile array size <= n_profiles */
3454 				pw_log_error("%p: card %d port %d profiles inconsistent (%d < %d)",
3455 						client->impl, o->id, n, n_profiles, pi->n_profiles);
3456 			}
3457 
3458 			message_put(m,
3459 				TAG_U32, pi_n_profiles,		/* n_profiles */
3460 				TAG_INVALID);
3461 
3462 			for (i = 0; i < pi_n_profiles; i++) {
3463 				uint32_t j;
3464 				const char *name = "off";
3465 
3466 				for (j = 0; j < n_profiles; ++j) {
3467 					if (profile_info[j].id == pi->profiles[i]) {
3468 						name = profile_info[j].name;
3469 						break;
3470 					}
3471 				}
3472 
3473 				message_put(m,
3474 					TAG_STRING, name,	/* profile name */
3475 					TAG_INVALID);
3476 			}
3477 			if (client->version >= 27) {
3478 				int64_t latency_offset = get_port_latency_offset(client, o, pi);
3479 				message_put(m,
3480 					TAG_S64, latency_offset / 1000,	/* port latency offset */
3481 					TAG_INVALID);
3482 			}
3483 			if (client->version >= 34) {
3484 				message_put(m,
3485 					TAG_STRING, pi->availability_group,	/* available group */
3486 					TAG_U32, pi->type,		/* port type */
3487 					TAG_INVALID);
3488 			}
3489 		}
3490 	}
3491 	return 0;
3492 }
3493 
fill_sink_info(struct client * client,struct message * m,struct pw_manager_object * o)3494 static int fill_sink_info(struct client *client, struct message *m,
3495 		struct pw_manager_object *o)
3496 {
3497 	struct impl *impl = client->impl;
3498 	struct pw_node_info *info = o->info;
3499 	struct pw_manager *manager = client->manager;
3500 	const char *name, *desc, *str;
3501 	char *monitor_name = NULL;
3502 	uint32_t module_id = SPA_ID_INVALID;
3503 	uint32_t card_id = SPA_ID_INVALID;
3504 	struct pw_manager_object *card = NULL;
3505 	uint32_t flags;
3506 	struct card_info card_info = CARD_INFO_INIT;
3507 	struct device_info dev_info = DEVICE_INFO_INIT(PW_DIRECTION_OUTPUT);
3508 	size_t size;
3509 
3510 	if (!pw_manager_object_is_sink(o) || info == NULL || info->props == NULL)
3511 		return -ENOENT;
3512 
3513 	name = spa_dict_lookup(info->props, PW_KEY_NODE_NAME);
3514 	if ((desc = spa_dict_lookup(info->props, PW_KEY_NODE_DESCRIPTION)) == NULL)
3515 		desc = name ? name : "Unknown";
3516 	if (name == NULL)
3517 		name = "unknown";
3518 
3519 	size = strlen(name) + 10;
3520 	monitor_name = alloca(size);
3521 	if (pw_manager_object_is_source(o))
3522 		snprintf(monitor_name, size, "%s", name);
3523 	else
3524 		snprintf(monitor_name, size, "%s.monitor", name);
3525 
3526 	if ((str = spa_dict_lookup(info->props, PW_KEY_MODULE_ID)) != NULL)
3527 		module_id = (uint32_t)atoi(str);
3528 	if ((str = spa_dict_lookup(info->props, PW_KEY_DEVICE_ID)) != NULL)
3529 		card_id = (uint32_t)atoi(str);
3530 	if ((str = spa_dict_lookup(info->props, "card.profile.device")) != NULL)
3531 		dev_info.device = (uint32_t)atoi(str);
3532 	if (card_id != SPA_ID_INVALID) {
3533 		struct selector sel = { .id = card_id, .type = pw_manager_object_is_card, };
3534 		card = select_object(manager, &sel);
3535 	}
3536 	if (card)
3537 		collect_card_info(card, &card_info);
3538 
3539 	collect_device_info(o, card, &dev_info, false, &impl->defs);
3540 
3541 	if (!sample_spec_valid(&dev_info.ss) ||
3542 	    !channel_map_valid(&dev_info.map) ||
3543 	    !volume_valid(&dev_info.volume_info.volume)) {
3544 		pw_log_warn("%d: sink not ready: sample:%d map:%d volume:%d",
3545 				o->id, sample_spec_valid(&dev_info.ss),
3546 				channel_map_valid(&dev_info.map),
3547 				volume_valid(&dev_info.volume_info.volume));
3548 		return -ENOENT;
3549 	}
3550 
3551 	flags = SINK_LATENCY | SINK_DYNAMIC_LATENCY | SINK_DECIBEL_VOLUME;
3552 	if ((str = spa_dict_lookup(info->props, PW_KEY_DEVICE_API)) != NULL)
3553 		flags |= SINK_HARDWARE;
3554 	if ((str = spa_dict_lookup(info->props, PW_KEY_NODE_NETWORK)) != NULL)
3555 		flags |= SINK_NETWORK;
3556 	if (SPA_FLAG_IS_SET(dev_info.volume_info.flags, VOLUME_HW_VOLUME))
3557 		flags |= SINK_HW_VOLUME_CTRL;
3558 	if (SPA_FLAG_IS_SET(dev_info.volume_info.flags, VOLUME_HW_MUTE))
3559 		flags |= SINK_HW_MUTE_CTRL;
3560 	if (dev_info.have_iec958codecs)
3561 		flags |= SINK_SET_FORMATS;
3562 
3563 	if (client->quirks & QUIRK_FORCE_S16_FORMAT)
3564 		dev_info.ss.format = SPA_AUDIO_FORMAT_S16;
3565 
3566 	message_put(m,
3567 		TAG_U32, o->id,				/* sink index */
3568 		TAG_STRING, name,
3569 		TAG_STRING, desc,
3570 		TAG_SAMPLE_SPEC, &dev_info.ss,
3571 		TAG_CHANNEL_MAP, &dev_info.map,
3572 		TAG_U32, module_id,			/* module index */
3573 		TAG_CVOLUME, &dev_info.volume_info.volume,
3574 		TAG_BOOLEAN, dev_info.volume_info.mute,
3575 		TAG_U32, o->id | MONITOR_FLAG,		/* monitor source */
3576 		TAG_STRING, monitor_name,		/* monitor source name */
3577 		TAG_USEC, 0LL,				/* latency */
3578 		TAG_STRING, "PipeWire",			/* driver */
3579 		TAG_U32, flags,				/* flags */
3580 		TAG_INVALID);
3581 
3582 	if (client->version >= 13) {
3583 		message_put(m,
3584 			TAG_PROPLIST, info->props,
3585 			TAG_USEC, 0LL,			/* requested latency */
3586 			TAG_INVALID);
3587 	}
3588 	if (client->version >= 15) {
3589 		bool is_linked = collect_is_linked(manager, o->id, SPA_DIRECTION_INPUT);
3590 		int state = node_state(info->state);
3591 
3592 		/* running with nothing linked is probably the monitor that is
3593 		 * keeping this sink busy */
3594 		if (state == STATE_RUNNING && !is_linked)
3595 			state = STATE_IDLE;
3596 
3597 		message_put(m,
3598 			TAG_VOLUME, dev_info.volume_info.base,	/* base volume */
3599 			TAG_U32, state,				/* state */
3600 			TAG_U32, dev_info.volume_info.steps,	/* n_volume_steps */
3601 			TAG_U32, card_id,			/* card index */
3602 			TAG_INVALID);
3603 	}
3604 	if (client->version >= 16) {
3605 		uint32_t n_ports, n;
3606 		struct port_info *port_info, *pi;
3607 
3608 		port_info = alloca(card_info.n_ports * sizeof(*port_info));
3609 		n_ports = collect_port_info(card, &card_info, &dev_info, port_info);
3610 
3611 		message_put(m,
3612 			TAG_U32, n_ports,			/* n_ports */
3613 			TAG_INVALID);
3614 		for (n = 0; n < n_ports; n++) {
3615 			pi = &port_info[n];
3616 			message_put(m,
3617 				TAG_STRING, pi->name,		/* name */
3618 				TAG_STRING, pi->description,	/* description */
3619 				TAG_U32, pi->priority,		/* priority */
3620 				TAG_INVALID);
3621 			if (client->version >= 24) {
3622 				message_put(m,
3623 					TAG_U32, pi->available,		/* available */
3624 					TAG_INVALID);
3625 			}
3626 			if (client->version >= 34) {
3627 				message_put(m,
3628 					TAG_STRING, pi->availability_group,	/* availability_group */
3629 					TAG_U32, pi->type,			/* type */
3630 					TAG_INVALID);
3631 			}
3632 		}
3633 		message_put(m,
3634 			TAG_STRING, dev_info.active_port_name,		/* active port name */
3635 			TAG_INVALID);
3636 	}
3637 	if (client->version >= 21) {
3638 		struct pw_manager_param *p;
3639 		struct format_info info[32];
3640 		uint32_t i, n_info = 0;
3641 
3642 		spa_list_for_each(p, &o->param_list, link) {
3643 			uint32_t index = 0;
3644 
3645 			if (p->id != SPA_PARAM_EnumFormat)
3646 				continue;
3647 
3648 			while (n_info < SPA_N_ELEMENTS(info)) {
3649 				spa_zero(info[n_info]);
3650 				if (format_info_from_param(&info[n_info], p->param, index++) < 0)
3651 					break;
3652 				if (info[n_info].encoding == ENCODING_ANY ||
3653 				    (info[n_info].encoding == ENCODING_PCM && info[n_info].props != NULL)) {
3654 					format_info_clear(&info[n_info]);
3655 					continue;
3656 				}
3657 				n_info++;
3658 			}
3659 		}
3660 		message_put(m,
3661 			TAG_U8, n_info,				/* n_formats */
3662 			TAG_INVALID);
3663 		for (i = 0; i < n_info; i++) {
3664 			message_put(m,
3665 				TAG_FORMAT_INFO, &info[i],
3666 				TAG_INVALID);
3667 			format_info_clear(&info[i]);
3668 		}
3669 	}
3670 	return 0;
3671 }
3672 
fill_source_info(struct client * client,struct message * m,struct pw_manager_object * o)3673 static int fill_source_info(struct client *client, struct message *m,
3674 		struct pw_manager_object *o)
3675 {
3676 	struct impl *impl = client->impl;
3677 	struct pw_node_info *info = o->info;
3678 	struct pw_manager *manager = client->manager;
3679 	bool is_monitor;
3680 	const char *name, *desc, *str;
3681 	char *monitor_name = NULL;
3682 	char *monitor_desc = NULL;
3683 	uint32_t module_id = SPA_ID_INVALID;
3684 	uint32_t card_id = SPA_ID_INVALID;
3685 	struct pw_manager_object *card = NULL;
3686 	uint32_t flags;
3687 	struct card_info card_info = CARD_INFO_INIT;
3688 	struct device_info dev_info = DEVICE_INFO_INIT(PW_DIRECTION_INPUT);
3689 	size_t size;
3690 
3691 	is_monitor = pw_manager_object_is_monitor(o);
3692 	if ((!pw_manager_object_is_source(o) && !is_monitor) || info == NULL || info->props == NULL)
3693 		return -ENOENT;
3694 
3695 	name = spa_dict_lookup(info->props, PW_KEY_NODE_NAME);
3696 	if ((desc = spa_dict_lookup(info->props, PW_KEY_NODE_DESCRIPTION)) == NULL)
3697 		desc = name ? name : "Unknown";
3698 	if (name == NULL)
3699 		name = "unknown";
3700 
3701 	size = strlen(name) + 10;
3702 	monitor_name = alloca(size);
3703 	snprintf(monitor_name, size, "%s.monitor", name);
3704 
3705 	size = strlen(desc) + 20;
3706 	monitor_desc = alloca(size);
3707 	snprintf(monitor_desc, size, "Monitor of %s", desc);
3708 
3709 	if ((str = spa_dict_lookup(info->props, PW_KEY_MODULE_ID)) != NULL)
3710 		module_id = (uint32_t)atoi(str);
3711 	if ((str = spa_dict_lookup(info->props, PW_KEY_DEVICE_ID)) != NULL)
3712 		card_id = (uint32_t)atoi(str);
3713 	if ((str = spa_dict_lookup(info->props, "card.profile.device")) != NULL)
3714 		dev_info.device = (uint32_t)atoi(str);
3715 
3716 	if (card_id != SPA_ID_INVALID) {
3717 		struct selector sel = { .id = card_id, .type = pw_manager_object_is_card, };
3718 		card = select_object(manager, &sel);
3719 	}
3720 	if (card)
3721 		collect_card_info(card, &card_info);
3722 
3723 	collect_device_info(o, card, &dev_info, is_monitor, &impl->defs);
3724 
3725 	if (!sample_spec_valid(&dev_info.ss) ||
3726 	    !channel_map_valid(&dev_info.map) ||
3727 	    !volume_valid(&dev_info.volume_info.volume)) {
3728 		pw_log_warn("%d: source not ready: sample:%d map:%d volume:%d",
3729 				o->id, sample_spec_valid(&dev_info.ss),
3730 				channel_map_valid(&dev_info.map),
3731 				volume_valid(&dev_info.volume_info.volume));
3732 		return -ENOENT;
3733 	}
3734 
3735 	flags = SOURCE_LATENCY | SOURCE_DYNAMIC_LATENCY | SOURCE_DECIBEL_VOLUME;
3736 	if ((str = spa_dict_lookup(info->props, PW_KEY_DEVICE_API)) != NULL)
3737 		flags |= SOURCE_HARDWARE;
3738 	if ((str = spa_dict_lookup(info->props, PW_KEY_NODE_NETWORK)) != NULL)
3739 		flags |= SOURCE_NETWORK;
3740 	if (SPA_FLAG_IS_SET(dev_info.volume_info.flags, VOLUME_HW_VOLUME))
3741 		flags |= SOURCE_HW_VOLUME_CTRL;
3742 	if (SPA_FLAG_IS_SET(dev_info.volume_info.flags, VOLUME_HW_MUTE))
3743 		flags |= SOURCE_HW_MUTE_CTRL;
3744 
3745 	if (client->quirks & QUIRK_FORCE_S16_FORMAT)
3746 		dev_info.ss.format = SPA_AUDIO_FORMAT_S16;
3747 
3748 	message_put(m,
3749 		TAG_U32, is_monitor ? o->id | MONITOR_FLAG: o->id,	/* source index */
3750 		TAG_STRING, is_monitor ? monitor_name : name,
3751 		TAG_STRING, is_monitor ? monitor_desc : desc,
3752 		TAG_SAMPLE_SPEC, &dev_info.ss,
3753 		TAG_CHANNEL_MAP, &dev_info.map,
3754 		TAG_U32, module_id,				/* module index */
3755 		TAG_CVOLUME, &dev_info.volume_info.volume,
3756 		TAG_BOOLEAN, dev_info.volume_info.mute,
3757 		TAG_U32, is_monitor ? o->id : SPA_ID_INVALID,	/* monitor of sink */
3758 		TAG_STRING, is_monitor ? name : NULL,		/* monitor of sink name */
3759 		TAG_USEC, 0LL,					/* latency */
3760 		TAG_STRING, "PipeWire",				/* driver */
3761 		TAG_U32, flags,					/* flags */
3762 		TAG_INVALID);
3763 
3764 	if (client->version >= 13) {
3765 		message_put(m,
3766 			TAG_PROPLIST, info->props,
3767 			TAG_USEC, 0LL,			/* requested latency */
3768 			TAG_INVALID);
3769 	}
3770 	if (client->version >= 15) {
3771 		bool is_linked = collect_is_linked(manager, o->id, SPA_DIRECTION_OUTPUT);
3772 		int state = node_state(info->state);
3773 
3774 		/* running with nothing linked is probably the sink that is
3775 		 * keeping this source busy */
3776 		if (state == STATE_RUNNING && !is_linked)
3777 			state = STATE_IDLE;
3778 
3779 		message_put(m,
3780 			TAG_VOLUME, dev_info.volume_info.base,	/* base volume */
3781 			TAG_U32, state,				/* state */
3782 			TAG_U32, dev_info.volume_info.steps,	/* n_volume_steps */
3783 			TAG_U32, card_id,			/* card index */
3784 			TAG_INVALID);
3785 	}
3786 	if (client->version >= 16) {
3787 		uint32_t n_ports, n;
3788 		struct port_info *port_info, *pi;
3789 
3790 		port_info = alloca(card_info.n_ports * sizeof(*port_info));
3791 		n_ports = collect_port_info(card, &card_info, &dev_info, port_info);
3792 
3793 		message_put(m,
3794 			TAG_U32, n_ports,			/* n_ports */
3795 			TAG_INVALID);
3796 		for (n = 0; n < n_ports; n++) {
3797 			pi = &port_info[n];
3798 			message_put(m,
3799 				TAG_STRING, pi->name,		/* name */
3800 				TAG_STRING, pi->description,	/* description */
3801 				TAG_U32, pi->priority,		/* priority */
3802 				TAG_INVALID);
3803 			if (client->version >= 24) {
3804 				message_put(m,
3805 					TAG_U32, pi->available,		/* available */
3806 					TAG_INVALID);
3807 			}
3808 			if (client->version >= 34) {
3809 				message_put(m,
3810 					TAG_STRING, pi->availability_group,	/* availability_group */
3811 					TAG_U32, pi->type,			/* type */
3812 					TAG_INVALID);
3813 			}
3814 		}
3815 		message_put(m,
3816 			TAG_STRING, dev_info.active_port_name,		/* active port name */
3817 			TAG_INVALID);
3818 	}
3819 	if (client->version >= 21) {
3820 		struct format_info info;
3821 		spa_zero(info);
3822 		info.encoding = ENCODING_PCM;
3823 		message_put(m,
3824 			TAG_U8, 1,			/* n_formats */
3825 			TAG_FORMAT_INFO, &info,
3826 			TAG_INVALID);
3827 	}
3828 	return 0;
3829 }
3830 
get_media_name(struct pw_node_info * info)3831 static const char *get_media_name(struct pw_node_info *info)
3832 {
3833 	const char *media_name;
3834 	media_name = spa_dict_lookup(info->props, PW_KEY_MEDIA_NAME);
3835 	if (media_name == NULL)
3836 		media_name = "";
3837 	return media_name;
3838 }
3839 
fill_sink_input_info(struct client * client,struct message * m,struct pw_manager_object * o)3840 static int fill_sink_input_info(struct client *client, struct message *m,
3841 		struct pw_manager_object *o)
3842 {
3843 	struct impl *impl = client->impl;
3844 	struct pw_node_info *info = o->info;
3845 	struct pw_manager *manager = client->manager;
3846 	struct pw_manager_object *peer;
3847 	const char *str;
3848 	uint32_t module_id = SPA_ID_INVALID, client_id = SPA_ID_INVALID;
3849 	struct device_info dev_info = DEVICE_INFO_INIT(PW_DIRECTION_OUTPUT);
3850 
3851 	if (!pw_manager_object_is_sink_input(o) || info == NULL || info->props == NULL)
3852 		return -ENOENT;
3853 
3854 	if ((str = spa_dict_lookup(info->props, PW_KEY_MODULE_ID)) != NULL)
3855 		module_id = (uint32_t)atoi(str);
3856 	if (!pw_manager_object_is_virtual(o) &&
3857 	    (str = spa_dict_lookup(info->props, PW_KEY_CLIENT_ID)) != NULL)
3858 		client_id = (uint32_t)atoi(str);
3859 
3860 	collect_device_info(o, NULL, &dev_info, false, &impl->defs);
3861 
3862 	if (!sample_spec_valid(&dev_info.ss) ||
3863 	    !channel_map_valid(&dev_info.map) ||
3864 	    !volume_valid(&dev_info.volume_info.volume))
3865 		return -ENOENT;
3866 
3867 	peer = find_linked(manager, o->id, PW_DIRECTION_OUTPUT);
3868 
3869 	message_put(m,
3870 		TAG_U32, o->id,					/* sink_input index */
3871 		TAG_STRING, get_media_name(info),
3872 		TAG_U32, module_id,				/* module index */
3873 		TAG_U32, client_id,				/* client index */
3874 		TAG_U32, peer ? peer->id : SPA_ID_INVALID,	/* sink index */
3875 		TAG_SAMPLE_SPEC, &dev_info.ss,
3876 		TAG_CHANNEL_MAP, &dev_info.map,
3877 		TAG_CVOLUME, &dev_info.volume_info.volume,
3878 		TAG_USEC, 0LL,				/* latency */
3879 		TAG_USEC, 0LL,				/* sink latency */
3880 		TAG_STRING, "PipeWire",			/* resample method */
3881 		TAG_STRING, "PipeWire",			/* driver */
3882 		TAG_INVALID);
3883 	if (client->version >= 11)
3884 		message_put(m,
3885 			TAG_BOOLEAN, dev_info.volume_info.mute,	/* muted */
3886 			TAG_INVALID);
3887 	if (client->version >= 13)
3888 		message_put(m,
3889 			TAG_PROPLIST, info->props,
3890 			TAG_INVALID);
3891 	if (client->version >= 19)
3892 		message_put(m,
3893 			TAG_BOOLEAN, info->state != PW_NODE_STATE_RUNNING,		/* corked */
3894 			TAG_INVALID);
3895 	if (client->version >= 20)
3896 		message_put(m,
3897 			TAG_BOOLEAN, true,		/* has_volume */
3898 			TAG_BOOLEAN, true,		/* volume writable */
3899 			TAG_INVALID);
3900 	if (client->version >= 21) {
3901 		struct format_info fi;
3902 		format_info_from_spec(&fi, &dev_info.ss, &dev_info.map);
3903 		message_put(m,
3904 			TAG_FORMAT_INFO, &fi,
3905 			TAG_INVALID);
3906 		format_info_clear(&fi);
3907 	}
3908 	return 0;
3909 }
3910 
fill_source_output_info(struct client * client,struct message * m,struct pw_manager_object * o)3911 static int fill_source_output_info(struct client *client, struct message *m,
3912 		struct pw_manager_object *o)
3913 {
3914 	struct impl *impl = client->impl;
3915 	struct pw_node_info *info = o->info;
3916 	struct pw_manager *manager = client->manager;
3917 	struct pw_manager_object *peer;
3918 	const char *str;
3919 	uint32_t module_id = SPA_ID_INVALID, client_id = SPA_ID_INVALID;
3920 	uint32_t peer_id;
3921 	struct device_info dev_info = DEVICE_INFO_INIT(PW_DIRECTION_INPUT);
3922 
3923 	if (!pw_manager_object_is_source_output(o) || info == NULL || info->props == NULL)
3924 		return -ENOENT;
3925 
3926 	if ((str = spa_dict_lookup(info->props, PW_KEY_MODULE_ID)) != NULL)
3927 		module_id = (uint32_t)atoi(str);
3928 	if (!pw_manager_object_is_virtual(o) &&
3929 	    (str = spa_dict_lookup(info->props, PW_KEY_CLIENT_ID)) != NULL)
3930 		client_id = (uint32_t)atoi(str);
3931 
3932 	collect_device_info(o, NULL, &dev_info, false, &impl->defs);
3933 
3934 	if (!sample_spec_valid(&dev_info.ss) ||
3935 	    !channel_map_valid(&dev_info.map) ||
3936 	    !volume_valid(&dev_info.volume_info.volume))
3937 		return -ENOENT;
3938 
3939 	peer = find_linked(manager, o->id, PW_DIRECTION_INPUT);
3940 	if (peer && pw_manager_object_is_source_or_monitor(peer)) {
3941 		peer_id = peer->id;
3942 		if (!pw_manager_object_is_source(peer))
3943 			peer_id |= MONITOR_FLAG;
3944 	} else {
3945 		peer_id = SPA_ID_INVALID;
3946 	}
3947 
3948 	message_put(m,
3949 		TAG_U32, o->id,					/* source_output index */
3950 		TAG_STRING, get_media_name(info),
3951 		TAG_U32, module_id,				/* module index */
3952 		TAG_U32, client_id,				/* client index */
3953 		TAG_U32, peer_id,				/* source index */
3954 		TAG_SAMPLE_SPEC, &dev_info.ss,
3955 		TAG_CHANNEL_MAP, &dev_info.map,
3956 		TAG_USEC, 0LL,				/* latency */
3957 		TAG_USEC, 0LL,				/* source latency */
3958 		TAG_STRING, "PipeWire",			/* resample method */
3959 		TAG_STRING, "PipeWire",			/* driver */
3960 		TAG_INVALID);
3961 	if (client->version >= 13)
3962 		message_put(m,
3963 			TAG_PROPLIST, info->props,
3964 			TAG_INVALID);
3965 	if (client->version >= 19)
3966 		message_put(m,
3967 			TAG_BOOLEAN, info->state != PW_NODE_STATE_RUNNING,		/* corked */
3968 			TAG_INVALID);
3969 	if (client->version >= 22) {
3970 		struct format_info fi;
3971 		format_info_from_spec(&fi, &dev_info.ss, &dev_info.map);
3972 		message_put(m,
3973 			TAG_CVOLUME, &dev_info.volume_info.volume,
3974 			TAG_BOOLEAN, dev_info.volume_info.mute,	/* muted */
3975 			TAG_BOOLEAN, true,		/* has_volume */
3976 			TAG_BOOLEAN, true,		/* volume writable */
3977 			TAG_FORMAT_INFO, &fi,
3978 			TAG_INVALID);
3979 		format_info_clear(&fi);
3980 	}
3981 	return 0;
3982 }
3983 
do_get_info(struct client * client,uint32_t command,uint32_t tag,struct message * m)3984 static int do_get_info(struct client *client, uint32_t command, uint32_t tag, struct message *m)
3985 {
3986 	struct impl *impl = client->impl;
3987 	struct pw_manager *manager = client->manager;
3988 	struct message *reply = NULL;
3989 	int res;
3990 	struct pw_manager_object *o;
3991 	struct selector sel;
3992 	int (*fill_func) (struct client *client, struct message *m, struct pw_manager_object *o) = NULL;
3993 
3994 	spa_zero(sel);
3995 
3996 	if (message_get(m,
3997 			TAG_U32, &sel.id,
3998 			TAG_INVALID) < 0)
3999 		goto error_protocol;
4000 
4001 	reply = reply_new(client, tag);
4002 
4003 	if (command == COMMAND_GET_MODULE_INFO && (sel.id & MODULE_FLAG) != 0) {
4004 		struct module *module;
4005 		module = pw_map_lookup(&impl->modules, sel.id & INDEX_MASK);
4006 		if (module == NULL)
4007 			goto error_noentity;
4008 		fill_ext_module_info(client, reply, module);
4009 		return client_queue_message(client, reply);
4010 	}
4011 
4012 	switch (command) {
4013 	case COMMAND_GET_CLIENT_INFO:
4014 		sel.type = pw_manager_object_is_client;
4015 		fill_func = fill_client_info;
4016 		break;
4017 	case COMMAND_GET_MODULE_INFO:
4018 		sel.type = pw_manager_object_is_module;
4019 		fill_func = fill_module_info;
4020 		break;
4021 	case COMMAND_GET_CARD_INFO:
4022 		sel.type = pw_manager_object_is_card;
4023 		sel.key = PW_KEY_DEVICE_NAME;
4024 		fill_func = fill_card_info;
4025 		break;
4026 	case COMMAND_GET_SINK_INFO:
4027 		sel.type = pw_manager_object_is_sink;
4028 		sel.key = PW_KEY_NODE_NAME;
4029 		fill_func = fill_sink_info;
4030 		break;
4031 	case COMMAND_GET_SOURCE_INFO:
4032 		sel.type = pw_manager_object_is_source_or_monitor;
4033 		sel.key = PW_KEY_NODE_NAME;
4034 		fill_func = fill_source_info;
4035 		break;
4036 	case COMMAND_GET_SINK_INPUT_INFO:
4037 		sel.type = pw_manager_object_is_sink_input;
4038 		fill_func = fill_sink_input_info;
4039 		break;
4040 	case COMMAND_GET_SOURCE_OUTPUT_INFO:
4041 		sel.type = pw_manager_object_is_source_output;
4042 		fill_func = fill_source_output_info;
4043 		break;
4044 	}
4045 	if (sel.key) {
4046 		if (message_get(m,
4047 				TAG_STRING, &sel.value,
4048 				TAG_INVALID) < 0)
4049 			goto error_protocol;
4050 	}
4051 	if (fill_func == NULL)
4052 		goto error_invalid;
4053 
4054 	if (sel.id != SPA_ID_INVALID && sel.value != NULL)
4055 		goto error_invalid;
4056 
4057 	pw_log_info("[%s] %s tag:%u idx:%u name:%s", client->name,
4058 			commands[command].name, tag, sel.id, sel.value);
4059 
4060 	if (command == COMMAND_GET_SINK_INFO || command == COMMAND_GET_SOURCE_INFO) {
4061 		o = find_device(client, sel.id, sel.value,
4062 				command == COMMAND_GET_SINK_INFO, NULL);
4063 	} else {
4064 		if (sel.value == NULL && sel.id == SPA_ID_INVALID)
4065 			goto error_invalid;
4066 		o = select_object(manager, &sel);
4067 	}
4068 	if (o == NULL)
4069 		goto error_noentity;
4070 
4071 	if ((res = fill_func(client, reply, o)) < 0)
4072 		goto error;
4073 
4074 	return client_queue_message(client, reply);
4075 
4076 error_protocol:
4077 	res = -EPROTO;
4078 	goto error;
4079 error_noentity:
4080 	res = -ENOENT;
4081 	goto error;
4082 error_invalid:
4083 	res = -EINVAL;
4084 	goto error;
4085 error:
4086 	if (reply)
4087 		message_free(impl, reply, false, false);
4088 	return res;
4089 }
4090 
bytes_to_usec(uint64_t length,const struct sample_spec * ss)4091 static uint64_t bytes_to_usec(uint64_t length, const struct sample_spec *ss)
4092 {
4093 	uint64_t u;
4094 	uint64_t frame_size = sample_spec_frame_size(ss);
4095 	if (frame_size == 0)
4096 		return 0;
4097 	u = length / frame_size;
4098 	u *= SPA_USEC_PER_SEC;
4099 	u /= ss->rate;
4100 	return u;
4101 }
4102 
fill_sample_info(struct client * client,struct message * m,struct sample * sample)4103 static int fill_sample_info(struct client *client, struct message *m,
4104 		struct sample *sample)
4105 {
4106 	struct volume vol;
4107 
4108 	volume_make(&vol, sample->ss.channels);
4109 
4110 	message_put(m,
4111 		TAG_U32, sample->index,
4112 		TAG_STRING, sample->name,
4113 		TAG_CVOLUME, &vol,
4114 		TAG_USEC, bytes_to_usec(sample->length, &sample->ss),
4115 		TAG_SAMPLE_SPEC, &sample->ss,
4116 		TAG_CHANNEL_MAP, &sample->map,
4117 		TAG_U32, sample->length,
4118 		TAG_BOOLEAN, false,			/* lazy */
4119 		TAG_STRING, NULL,			/* filename */
4120 		TAG_INVALID);
4121 
4122 	if (client->version >= 13) {
4123 		message_put(m,
4124 			TAG_PROPLIST, &sample->props->dict,
4125 			TAG_INVALID);
4126 	}
4127 	return 0;
4128 }
4129 
do_get_sample_info(struct client * client,uint32_t command,uint32_t tag,struct message * m)4130 static int do_get_sample_info(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4131 {
4132 	struct impl *impl = client->impl;
4133 	struct message *reply = NULL;
4134 	uint32_t id;
4135 	const char *name;
4136 	struct sample *sample;
4137 	int res;
4138 
4139 	if (message_get(m,
4140 			TAG_U32, &id,
4141 			TAG_STRING, &name,
4142 			TAG_INVALID) < 0)
4143 		return -EPROTO;
4144 
4145 	if ((id == SPA_ID_INVALID && name == NULL) ||
4146 	    (id != SPA_ID_INVALID && name != NULL))
4147 		return -EINVAL;
4148 
4149 	pw_log_info("[%s] %s tag:%u idx:%u name:%s", client->name,
4150 			commands[command].name, tag, id, name);
4151 
4152 	if ((sample = find_sample(impl, id, name)) == NULL)
4153 		return -ENOENT;
4154 
4155 	reply = reply_new(client, tag);
4156 	if ((res = fill_sample_info(client, reply, sample)) < 0)
4157 		goto error;
4158 
4159 	return client_queue_message(client, reply);
4160 
4161 error:
4162 	if (reply)
4163 		message_free(impl, reply, false, false);
4164 	return res;
4165 }
4166 
do_get_sample_info_list(struct client * client,uint32_t command,uint32_t tag,struct message * m)4167 static int do_get_sample_info_list(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4168 {
4169 	struct impl *impl = client->impl;
4170 	struct message *reply;
4171 	union pw_map_item *item;
4172 
4173 	pw_log_info("[%s] %s tag:%u", client->name,
4174 			commands[command].name, tag);
4175 
4176 	reply = reply_new(client, tag);
4177 	pw_array_for_each(item, &impl->samples.items) {
4178 		struct sample *s = item->data;
4179 		if (pw_map_item_is_free(item))
4180 			continue;
4181 		fill_sample_info(client, reply, s);
4182 	}
4183 	return client_queue_message(client, reply);
4184 }
4185 
4186 struct info_list_data {
4187 	struct client *client;
4188 	struct message *reply;
4189 	int (*fill_func) (struct client *client, struct message *m, struct pw_manager_object *o);
4190 };
4191 
do_list_info(void * data,struct pw_manager_object * object)4192 static int do_list_info(void *data, struct pw_manager_object *object)
4193 {
4194 	struct info_list_data *info = data;
4195 	info->fill_func(info->client, info->reply, object);
4196 	return 0;
4197 }
4198 
do_info_list_module(void * item,void * data)4199 static int do_info_list_module(void *item, void *data)
4200 {
4201 	struct module *m = item;
4202 	struct info_list_data *info = data;
4203 	fill_ext_module_info(info->client, info->reply, m);
4204 	return 0;
4205 }
4206 
do_get_info_list(struct client * client,uint32_t command,uint32_t tag,struct message * m)4207 static int do_get_info_list(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4208 {
4209 	struct impl *impl = client->impl;
4210 	struct pw_manager *manager = client->manager;
4211 	struct info_list_data info;
4212 
4213 	pw_log_info("[%s] %s tag:%u", client->name,
4214 			commands[command].name, tag);
4215 
4216 	spa_zero(info);
4217 	info.client = client;
4218 
4219 	switch (command) {
4220 	case COMMAND_GET_CLIENT_INFO_LIST:
4221 		info.fill_func = fill_client_info;
4222 		break;
4223 	case COMMAND_GET_MODULE_INFO_LIST:
4224 		info.fill_func = fill_module_info;
4225 		break;
4226 	case COMMAND_GET_CARD_INFO_LIST:
4227 		info.fill_func = fill_card_info;
4228 		break;
4229 	case COMMAND_GET_SINK_INFO_LIST:
4230 		info.fill_func = fill_sink_info;
4231 		break;
4232 	case COMMAND_GET_SOURCE_INFO_LIST:
4233 		info.fill_func = fill_source_info;
4234 		break;
4235 	case COMMAND_GET_SINK_INPUT_INFO_LIST:
4236 		info.fill_func = fill_sink_input_info;
4237 		break;
4238 	case COMMAND_GET_SOURCE_OUTPUT_INFO_LIST:
4239 		info.fill_func = fill_source_output_info;
4240 		break;
4241 	default:
4242 		return -ENOTSUP;
4243 	}
4244 
4245 	info.reply = reply_new(client, tag);
4246 	if (info.fill_func)
4247 		pw_manager_for_each_object(manager, do_list_info, &info);
4248 
4249 	if (command == COMMAND_GET_MODULE_INFO_LIST)
4250 		pw_map_for_each(&impl->modules, do_info_list_module, &info);
4251 
4252 	return client_queue_message(client, info.reply);
4253 }
4254 
do_set_stream_buffer_attr(struct client * client,uint32_t command,uint32_t tag,struct message * m)4255 static int do_set_stream_buffer_attr(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4256 {
4257 	uint32_t channel;
4258 	struct stream *stream;
4259 	struct message *reply;
4260 	struct buffer_attr attr;
4261 	bool adjust_latency = false, early_requests = false;
4262 
4263 	if (message_get(m,
4264 			TAG_U32, &channel,
4265 			TAG_INVALID) < 0)
4266 		return -EPROTO;
4267 
4268 	pw_log_info("[%s] %s tag:%u channel:%u", client->name,
4269 			commands[command].name, tag, channel);
4270 
4271 	stream = pw_map_lookup(&client->streams, channel);
4272 	if (stream == NULL)
4273 		return -ENOENT;
4274 
4275 	if (command == COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
4276 		if (stream->type != STREAM_TYPE_PLAYBACK)
4277 			return -ENOENT;
4278 
4279 		if (message_get(m,
4280 				TAG_U32, &attr.maxlength,
4281 				TAG_U32, &attr.tlength,
4282 				TAG_U32, &attr.prebuf,
4283 				TAG_U32, &attr.minreq,
4284 				TAG_INVALID) < 0)
4285 			return -EPROTO;
4286 	} else {
4287 		if (stream->type != STREAM_TYPE_RECORD)
4288 			return -ENOENT;
4289 
4290 		if (message_get(m,
4291 				TAG_U32, &attr.maxlength,
4292 				TAG_U32, &attr.fragsize,
4293 				TAG_INVALID) < 0)
4294 			return -EPROTO;
4295 	}
4296 	if (client->version >= 13) {
4297 		if (message_get(m,
4298 				TAG_BOOLEAN, &adjust_latency,
4299 				TAG_INVALID) < 0)
4300 			return -EPROTO;
4301 	}
4302 	if (client->version >= 14) {
4303 		if (message_get(m,
4304 				TAG_BOOLEAN, &early_requests,
4305 				TAG_INVALID) < 0)
4306 			return -EPROTO;
4307 	}
4308 
4309 	reply = reply_new(client, tag);
4310 
4311 	if (command == COMMAND_SET_PLAYBACK_STREAM_BUFFER_ATTR) {
4312 		message_put(reply,
4313 			TAG_U32, stream->attr.maxlength,
4314 			TAG_U32, stream->attr.tlength,
4315 			TAG_U32, stream->attr.prebuf,
4316 			TAG_U32, stream->attr.minreq,
4317 			TAG_INVALID);
4318 		if (client->version >= 13) {
4319 			message_put(reply,
4320 				TAG_USEC, 0LL,		/* configured_sink_latency */
4321 				TAG_INVALID);
4322 		}
4323 	} else {
4324 		message_put(reply,
4325 			TAG_U32, stream->attr.maxlength,
4326 			TAG_U32, stream->attr.fragsize,
4327 			TAG_INVALID);
4328 		if (client->version >= 13) {
4329 			message_put(reply,
4330 				TAG_USEC, 0LL,		/* configured_source_latency */
4331 				TAG_INVALID);
4332 		}
4333 	}
4334 	return client_queue_message(client, reply);
4335 }
4336 
do_update_stream_sample_rate(struct client * client,uint32_t command,uint32_t tag,struct message * m)4337 static int do_update_stream_sample_rate(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4338 {
4339 	uint32_t channel, rate;
4340 	struct stream *stream;
4341 	bool match;
4342 
4343 	if (message_get(m,
4344 			TAG_U32, &channel,
4345 			TAG_U32, &rate,
4346 			TAG_INVALID) < 0)
4347 		return -EPROTO;
4348 
4349 	pw_log_info("[%s] %s tag:%u channel:%u rate:%u", client->name,
4350 			commands[command].name, tag, channel, rate);
4351 
4352 	stream = pw_map_lookup(&client->streams, channel);
4353 	if (stream == NULL || stream->type == STREAM_TYPE_UPLOAD)
4354 		return -ENOENT;
4355 
4356 	if (stream->rate_match == NULL)
4357 		return -ENOTSUP;
4358 
4359 	match = rate != stream->ss.rate;
4360 	stream->rate = rate;
4361 	stream->rate_match->rate = match ?
4362 			(double)rate/(double)stream->ss.rate : 1.0;
4363 	SPA_FLAG_UPDATE(stream->rate_match->flags,
4364 			SPA_IO_RATE_MATCH_FLAG_ACTIVE, match);
4365 
4366 	return reply_simple_ack(client, tag);
4367 }
4368 
do_extension(struct client * client,uint32_t command,uint32_t tag,struct message * m)4369 static int do_extension(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4370 {
4371 	uint32_t idx;
4372 	const char *name;
4373 	const struct extension *ext;
4374 
4375 	if (message_get(m,
4376 			TAG_U32, &idx,
4377 			TAG_STRING, &name,
4378 			TAG_INVALID) < 0)
4379 		return -EPROTO;
4380 
4381 	pw_log_info("[%s] %s tag:%u id:%u name:%s", client->name,
4382 			commands[command].name, tag, idx, name);
4383 
4384 	if ((idx == SPA_ID_INVALID && name == NULL) ||
4385 	    (idx != SPA_ID_INVALID && name != NULL))
4386 		return -EINVAL;
4387 
4388 	ext = extension_find(idx, name);
4389 	if (ext == NULL)
4390 		return -ENOENT;
4391 
4392 	return ext->process(client, tag, m);
4393 }
4394 
do_set_profile(struct client * client,uint32_t command,uint32_t tag,struct message * m)4395 static int do_set_profile(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4396 {
4397 	struct pw_manager *manager = client->manager;
4398 	struct pw_manager_object *o;
4399 	const char *profile_name;
4400 	uint32_t profile_id = SPA_ID_INVALID;
4401 	struct selector sel;
4402 	char buf[1024];
4403 	struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf));
4404 
4405 	spa_zero(sel);
4406 	sel.key = PW_KEY_DEVICE_NAME;
4407 	sel.type = pw_manager_object_is_card;
4408 
4409 	if (message_get(m,
4410 			TAG_U32, &sel.id,
4411 			TAG_STRING, &sel.value,
4412 			TAG_STRING, &profile_name,
4413 			TAG_INVALID) < 0)
4414 		return -EPROTO;
4415 
4416 	pw_log_info("[%s] %s tag:%u id:%u name:%s profile:%s", client->name,
4417 			commands[command].name, tag, sel.id, sel.value, profile_name);
4418 
4419 	if ((sel.id == SPA_ID_INVALID && sel.value == NULL) ||
4420 	    (sel.id != SPA_ID_INVALID && sel.value != NULL))
4421 		return -EINVAL;
4422 	if (profile_name == NULL)
4423 		return -EINVAL;
4424 
4425 	if ((o = select_object(manager, &sel)) == NULL)
4426 		return -ENOENT;
4427 
4428 	if ((profile_id = find_profile_id(o, profile_name)) == SPA_ID_INVALID)
4429 		return -ENOENT;
4430 
4431 	if (!SPA_FLAG_IS_SET(o->permissions, PW_PERM_W | PW_PERM_X))
4432 		return -EACCES;
4433 
4434 	if (o->proxy == NULL)
4435 		return -ENOENT;
4436 
4437 	pw_device_set_param((struct pw_device*)o->proxy,
4438 			SPA_PARAM_Profile, 0,
4439 			spa_pod_builder_add_object(&b,
4440 				SPA_TYPE_OBJECT_ParamProfile, SPA_PARAM_Profile,
4441 				SPA_PARAM_PROFILE_index, SPA_POD_Int(profile_id),
4442 				SPA_PARAM_PROFILE_save, SPA_POD_Bool(true)));
4443 
4444 	return operation_new(client, tag);
4445 }
4446 
do_set_default(struct client * client,uint32_t command,uint32_t tag,struct message * m)4447 static int do_set_default(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4448 {
4449 	struct pw_manager *manager = client->manager;
4450 	struct pw_manager_object *o;
4451 	const char *name, *str;
4452 	int res;
4453 	bool sink = command == COMMAND_SET_DEFAULT_SINK;
4454 
4455 	if (message_get(m,
4456 			TAG_STRING, &name,
4457 			TAG_INVALID) < 0)
4458 		return -EPROTO;
4459 
4460 	pw_log_info("[%s] %s tag:%u name:%s", client->name,
4461 			commands[command].name, tag, name);
4462 
4463 	if (name != NULL && (o = find_device(client, SPA_ID_INVALID, name, sink, NULL)) == NULL)
4464 		return -ENOENT;
4465 
4466 	if (name != NULL) {
4467 		if (o->props && (str = pw_properties_get(o->props, PW_KEY_NODE_NAME)) != NULL)
4468 			name = str;
4469 		else if (spa_strendswith(name, ".monitor"))
4470 			name = strndupa(name, strlen(name)-8);
4471 
4472 		res = pw_manager_set_metadata(manager, client->metadata_default,
4473 				PW_ID_CORE,
4474 				sink ? METADATA_CONFIG_DEFAULT_SINK : METADATA_CONFIG_DEFAULT_SOURCE,
4475 				"Spa:String:JSON", "{ \"name\": \"%s\" }", name);
4476 	} else {
4477 		res = pw_manager_set_metadata(manager, client->metadata_default,
4478 				PW_ID_CORE,
4479 				sink ? METADATA_CONFIG_DEFAULT_SINK : METADATA_CONFIG_DEFAULT_SOURCE,
4480 				NULL, NULL);
4481 	}
4482 	if (res < 0)
4483 		return res;
4484 
4485 	return operation_new(client, tag);
4486 }
4487 
do_suspend(struct client * client,uint32_t command,uint32_t tag,struct message * m)4488 static int do_suspend(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4489 {
4490 	struct pw_manager_object *o;
4491 	const char *name;
4492 	uint32_t id, cmd;
4493 	bool sink = command == COMMAND_SUSPEND_SINK, suspend;
4494 
4495 	if (message_get(m,
4496 			TAG_U32, &id,
4497 			TAG_STRING, &name,
4498 			TAG_BOOLEAN, &suspend,
4499 			TAG_INVALID) < 0)
4500 		return -EPROTO;
4501 
4502 	pw_log_info("[%s] %s tag:%u id:%u name:%s", client->name,
4503 			commands[command].name, tag, id, name);
4504 
4505 	if ((o = find_device(client, id, name, sink, NULL)) == NULL)
4506 		return -ENOENT;
4507 
4508 	if (o->proxy == NULL)
4509 		return -ENOENT;
4510 
4511 	if (suspend) {
4512 		cmd = SPA_NODE_COMMAND_Suspend;
4513 		pw_node_send_command((struct pw_node*)o->proxy, &SPA_NODE_COMMAND_INIT(cmd));
4514 	}
4515 	return operation_new(client, tag);
4516 }
4517 
do_move_stream(struct client * client,uint32_t command,uint32_t tag,struct message * m)4518 static int do_move_stream(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4519 {
4520 	struct pw_manager *manager = client->manager;
4521 	struct pw_manager_object *o, *dev, *dev_default;
4522 	uint32_t id, id_device;
4523 	int target_id;
4524 	const char *name_device;
4525 	struct selector sel;
4526 	int res;
4527 	bool sink = command == COMMAND_MOVE_SINK_INPUT;
4528 
4529 	if (message_get(m,
4530 			TAG_U32, &id,
4531 			TAG_U32, &id_device,
4532 			TAG_STRING, &name_device,
4533 			TAG_INVALID) < 0)
4534 		return -EPROTO;
4535 
4536 	if ((id_device == SPA_ID_INVALID && name_device == NULL) ||
4537 	    (id_device != SPA_ID_INVALID && name_device != NULL))
4538 		return -EINVAL;
4539 
4540 	pw_log_info("[%s] %s tag:%u idx:%u device:%d name:%s", client->name,
4541 			commands[command].name, tag, id, id_device, name_device);
4542 
4543 	spa_zero(sel);
4544 	sel.id = id;
4545 	sel.type = sink ? pw_manager_object_is_sink_input: pw_manager_object_is_source_output;
4546 
4547 	o = select_object(manager, &sel);
4548 	if (o == NULL)
4549 		return -ENOENT;
4550 
4551 	if ((dev = find_device(client, id_device, name_device, sink, NULL)) == NULL)
4552 		return -ENOENT;
4553 
4554 	dev_default = find_device(client, SPA_ID_INVALID, NULL, sink, NULL);
4555 	if (dev == dev_default) {
4556 		/*
4557 		 * When moving streams to a node that is equal to the default,
4558 		 * Pulseaudio understands this to mean '... and unset preferred sink/source',
4559 		 * forgetting target.node. Follow that behavior here.
4560 		 */
4561 		target_id = -1;
4562 	} else {
4563 		target_id = dev->id;
4564 	}
4565 
4566 	if ((res = pw_manager_set_metadata(manager, client->metadata_default,
4567 			o->id,
4568 			METADATA_TARGET_NODE,
4569 			SPA_TYPE_INFO_BASE"Id", "%d", target_id)) < 0)
4570 		return res;
4571 
4572 	return reply_simple_ack(client, tag);
4573 }
4574 
do_kill(struct client * client,uint32_t command,uint32_t tag,struct message * m)4575 static int do_kill(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4576 {
4577 	struct pw_manager *manager = client->manager;
4578 	struct pw_manager_object *o;
4579 	uint32_t id;
4580 	struct selector sel;
4581 
4582 	if (message_get(m,
4583 			TAG_U32, &id,
4584 			TAG_INVALID) < 0)
4585 		return -EPROTO;
4586 
4587 	pw_log_info("[%s] %s tag:%u id:%u", client->name,
4588 			commands[command].name, tag, id);
4589 
4590 	spa_zero(sel);
4591 	sel.id = id;
4592 	switch (command) {
4593 	case COMMAND_KILL_CLIENT:
4594 		sel.type = pw_manager_object_is_client;
4595 		break;
4596 	case COMMAND_KILL_SINK_INPUT:
4597 		sel.type = pw_manager_object_is_sink_input;
4598 		break;
4599 	case COMMAND_KILL_SOURCE_OUTPUT:
4600 		sel.type = pw_manager_object_is_source_output;
4601 		break;
4602 	default:
4603 		return -EINVAL;
4604 	}
4605 
4606 	if ((o = select_object(manager, &sel)) == NULL)
4607 		return -ENOENT;
4608 
4609 	pw_registry_destroy(manager->registry, o->id);
4610 
4611 	return reply_simple_ack(client, tag);
4612 }
4613 
handle_module_loaded(struct module * module,struct client * client,uint32_t tag,int result)4614 static void handle_module_loaded(struct module *module, struct client *client, uint32_t tag, int result)
4615 {
4616 	const char *client_name = client != NULL ? client->name : "?";
4617 	struct impl *impl = module->impl;
4618 
4619 	spa_assert(!SPA_RESULT_IS_ASYNC(result));
4620 
4621 	if (SPA_RESULT_IS_OK(result)) {
4622 		pw_log_info("[%s] loaded module id:%u name:%s",
4623 				client_name, module->idx, module->name);
4624 
4625 		module->loaded = true;
4626 
4627 		broadcast_subscribe_event(impl,
4628 			SUBSCRIPTION_MASK_MODULE,
4629 			SUBSCRIPTION_EVENT_NEW | SUBSCRIPTION_EVENT_MODULE,
4630 			module->idx);
4631 
4632 		if (client != NULL) {
4633 			struct message *reply = reply_new(client, tag);
4634 
4635 			message_put(reply,
4636 				TAG_U32, module->idx,
4637 				TAG_INVALID);
4638 			client_queue_message(client, reply);
4639 		}
4640 	}
4641 	else {
4642 		pw_log_warn("%p: [%s] failed to load module id:%u name:%s result:%d (%s)",
4643 				impl, client_name,
4644 				module->idx, module->name,
4645 				result, spa_strerror(result));
4646 
4647 		module_schedule_unload(module);
4648 
4649 		if (client != NULL)
4650 			reply_error(client, COMMAND_LOAD_MODULE, tag, result);
4651 	}
4652 }
4653 
4654 struct pending_module {
4655 	struct client *client;
4656 	struct spa_hook client_listener;
4657 
4658 	struct module *module;
4659 	struct spa_hook module_listener;
4660 
4661 	uint32_t tag;
4662 };
4663 
on_module_loaded(void * data,int result)4664 static void on_module_loaded(void *data, int result)
4665 {
4666 	struct pending_module *pm = data;
4667 
4668 	if (pm->client != NULL)
4669 		spa_hook_remove(&pm->client_listener);
4670 
4671 	spa_hook_remove(&pm->module_listener);
4672 
4673 	handle_module_loaded(pm->module, pm->client, pm->tag, result);
4674 
4675 	free(pm);
4676 }
4677 
on_module_destroy(void * data)4678 static void on_module_destroy(void *data)
4679 {
4680 	on_module_loaded(data, -ECANCELED);
4681 }
4682 
on_client_disconnect(void * data)4683 static void on_client_disconnect(void *data)
4684 {
4685 	struct pending_module *pm = data;
4686 
4687 	spa_hook_remove(&pm->client_listener);
4688 	pm->client = NULL;
4689 }
4690 
do_load_module(struct client * client,uint32_t command,uint32_t tag,struct message * m)4691 static int do_load_module(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4692 {
4693 	static const struct module_events module_events = {
4694 		VERSION_MODULE_EVENTS,
4695 		.loaded = on_module_loaded,
4696 		.destroy = on_module_destroy,
4697 	};
4698 	static const struct client_events client_events = {
4699 		VERSION_CLIENT_EVENTS,
4700 		.disconnect = on_client_disconnect,
4701 	};
4702 
4703 	const char *name, *argument;
4704 	struct module *module;
4705 	int r;
4706 
4707 	if (message_get(m,
4708 			TAG_STRING, &name,
4709 			TAG_STRING, &argument,
4710 			TAG_INVALID) < 0)
4711 		return -EPROTO;
4712 
4713 	pw_log_info("[%s] %s name:%s argument:%s",
4714 			client->name, commands[command].name, name, argument);
4715 
4716 	module = module_create(client, name, argument);
4717 	if (module == NULL)
4718 		return -errno;
4719 
4720 	r = module_load(client, module);
4721 	if (SPA_RESULT_IS_ASYNC(r)) {
4722 		struct pending_module *pm = calloc(1, sizeof(*pm));
4723 		if (pm == NULL)
4724 			return -errno;
4725 
4726 		pm->tag = tag;
4727 		pm->client = client;
4728 		pm->module = module;
4729 
4730 		module_add_listener(module, &pm->module_listener, &module_events, pm);
4731 		client_add_listener(client, &pm->client_listener, &client_events, pm);
4732 	} else {
4733 		handle_module_loaded(module, client, tag, r);
4734 	}
4735 
4736 	/*
4737 	 * return 0 to prevent `handle_packet()` from sending a reply
4738 	 * because we want `handle_module_loaded()` to send the reply
4739 	 */
4740 	return 0;
4741 }
4742 
do_unload_module(struct client * client,uint32_t command,uint32_t tag,struct message * m)4743 static int do_unload_module(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4744 {
4745 	struct impl *impl = client->impl;
4746 	struct module *module;
4747 	uint32_t module_idx;
4748 
4749 	if (message_get(m,
4750 			TAG_U32, &module_idx,
4751 			TAG_INVALID) < 0)
4752 		return -EPROTO;
4753 
4754 	pw_log_info("[%s] %s tag:%u id:%u", client->name,
4755 			commands[command].name, tag, module_idx);
4756 
4757 	if (module_idx == SPA_ID_INVALID)
4758 		return -EINVAL;
4759 	if ((module_idx & MODULE_FLAG) == 0)
4760 		return -EPERM;
4761 
4762 	module = pw_map_lookup(&impl->modules, module_idx & INDEX_MASK);
4763 	if (module == NULL)
4764 		return -ENOENT;
4765 
4766 	module_unload(client, module);
4767 
4768 	return reply_simple_ack(client, tag);
4769 }
4770 
do_send_object_message(struct client * client,uint32_t command,uint32_t tag,struct message * m)4771 static int do_send_object_message(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4772 {
4773 	struct impl *impl = client->impl;
4774 	struct pw_manager *manager = client->manager;
4775 	const char *object_path = NULL;
4776 	const char *message = NULL;
4777 	const char *params = NULL;
4778 	char *response = NULL;
4779 	char *path = NULL;
4780 	struct message *reply;
4781 	struct pw_manager_object *o;
4782 	int len = 0;
4783 	int res;
4784 
4785 	if (message_get(m,
4786 			TAG_STRING, &object_path,
4787 			TAG_STRING, &message,
4788 			TAG_STRING, &params,
4789 			TAG_INVALID) < 0)
4790 		return -EPROTO;
4791 
4792 	pw_log_info("[%s] %s tag:%u object_path:'%s' message:'%s' params:'%s'",
4793 			client->name, commands[command].name, tag, object_path,
4794 			message, params ? params : "<null>");
4795 
4796 	if (object_path == NULL || message == NULL)
4797 		return -EINVAL;
4798 
4799 	len = strlen(object_path);
4800 	if (len > 0 && object_path[len - 1] == '/')
4801 		--len;
4802 	path = strndup(object_path, len);
4803 	if (path == NULL)
4804 		return -ENOMEM;
4805 
4806 	res = -ENOENT;
4807 
4808 	spa_list_for_each(o, &manager->object_list, link) {
4809 		if (o->message_object_path && spa_streq(o->message_object_path, path)) {
4810 			if (o->message_handler)
4811 				res = o->message_handler(manager, o, message, params, &response);
4812 			else
4813 				res = -ENOSYS;
4814 			break;
4815 		}
4816 	}
4817 
4818 	free(path);
4819 	if (res < 0)
4820 		return res;
4821 
4822 	pw_log_debug("%p: object message response:'%s'", impl, response ? response : "<null>");
4823 
4824 	reply = reply_new(client, tag);
4825 	message_put(reply, TAG_STRING, response, TAG_INVALID);
4826 	free(response);
4827 	return client_queue_message(client, reply);
4828 }
4829 
do_error_access(struct client * client,uint32_t command,uint32_t tag,struct message * m)4830 static int do_error_access(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4831 {
4832 	return -EACCES;
4833 }
4834 
do_error_not_implemented(struct client * client,uint32_t command,uint32_t tag,struct message * m)4835 static SPA_UNUSED int do_error_not_implemented(struct client *client, uint32_t command, uint32_t tag, struct message *m)
4836 {
4837 	return -ENOSYS;
4838 }
4839 
4840 #define COMMAND(name, ...) [COMMAND_ ## name] = { #name, __VA_ARGS__ }
4841 const struct command commands[COMMAND_MAX] =
4842 {
4843 	COMMAND(ERROR),
4844 	COMMAND(TIMEOUT), /* pseudo command */
4845 	COMMAND(REPLY),
4846 
4847 	/* CLIENT->SERVER */
4848 	COMMAND(CREATE_PLAYBACK_STREAM, do_create_playback_stream),
4849 	COMMAND(DELETE_PLAYBACK_STREAM, do_delete_stream),
4850 	COMMAND(CREATE_RECORD_STREAM, do_create_record_stream),
4851 	COMMAND(DELETE_RECORD_STREAM, do_delete_stream),
4852 	COMMAND(EXIT, do_error_access),
4853 	COMMAND(AUTH, do_command_auth, COMMAND_ACCESS_WITHOUT_AUTH | COMMAND_ACCESS_WITHOUT_MANAGER),
4854 	COMMAND(SET_CLIENT_NAME, do_set_client_name, COMMAND_ACCESS_WITHOUT_MANAGER),
4855 	COMMAND(LOOKUP_SINK, do_lookup),
4856 	COMMAND(LOOKUP_SOURCE, do_lookup),
4857 	COMMAND(DRAIN_PLAYBACK_STREAM, do_drain_stream),
4858 	COMMAND(STAT, do_stat, COMMAND_ACCESS_WITHOUT_MANAGER),
4859 	COMMAND(GET_PLAYBACK_LATENCY, do_get_playback_latency),
4860 	COMMAND(CREATE_UPLOAD_STREAM, do_create_upload_stream),
4861 	COMMAND(DELETE_UPLOAD_STREAM, do_delete_stream),
4862 	COMMAND(FINISH_UPLOAD_STREAM, do_finish_upload_stream),
4863 	COMMAND(PLAY_SAMPLE, do_play_sample),
4864 	COMMAND(REMOVE_SAMPLE, do_remove_sample),
4865 
4866 	COMMAND(GET_SERVER_INFO, do_get_server_info, COMMAND_ACCESS_WITHOUT_MANAGER),
4867 	COMMAND(GET_SINK_INFO, do_get_info),
4868 	COMMAND(GET_SOURCE_INFO, do_get_info),
4869 	COMMAND(GET_MODULE_INFO, do_get_info),
4870 	COMMAND(GET_CLIENT_INFO, do_get_info),
4871 	COMMAND(GET_SINK_INPUT_INFO, do_get_info),
4872 	COMMAND(GET_SOURCE_OUTPUT_INFO, do_get_info),
4873 	COMMAND(GET_SAMPLE_INFO, do_get_sample_info),
4874 	COMMAND(GET_CARD_INFO, do_get_info),
4875 	COMMAND(SUBSCRIBE, do_subscribe),
4876 
4877 	COMMAND(GET_SINK_INFO_LIST, do_get_info_list),
4878 	COMMAND(GET_SOURCE_INFO_LIST, do_get_info_list),
4879 	COMMAND(GET_MODULE_INFO_LIST, do_get_info_list),
4880 	COMMAND(GET_CLIENT_INFO_LIST, do_get_info_list),
4881 	COMMAND(GET_SINK_INPUT_INFO_LIST, do_get_info_list),
4882 	COMMAND(GET_SOURCE_OUTPUT_INFO_LIST, do_get_info_list),
4883 	COMMAND(GET_SAMPLE_INFO_LIST, do_get_sample_info_list),
4884 	COMMAND(GET_CARD_INFO_LIST, do_get_info_list),
4885 
4886 	COMMAND(SET_SINK_VOLUME, do_set_volume),
4887 	COMMAND(SET_SINK_INPUT_VOLUME, do_set_stream_volume),
4888 	COMMAND(SET_SOURCE_VOLUME, do_set_volume),
4889 
4890 	COMMAND(SET_SINK_MUTE, do_set_mute),
4891 	COMMAND(SET_SOURCE_MUTE, do_set_mute),
4892 
4893 	COMMAND(CORK_PLAYBACK_STREAM, do_cork_stream),
4894 	COMMAND(FLUSH_PLAYBACK_STREAM, do_flush_trigger_prebuf_stream),
4895 	COMMAND(TRIGGER_PLAYBACK_STREAM, do_flush_trigger_prebuf_stream),
4896 	COMMAND(PREBUF_PLAYBACK_STREAM, do_flush_trigger_prebuf_stream),
4897 
4898 	COMMAND(SET_DEFAULT_SINK, do_set_default),
4899 	COMMAND(SET_DEFAULT_SOURCE, do_set_default),
4900 
4901 	COMMAND(SET_PLAYBACK_STREAM_NAME, do_set_stream_name),
4902 	COMMAND(SET_RECORD_STREAM_NAME, do_set_stream_name),
4903 
4904 	COMMAND(KILL_CLIENT, do_kill),
4905 	COMMAND(KILL_SINK_INPUT, do_kill),
4906 	COMMAND(KILL_SOURCE_OUTPUT, do_kill),
4907 
4908 	COMMAND(LOAD_MODULE, do_load_module),
4909 	COMMAND(UNLOAD_MODULE, do_unload_module),
4910 
4911 	/* Obsolete */
4912 	COMMAND(ADD_AUTOLOAD___OBSOLETE, do_error_access),
4913 	COMMAND(REMOVE_AUTOLOAD___OBSOLETE, do_error_access),
4914 	COMMAND(GET_AUTOLOAD_INFO___OBSOLETE, do_error_access),
4915 	COMMAND(GET_AUTOLOAD_INFO_LIST___OBSOLETE, do_error_access),
4916 
4917 	COMMAND(GET_RECORD_LATENCY, do_get_record_latency),
4918 	COMMAND(CORK_RECORD_STREAM, do_cork_stream),
4919 	COMMAND(FLUSH_RECORD_STREAM, do_flush_trigger_prebuf_stream),
4920 
4921 	/* SERVER->CLIENT */
4922 	COMMAND(REQUEST),
4923 	COMMAND(OVERFLOW),
4924 	COMMAND(UNDERFLOW),
4925 	COMMAND(PLAYBACK_STREAM_KILLED),
4926 	COMMAND(RECORD_STREAM_KILLED),
4927 	COMMAND(SUBSCRIBE_EVENT),
4928 
4929 	/* A few more client->server commands */
4930 
4931 	/* Supported since protocol v10 (0.9.5) */
4932 	COMMAND(MOVE_SINK_INPUT, do_move_stream),
4933 	COMMAND(MOVE_SOURCE_OUTPUT, do_move_stream),
4934 
4935 	/* Supported since protocol v11 (0.9.7) */
4936 	COMMAND(SET_SINK_INPUT_MUTE, do_set_stream_mute),
4937 
4938 	COMMAND(SUSPEND_SINK, do_suspend),
4939 	COMMAND(SUSPEND_SOURCE, do_suspend),
4940 
4941 	/* Supported since protocol v12 (0.9.8) */
4942 	COMMAND(SET_PLAYBACK_STREAM_BUFFER_ATTR, do_set_stream_buffer_attr),
4943 	COMMAND(SET_RECORD_STREAM_BUFFER_ATTR, do_set_stream_buffer_attr),
4944 
4945 	COMMAND(UPDATE_PLAYBACK_STREAM_SAMPLE_RATE, do_update_stream_sample_rate),
4946 	COMMAND(UPDATE_RECORD_STREAM_SAMPLE_RATE, do_update_stream_sample_rate),
4947 
4948 	/* SERVER->CLIENT */
4949 	COMMAND(PLAYBACK_STREAM_SUSPENDED),
4950 	COMMAND(RECORD_STREAM_SUSPENDED),
4951 	COMMAND(PLAYBACK_STREAM_MOVED),
4952 	COMMAND(RECORD_STREAM_MOVED),
4953 
4954 	/* Supported since protocol v13 (0.9.11) */
4955 	COMMAND(UPDATE_RECORD_STREAM_PROPLIST, do_update_proplist),
4956 	COMMAND(UPDATE_PLAYBACK_STREAM_PROPLIST, do_update_proplist),
4957 	COMMAND(UPDATE_CLIENT_PROPLIST, do_update_proplist),
4958 
4959 	COMMAND(REMOVE_RECORD_STREAM_PROPLIST, do_remove_proplist),
4960 	COMMAND(REMOVE_PLAYBACK_STREAM_PROPLIST, do_remove_proplist),
4961 	COMMAND(REMOVE_CLIENT_PROPLIST, do_remove_proplist),
4962 
4963 	/* SERVER->CLIENT */
4964 	COMMAND(STARTED),
4965 
4966 	/* Supported since protocol v14 (0.9.12) */
4967 	COMMAND(EXTENSION, do_extension),
4968 	/* Supported since protocol v15 (0.9.15) */
4969 	COMMAND(SET_CARD_PROFILE, do_set_profile),
4970 
4971 	/* SERVER->CLIENT */
4972 	COMMAND(CLIENT_EVENT),
4973 	COMMAND(PLAYBACK_STREAM_EVENT),
4974 	COMMAND(RECORD_STREAM_EVENT),
4975 
4976 	/* SERVER->CLIENT */
4977 	COMMAND(PLAYBACK_BUFFER_ATTR_CHANGED),
4978 	COMMAND(RECORD_BUFFER_ATTR_CHANGED),
4979 
4980 	/* Supported since protocol v16 (0.9.16) */
4981 	COMMAND(SET_SINK_PORT, do_set_port),
4982 	COMMAND(SET_SOURCE_PORT, do_set_port),
4983 
4984 	/* Supported since protocol v22 (1.0) */
4985 	COMMAND(SET_SOURCE_OUTPUT_VOLUME,  do_set_stream_volume),
4986 	COMMAND(SET_SOURCE_OUTPUT_MUTE,  do_set_stream_mute),
4987 
4988 	/* Supported since protocol v27 (3.0) */
4989 	COMMAND(SET_PORT_LATENCY_OFFSET, do_set_port_latency_offset),
4990 
4991 	/* Supported since protocol v30 (6.0) */
4992 	/* BOTH DIRECTIONS */
4993 	COMMAND(ENABLE_SRBCHANNEL, do_error_access),
4994 	COMMAND(DISABLE_SRBCHANNEL, do_error_access),
4995 
4996 	/* Supported since protocol v31 (9.0)
4997 	 * BOTH DIRECTIONS */
4998 	COMMAND(REGISTER_MEMFD_SHMID, do_error_access),
4999 
5000 	/* Supported since protocol v35 (15.0) */
5001 	COMMAND(SEND_OBJECT_MESSAGE, do_send_object_message),
5002 };
5003 #undef COMMAND
5004 
impl_free_sample(void * item,void * data)5005 static int impl_free_sample(void *item, void *data)
5006 {
5007 	struct sample *s = item;
5008 	sample_free(s);
5009 	return 0;
5010 }
5011 
impl_free_module(void * item,void * data)5012 static int impl_free_module(void *item, void *data)
5013 {
5014 	struct module *m = item;
5015 	module_free(m);
5016 	return 0;
5017 }
5018 
impl_free(struct impl * impl)5019 static void impl_free(struct impl *impl)
5020 {
5021 	struct server *s;
5022 	struct client *c;
5023 	struct message *msg;
5024 
5025 #if HAVE_DBUS
5026 	if (impl->dbus_name)
5027 		dbus_release_name(impl->dbus_name);
5028 #endif
5029 
5030 	spa_list_consume(msg, &impl->free_messages, link)
5031 		message_free(impl, msg, true, true);
5032 
5033 	if (impl->context != NULL)
5034 		spa_hook_remove(&impl->context_listener);
5035 	spa_list_consume(c, &impl->cleanup_clients, link)
5036 		client_free(c);
5037 	spa_list_consume(s, &impl->servers, link)
5038 		server_free(s);
5039 
5040 	pw_map_for_each(&impl->samples, impl_free_sample, impl);
5041 	pw_map_clear(&impl->samples);
5042 	pw_map_for_each(&impl->modules, impl_free_module, impl);
5043 	pw_map_clear(&impl->modules);
5044 
5045 	pw_properties_free(impl->props);
5046 	free(impl);
5047 }
5048 
context_destroy(void * data)5049 static void context_destroy(void *data)
5050 {
5051 	struct impl *impl = data;
5052 	struct server *s;
5053 	spa_list_consume(s, &impl->servers, link)
5054 		server_free(s);
5055 	spa_hook_remove(&impl->context_listener);
5056 	impl->context = NULL;
5057 }
5058 
5059 static const struct pw_context_events context_events = {
5060 	PW_VERSION_CONTEXT_EVENTS,
5061 	.destroy = context_destroy,
5062 };
5063 
parse_frac(struct pw_properties * props,const char * key,const char * def,struct spa_fraction * res)5064 static int parse_frac(struct pw_properties *props, const char *key, const char *def,
5065 		struct spa_fraction *res)
5066 {
5067 	const char *str;
5068 	if (props == NULL ||
5069 	    (str = pw_properties_get(props, key)) == NULL)
5070 		str = def;
5071 	if (sscanf(str, "%u/%u", &res->num, &res->denom) != 2 || res->denom == 0)
5072 		return -EINVAL;
5073 	pw_log_info(": defaults: %s = %u/%u", key, res->num, res->denom);
5074 	return 0;
5075 }
5076 
parse_position(struct pw_properties * props,const char * key,const char * def,struct channel_map * res)5077 static int parse_position(struct pw_properties *props, const char *key, const char *def,
5078 		struct channel_map *res)
5079 {
5080 	const char *str;
5081 	struct spa_json it[2];
5082 	char v[256];
5083 
5084 	if (props == NULL ||
5085 	    (str = pw_properties_get(props, key)) == NULL)
5086 		str = def;
5087 
5088 	spa_json_init(&it[0], str, strlen(str));
5089 	if (spa_json_enter_array(&it[0], &it[1]) <= 0)
5090 		spa_json_init(&it[1], str, strlen(str));
5091 
5092 	res->channels = 0;
5093 	while (spa_json_get_string(&it[1], v, sizeof(v)) > 0 &&
5094 	    res->channels < SPA_AUDIO_MAX_CHANNELS) {
5095 		res->map[res->channels++] = channel_name2id(v);
5096 	}
5097 	pw_log_info(": defaults: %s = %s", key, str);
5098 	return 0;
5099 }
parse_format(struct pw_properties * props,const char * key,const char * def,struct sample_spec * res)5100 static int parse_format(struct pw_properties *props, const char *key, const char *def,
5101 		struct sample_spec *res)
5102 {
5103 	const char *str;
5104 	if (props == NULL ||
5105 	    (str = pw_properties_get(props, key)) == NULL)
5106 		str = def;
5107 	res->format = format_name2id(str);
5108 	if (res->format == SPA_AUDIO_FORMAT_UNKNOWN) {
5109 		pw_log_warn(": unknown format %s, default to %s", str, def);
5110 		res->format = format_name2id(def);
5111 	}
5112 	pw_log_info(": defaults: %s = %s", key, str);
5113 	return 0;
5114 }
5115 
load_defaults(struct defs * def,struct pw_properties * props)5116 static void load_defaults(struct defs *def, struct pw_properties *props)
5117 {
5118 	parse_frac(props, "pulse.min.req", DEFAULT_MIN_REQ, &def->min_req);
5119 	parse_frac(props, "pulse.default.req", DEFAULT_DEFAULT_REQ, &def->default_req);
5120 	parse_frac(props, "pulse.min.frag", DEFAULT_MIN_FRAG, &def->min_frag);
5121 	parse_frac(props, "pulse.default.frag", DEFAULT_DEFAULT_FRAG, &def->default_frag);
5122 	parse_frac(props, "pulse.default.tlength", DEFAULT_DEFAULT_TLENGTH, &def->default_tlength);
5123 	parse_frac(props, "pulse.min.quantum", DEFAULT_MIN_QUANTUM, &def->min_quantum);
5124 	parse_format(props, "pulse.default.format", DEFAULT_FORMAT, &def->sample_spec);
5125 	parse_position(props, "pulse.default.position", DEFAULT_POSITION, &def->channel_map);
5126 	def->sample_spec.channels = def->channel_map.channels;
5127 	def->max_quantum = 8192;
5128 }
5129 
pw_protocol_pulse_new(struct pw_context * context,struct pw_properties * props,size_t user_data_size)5130 struct pw_protocol_pulse *pw_protocol_pulse_new(struct pw_context *context,
5131 		struct pw_properties *props, size_t user_data_size)
5132 {
5133 	const struct spa_support *support;
5134 	struct spa_cpu *cpu;
5135 	uint32_t n_support;
5136 	struct impl *impl;
5137 	const char *str;
5138 	int res = 0;
5139 
5140 	impl = calloc(1, sizeof(*impl) + user_data_size);
5141 	if (impl == NULL)
5142 		goto error_exit;
5143 
5144 	if (props == NULL)
5145 		props = pw_properties_new(NULL, NULL);
5146 	if (props == NULL)
5147 		goto error_free;
5148 
5149 	support = pw_context_get_support(context, &n_support);
5150 	cpu = spa_support_find(support, n_support, SPA_TYPE_INTERFACE_CPU);
5151 
5152 	if ((str = pw_properties_get(props, "vm.overrides")) != NULL) {
5153 		if (cpu != NULL && spa_cpu_get_vm_type(cpu) != SPA_CPU_VM_NONE)
5154 			pw_properties_update_string(props, str, strlen(str));
5155 		pw_properties_set(props, "vm.overrides", NULL);
5156 	}
5157 
5158 	load_defaults(&impl->defs, props);
5159 
5160 	debug_messages = pw_debug_is_category_enabled("connection");
5161 
5162 	impl->context = context;
5163 	impl->loop = pw_context_get_main_loop(context);
5164 	impl->props = props;
5165 
5166 	impl->work_queue = pw_context_get_work_queue(context);
5167 	if (impl->work_queue == NULL)
5168 		goto error_free;
5169 
5170 	spa_list_init(&impl->servers);
5171 	impl->rate_limit.interval = 2 * SPA_NSEC_PER_SEC;
5172 	impl->rate_limit.burst = 1;
5173 	pw_map_init(&impl->samples, 16, 16);
5174 	pw_map_init(&impl->modules, 16, 16);
5175 	spa_list_init(&impl->cleanup_clients);
5176 	spa_list_init(&impl->free_messages);
5177 
5178 	str = pw_properties_get(props, "server.address");
5179 	if (str == NULL) {
5180 		pw_properties_setf(props, "server.address",
5181 				"[ \"%s-%s\" ]",
5182 				PW_PROTOCOL_PULSE_DEFAULT_SERVER,
5183 				get_server_name(context));
5184 		str = pw_properties_get(props, "server.address");
5185 	}
5186 
5187 	if (str == NULL)
5188 		goto error_free;
5189 
5190 	if ((res = servers_create_and_start(impl, str, NULL)) < 0) {
5191 		pw_log_error("%p: no servers could be started: %s",
5192 				impl, spa_strerror(res));
5193 		goto error_free;
5194 	}
5195 
5196 	if ((res = create_pid_file()) < 0) {
5197 		pw_log_warn("%p: can't create pid file: %s",
5198 				impl, spa_strerror(res));
5199 	}
5200 	pw_context_add_listener(context, &impl->context_listener,
5201 			&context_events, impl);
5202 
5203 #if HAVE_DBUS
5204 	impl->dbus_name = dbus_request_name(context, "org.pulseaudio.Server");
5205 #endif
5206 
5207 	return (struct pw_protocol_pulse *) impl;
5208 
5209 error_free:
5210 	free(impl);
5211 
5212 error_exit:
5213 	pw_properties_free(props);
5214 
5215 	if (res < 0)
5216 		errno = -res;
5217 
5218 	return NULL;
5219 }
5220 
pw_protocol_pulse_get_user_data(struct pw_protocol_pulse * pulse)5221 void *pw_protocol_pulse_get_user_data(struct pw_protocol_pulse *pulse)
5222 {
5223 	return SPA_PTROFF(pulse, sizeof(struct impl), void);
5224 }
5225 
pw_protocol_pulse_destroy(struct pw_protocol_pulse * pulse)5226 void pw_protocol_pulse_destroy(struct pw_protocol_pulse *pulse)
5227 {
5228 	struct impl *impl = (struct impl*)pulse;
5229 	impl_free(impl);
5230 }
5231