1 /* PipeWire
2 *
3 * Copyright © 2020 Wim Taymans
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice (including the next
13 * paragraph) shall be included in all copies or substantial portions of the
14 * Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
19 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
23 */
24
25 #include <stdbool.h>
26 #include <stdint.h>
27 #include <stdlib.h>
28 #include <arpa/inet.h>
29 #include <sys/socket.h>
30
31 #include <spa/utils/defs.h>
32 #include <spa/utils/hook.h>
33 #include <spa/utils/list.h>
34 #include <pipewire/core.h>
35 #include <pipewire/log.h>
36 #include <pipewire/loop.h>
37 #include <pipewire/map.h>
38 #include <pipewire/properties.h>
39
40 #include "client.h"
41 #include "commands.h"
42 #include "defs.h"
43 #include "internal.h"
44 #include "log.h"
45 #include "manager.h"
46 #include "message.h"
47 #include "operation.h"
48 #include "pending-sample.h"
49 #include "server.h"
50 #include "stream.h"
51
52 #define client_emit_disconnect(c) spa_hook_list_call(&(c)->listener_list, struct client_events, disconnect, 0)
53
client_new(struct server * server)54 struct client *client_new(struct server *server)
55 {
56 struct client *client = calloc(1, sizeof(*client));
57 if (client == NULL)
58 return NULL;
59
60 client->ref = 1;
61 client->server = server;
62 client->impl = server->impl;
63 client->connect_tag = SPA_ID_INVALID;
64
65 pw_map_init(&client->streams, 16, 16);
66 spa_list_init(&client->out_messages);
67 spa_list_init(&client->operations);
68 spa_list_init(&client->pending_samples);
69 spa_list_init(&client->pending_streams);
70 spa_hook_list_init(&client->listener_list);
71
72 spa_list_append(&server->clients, &client->link);
73 server->n_clients++;
74
75 return client;
76 }
77
client_free_stream(void * item,void * data)78 static int client_free_stream(void *item, void *data)
79 {
80 struct stream *s = item;
81
82 stream_free(s);
83 return 0;
84 }
85
86 /*
87 * tries to detach the client from the server,
88 * but it does not drop the server's reference
89 */
client_detach(struct client * client)90 bool client_detach(struct client *client)
91 {
92 struct impl *impl = client->impl;
93 struct server *server = client->server;
94
95 if (server == NULL)
96 return false;
97
98 pw_log_debug("client %p: detaching from server %p", client, server);
99
100 /* remove from the `server->clients` list */
101 spa_list_remove(&client->link);
102
103 server->n_clients--;
104 if (server->wait_clients > 0 && --server->wait_clients == 0) {
105 int mask = server->source->mask;
106 SPA_FLAG_SET(mask, SPA_IO_IN);
107 pw_loop_update_io(impl->loop, server->source, mask);
108 }
109
110 client->server = NULL;
111
112 return true;
113 }
114
client_disconnect(struct client * client)115 void client_disconnect(struct client *client)
116 {
117 struct impl *impl = client->impl;
118
119 if (client->disconnect)
120 return;
121
122 client_emit_disconnect(client);
123
124 /* the client must be detached from the server to disconnect */
125 spa_assert(client->server == NULL);
126
127 client->disconnect = true;
128 spa_list_append(&impl->cleanup_clients, &client->link);
129
130 pw_map_for_each(&client->streams, client_free_stream, client);
131
132 if (client->source)
133 pw_loop_destroy_source(impl->loop, client->source);
134
135 if (client->manager)
136 pw_manager_destroy(client->manager);
137 }
138
client_free(struct client * client)139 void client_free(struct client *client)
140 {
141 struct impl *impl = client->impl;
142 struct pending_sample *p;
143 struct message *msg;
144 struct operation *o;
145
146 pw_log_debug("client %p: free", client);
147
148 client_detach(client);
149 client_disconnect(client);
150
151 /* remove from the `impl->cleanup_clients` list */
152 spa_list_remove(&client->link);
153
154 spa_list_consume(p, &client->pending_samples, link)
155 pending_sample_free(p);
156
157 if (client->message)
158 message_free(impl, client->message, false, false);
159
160 spa_list_consume(msg, &client->out_messages, link)
161 message_free(impl, msg, true, false);
162
163 spa_list_consume(o, &client->operations, link)
164 operation_free(o);
165
166 if (client->core) {
167 client->disconnecting = true;
168 pw_core_disconnect(client->core);
169 }
170
171 pw_map_clear(&client->streams);
172
173 free(client->default_sink);
174 free(client->default_source);
175
176 pw_properties_free(client->props);
177 pw_properties_free(client->routes);
178
179 spa_hook_list_clean(&client->listener_list);
180
181 free(client);
182 }
183
client_queue_message(struct client * client,struct message * msg)184 int client_queue_message(struct client *client, struct message *msg)
185 {
186 struct impl *impl = client->impl;
187 int res;
188
189 if (msg == NULL)
190 return -EINVAL;
191
192 if (client->disconnect) {
193 res = -ENOTCONN;
194 goto error;
195 }
196
197 if (msg->length == 0) {
198 res = 0;
199 goto error;
200 } else if (msg->length > msg->allocated) {
201 res = -ENOMEM;
202 goto error;
203 }
204
205 msg->offset = 0;
206 spa_list_append(&client->out_messages, &msg->link);
207
208 uint32_t mask = client->source->mask;
209 if (!SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) {
210 SPA_FLAG_SET(mask, SPA_IO_OUT);
211 pw_loop_update_io(impl->loop, client->source, mask);
212 }
213
214 client->new_msg_since_last_flush = true;
215
216 return 0;
217
218 error:
219 message_free(impl, msg, false, false);
220 return res;
221 }
222
client_try_flush_messages(struct client * client)223 static int client_try_flush_messages(struct client *client)
224 {
225 struct impl *impl = client->impl;
226
227 pw_log_trace("client %p: flushing", client);
228
229 spa_assert(!client->disconnect);
230
231 while (!spa_list_is_empty(&client->out_messages)) {
232 struct message *m = spa_list_first(&client->out_messages, struct message, link);
233 struct descriptor desc;
234 const void *data;
235 size_t size;
236
237 if (client->out_index < sizeof(desc)) {
238 desc.length = htonl(m->length);
239 desc.channel = htonl(m->channel);
240 desc.offset_hi = 0;
241 desc.offset_lo = 0;
242 desc.flags = 0;
243
244 data = SPA_PTROFF(&desc, client->out_index, void);
245 size = sizeof(desc) - client->out_index;
246 } else if (client->out_index < m->length + sizeof(desc)) {
247 uint32_t idx = client->out_index - sizeof(desc);
248 data = m->data + idx;
249 size = m->length - idx;
250 } else {
251 if (debug_messages && m->channel == SPA_ID_INVALID)
252 message_dump(SPA_LOG_LEVEL_INFO, m);
253 message_free(impl, m, true, false);
254 client->out_index = 0;
255 continue;
256 }
257
258 while (true) {
259 ssize_t sent = send(client->source->fd, data, size, MSG_NOSIGNAL | MSG_DONTWAIT);
260 if (sent < 0) {
261 int res = -errno;
262 if (res == -EINTR)
263 continue;
264 if (res != -EAGAIN && res != -EWOULDBLOCK)
265 pw_log_warn("client %p: send channel:%u %zu, error %d: %m",
266 client, m->channel, size, res);
267 return res;
268 }
269
270 client->out_index += sent;
271 break;
272 }
273 }
274
275 return 0;
276 }
277
client_flush_messages(struct client * client)278 int client_flush_messages(struct client *client)
279 {
280 client->new_msg_since_last_flush = false;
281
282 int res = client_try_flush_messages(client);
283 if (res >= 0) {
284 uint32_t mask = client->source->mask;
285
286 if (SPA_FLAG_IS_SET(mask, SPA_IO_OUT)) {
287 SPA_FLAG_CLEAR(mask, SPA_IO_OUT);
288 pw_loop_update_io(client->impl->loop, client->source, mask);
289 }
290 } else {
291 if (res != -EAGAIN && res != -EWOULDBLOCK)
292 return res;
293 }
294
295 return 0;
296 }
297
298 /* returns true if an event with the (mask, event, id) triplet should be dropped because it is redundant */
client_prune_subscribe_events(struct client * client,uint32_t mask,uint32_t event,uint32_t id)299 static bool client_prune_subscribe_events(struct client *client, uint32_t mask, uint32_t event, uint32_t id)
300 {
301 struct impl *impl = client->impl;
302 struct message *m, *t, *first;
303
304 if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_NEW)
305 return false;
306
307 first = spa_list_first(&client->out_messages, struct message, link);
308
309 spa_list_for_each_safe_reverse(m, t, &client->out_messages, link) {
310 if (m->extra[0] != COMMAND_SUBSCRIBE_EVENT)
311 continue;
312 if ((m->extra[1] ^ event) & SUBSCRIPTION_EVENT_FACILITY_MASK)
313 continue;
314 if (m->extra[2] != id)
315 continue;
316
317 if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_REMOVE) {
318 /* This object is being removed, hence there is
319 * point in keeping the old events regarding
320 * entry in the queue. */
321
322 /* if the first message has already been partially sent, do not drop it */
323 if (m != first || client->out_index == 0) {
324 message_free(impl, m, true, false);
325 pw_log_debug("client %p: dropped redundant event due to remove event", client);
326 }
327 }
328
329 if ((event & SUBSCRIPTION_EVENT_TYPE_MASK) == SUBSCRIPTION_EVENT_CHANGE) {
330 /* This object has changed. If a "new" or "change" event for
331 * this object is still in the queue we can exit. */
332 pw_log_debug("client %p: dropped redundant event due to change event", client);
333 return true;
334 }
335 }
336
337 return false;
338 }
339
client_queue_subscribe_event(struct client * client,uint32_t mask,uint32_t event,uint32_t id)340 int client_queue_subscribe_event(struct client *client, uint32_t mask, uint32_t event, uint32_t id)
341 {
342 if (client->disconnect)
343 return -ENOTCONN;
344
345 if (!(client->subscribed & mask))
346 return 0;
347
348 pw_log_debug("client %p: SUBSCRIBE event:%08x id:%u", client, event, id);
349
350 if (client_prune_subscribe_events(client, mask, event, id))
351 return 0;
352
353 struct message *reply = message_alloc(client->impl, -1, 0);
354 reply->extra[0] = COMMAND_SUBSCRIBE_EVENT;
355 reply->extra[1] = event;
356 reply->extra[2] = id;
357
358 message_put(reply,
359 TAG_U32, COMMAND_SUBSCRIBE_EVENT,
360 TAG_U32, -1,
361 TAG_U32, event,
362 TAG_U32, id,
363 TAG_INVALID);
364
365 return client_queue_message(client, reply);
366 }
367