1 /* PipeWire
2  *
3  * Copyright © 2020 Wim Taymans
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice (including the next
13  * paragraph) shall be included in all copies or substantial portions of the
14  * Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
19  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include <stdbool.h>
26 #include <stdint.h>
27 #include <stdlib.h>
28 #include <arpa/inet.h>
29 #include <sys/socket.h>
30 
31 #include <spa/utils/defs.h>
32 #include <spa/utils/hook.h>
33 #include <spa/utils/list.h>
34 #include <pipewire/core.h>
35 #include <pipewire/log.h>
36 #include <pipewire/loop.h>
37 #include <pipewire/map.h>
38 #include <pipewire/properties.h>
39 
40 #include "client.h"
41 #include "commands.h"
42 #include "defs.h"
43 #include "internal.h"
44 #include "log.h"
45 #include "manager.h"
46 #include "message.h"
47 #include "operation.h"
48 #include "pending-sample.h"
49 #include "server.h"
50 #include "stream.h"
51 
52 #define client_emit_disconnect(c) spa_hook_list_call(&(c)->listener_list, struct client_events, disconnect, 0)
53 
client_new(struct server * server)54 struct client *client_new(struct server *server)
55 {
56 	struct client *client = calloc(1, sizeof(*client));
57 	if (client == NULL)
58 		return NULL;
59 
60 	client->ref = 1;
61 	client->server = server;
62 	client->impl = server->impl;
63 	client->connect_tag = SPA_ID_INVALID;
64 
65 	pw_map_init(&client->streams, 16, 16);
66 	spa_list_init(&client->out_messages);
67 	spa_list_init(&client->operations);
68 	spa_list_init(&client->pending_samples);
69 	spa_list_init(&client->pending_streams);
70 	spa_hook_list_init(&client->listener_list);
71 
72 	spa_list_append(&server->clients, &client->link);
73 	server->n_clients++;
74 
75 	return client;
76 }
77 
client_free_stream(void * item,void * data)78 static int client_free_stream(void *item, void *data)
79 {
80 	struct stream *s = item;
81 
82 	stream_free(s);
83 	return 0;
84 }
85 
86 /*
87  * tries to detach the client from the server,
88  * but it does not drop the server's reference
89  */
client_detach(struct client * client)90 bool client_detach(struct client *client)
91 {
92 	struct impl *impl = client->impl;
93 	struct server *server = client->server;
94 
95 	if (server == NULL)
96 		return false;
97 
98 	pw_log_debug("client %p: detaching from server %p", client, server);
99 
100 	/* remove from the `server->clients` list */
101 	spa_list_remove(&client->link);
102 
103 	server->n_clients--;
104 	if (server->wait_clients > 0 && --server->wait_clients == 0) {
105 		int mask = server->source->mask;
106 		SPA_FLAG_SET(mask, SPA_IO_IN);
107 		pw_loop_update_io(impl->loop, server->source, mask);
108 	}
109 
110 	client->server = NULL;
111 
112 	return true;
113 }
114 
client_disconnect(struct client * client)115 void client_disconnect(struct client *client)
116 {
117 	struct impl *impl = client->impl;
118 
119 	if (client->disconnect)
120 		return;
121 
122 	client_emit_disconnect(client);
123 
124 	/* the client must be detached from the server to disconnect */
125 	spa_assert(client->server == NULL);
126 
127 	client->disconnect = true;
128 	spa_list_append(&impl->cleanup_clients, &client->link);
129 
130 	pw_map_for_each(&client->streams, client_free_stream, client);
131 
132 	if (client->source)
133 		pw_loop_destroy_source(impl->loop, client->source);
134 
135 	if (client->manager)
136 		pw_manager_destroy(client->manager);
137 }
138 
client_free(struct client * client)139 void client_free(struct client *client)
140 {
141 	struct impl *impl = client->impl;
142 	struct pending_sample *p;
143 	struct message *msg;
144 	struct operation *o;
145 
146 	pw_log_debug("client %p: free", client);
147 
148 	client_detach(client);
149 	client_disconnect(client);
150 
151 	/* remove from the `impl->cleanup_clients` list */
152 	spa_list_remove(&client->link);
153 
154 	spa_list_consume(p, &client->pending_samples, link)
155 		pending_sample_free(p);
156 
157 	if (client->message)
158 		message_free(impl, client->message, false, false);
159 
160 	spa_list_consume(msg, &client->out_messages, link)
161 		message_free(impl, msg, true, false);
162 
163 	spa_list_consume(o, &client->operations, link)
164 		operation_free(o);
165 
166 	if (client->core) {
167 		client->disconnecting = true;
168 		pw_core_disconnect(client->core);
169 	}
170 
171 	pw_map_clear(&client->streams);
172 
173 	free(client->default_sink);
174 	free(client->default_source);
175 
176 	pw_properties_free(client->props);
177 	pw_properties_free(client->routes);
178 
179 	spa_hook_list_clean(&client->listener_list);
180 
181 	free(client);
182 }
183 
client_queue_message(struct client * client,struct message * msg)184 int client_queue_message(struct client *client, struct message *msg)
185 {
186 	struct impl *impl = client->impl;
187 	int res;
188 
189 	if (msg == NULL)
190 		return -EINVAL;
191 
192 	if (client->disconnect) {
193 		res = -ENOTCONN;
194 		goto error;
195 	}
196 
197 	if (msg->length == 0) {
198 		res = 0;
199 		goto error;
200 	} else if (msg->length > msg->allocated) {
201 		res = -ENOMEM;
202 		goto error;
203 	}
204 
205 	msg->offset = 0;
206 	spa_list_append(&client->out_messages, &msg->link);
207 
208 	uint32_t mask = client->source->mask;
209 	if (!SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) {
210 		SPA_FLAG_SET(mask, SPA_IO_OUT);
211 		pw_loop_update_io(impl->loop, client->source, mask);
212 	}
213 
214 	client->new_msg_since_last_flush = true;
215 
216 	return 0;
217 
218 error:
219 	message_free(impl, msg, false, false);
220 	return res;
221 }
222 
client_try_flush_messages(struct client * client)223 static int client_try_flush_messages(struct client *client)
224 {
225 	struct impl *impl = client->impl;
226 
227 	pw_log_trace("client %p: flushing", client);
228 
229 	spa_assert(!client->disconnect);
230 
231 	while (!spa_list_is_empty(&client->out_messages)) {
232 		struct message *m = spa_list_first(&client->out_messages, struct message, link);
233 		struct descriptor desc;
234 		const void *data;
235 		size_t size;
236 
237 		if (client->out_index < sizeof(desc)) {
238 			desc.length = htonl(m->length);
239 			desc.channel = htonl(m->channel);
240 			desc.offset_hi = 0;
241 			desc.offset_lo = 0;
242 			desc.flags = 0;
243 
244 			data = SPA_PTROFF(&desc, client->out_index, void);
245 			size = sizeof(desc) - client->out_index;
246 		} else if (client->out_index < m->length + sizeof(desc)) {
247 			uint32_t idx = client->out_index - sizeof(desc);
248 			data = m->data + idx;
249 			size = m->length - idx;
250 		} else {
251 			if (debug_messages && m->channel == SPA_ID_INVALID)
252 				message_dump(SPA_LOG_LEVEL_INFO, m);
253 			message_free(impl, m, true, false);
254 			client->out_index = 0;
255 			continue;
256 		}
257 
258 		while (true) {
259 			ssize_t sent = send(client->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT);
260 			if (sent < 0) {
261 				int res = -errno;
262 				if (res == -EINTR)
263 					continue;
264 				if (res != -EAGAIN && res != -EWOULDBLOCK)
265 					pw_log_warn("client %p: send channel:%u %zu, error %d: %m",
266 						    client, m->channel, size, res);
267 				return res;
268 			}
269 
270 			client->out_index += sent;
271 			break;
272 		}
273 	}
274 
275 	return 0;
276 }
277 
client_flush_messages(struct client * client)278 int client_flush_messages(struct client *client)
279 {
280 	client->new_msg_since_last_flush = false;
281 
282 	int res = client_try_flush_messages(client);
283 	if (res >= 0) {
284 		uint32_t mask = client->source->mask;
285 
286 		if (SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) {
287 			SPA_FLAG_CLEAR(mask, SPA_IO_OUT);
288 			pw_loop_update_io(client->impl->loop, client->source, mask);
289 		}
290 	} else {
291 		if (res != -EAGAIN && res != -EWOULDBLOCK)
292 			return res;
293 	}
294 
295 	return 0;
296 }
297 
298 /* returns true if an event with the (mask, event, id) triplet should be dropped because it is redundant */
client_prune_subscribe_events(struct client * client,uint32_t mask,uint32_t event,uint32_t id)299 static bool client_prune_subscribe_events(struct client *client, uint32_t mask, uint32_t event, uint32_t id)
300 {
301 	struct impl *impl = client->impl;
302 	struct message *m, *t, *first;
303 
304 	if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_NEW)
305 		return false;
306 
307 	first = spa_list_first(&client->out_messages, struct message, link);
308 
309 	spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) {
310 		if (m->extra[0] != COMMAND_SUBSCRIBE_EVENT)
311 			continue;
312 		if ((m->extra[1] ^ event) & SUBSCRIPTION_EVENT_FACILITY_MASK)
313 			continue;
314 		if (m->extra[2] != id)
315 			continue;
316 
317 		if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_REMOVE) {
318 			/* This object is being removed, hence there is
319 			 * point in keeping the old events regarding
320 			 * entry in the queue. */
321 
322 			/* if the first message has already been partially sent, do not drop it */
323 			if (m != first || client->out_index == 0) {
324 				message_free(impl, m, true, false);
325 				pw_log_debug("client %p: dropped redundant event due to remove event", client);
326 			}
327 		}
328 
329 		if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_CHANGE) {
330 			/* This object has changed. If a "new" or "change" event for
331 			 * this object is still in the queue we can exit. */
332 			pw_log_debug("client %p: dropped redundant event due to change event", client);
333 			return true;
334 		}
335 	}
336 
337 	return false;
338 }
339 
client_queue_subscribe_event(struct client * client,uint32_t mask,uint32_t event,uint32_t id)340 int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id)
341 {
342 	if (client->disconnect)
343 		return -ENOTCONN;
344 
345 	if (!(client->subscribed & mask))
346 		return 0;
347 
348 	pw_log_debug("client %p: SUBSCRIBE event:%08x id:%u", client, event, id);
349 
350 	if (client_prune_subscribe_events(client, mask, event, id))
351 		return 0;
352 
353 	struct message *reply = message_alloc(client->impl, -1, 0);
354 	reply->extra[0] = COMMAND_SUBSCRIBE_EVENT;
355 	reply->extra[1] = event;
356 	reply->extra[2] = id;
357 
358 	message_put(reply,
359 		TAG_U32, COMMAND_SUBSCRIBE_EVENT,
360 		TAG_U32, -1,
361 		TAG_U32, event,
362 		TAG_U32, id,
363 		TAG_INVALID);
364 
365 	return client_queue_message(client, reply);
366 }
367