1 /* PipeWire
2  *
3  * Copyright © 2020 Wim Taymans
4  *
5  * Permission is hereby granted, free of charge, to any person obtaining a
6  * copy of this software and associated documentation files (the "Software"),
7  * to deal in the Software without restriction, including without limitation
8  * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9  * and/or sell copies of the Software, and to permit persons to whom the
10  * Software is furnished to do so, subject to the following conditions:
11  *
12  * The above copyright notice and this permission notice (including the next
13  * paragraph) shall be included in all copies or substantial portions of the
14  * Software.
15  *
16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.  IN NO EVENT SHALL
19  * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21  * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22  * DEALINGS IN THE SOFTWARE.
23  */
24 
25 #include <stdbool.h>
26 #include <stdint.h>
27 #include <stdlib.h>
28 
29 #include <spa/utils/hook.h>
30 #include <spa/utils/ringbuffer.h>
31 #include <pipewire/log.h>
32 #include <pipewire/loop.h>
33 #include <pipewire/map.h>
34 #include <pipewire/private.h>
35 #include <pipewire/properties.h>
36 #include <pipewire/stream.h>
37 #include <pipewire/work-queue.h>
38 
39 #include "client.h"
40 #include "commands.h"
41 #include "internal.h"
42 #include "log.h"
43 #include "message.h"
44 #include "reply.h"
45 #include "stream.h"
46 
stream_new(struct client * client,enum stream_type type,uint32_t create_tag,const struct sample_spec * ss,const struct channel_map * map,const struct buffer_attr * attr)47 struct stream *stream_new(struct client *client, enum stream_type type, uint32_t create_tag,
48 			  const struct sample_spec *ss, const struct channel_map *map,
49 			  const struct buffer_attr *attr)
50 {
51 	int res;
52 
53 	struct stream *stream = calloc(1, sizeof(*stream));
54 	if (stream == NULL)
55 		return NULL;
56 
57 	stream->channel = pw_map_insert_new(&client->streams, stream);
58 	if (stream->channel == SPA_ID_INVALID)
59 		goto error_errno;
60 
61 	stream->impl = client->impl;
62 	stream->client = client;
63 	stream->type = type;
64 	stream->create_tag = create_tag;
65 	stream->ss = *ss;
66 	stream->map = *map;
67 	stream->attr = *attr;
68 	spa_ringbuffer_init(&stream->ring);
69 
70 	switch (type) {
71 	case STREAM_TYPE_RECORD:
72 		stream->direction = PW_DIRECTION_INPUT;
73 		break;
74 	case STREAM_TYPE_PLAYBACK:
75 	case STREAM_TYPE_UPLOAD:
76 		stream->direction = PW_DIRECTION_OUTPUT;
77 		break;
78 	default:
79 		spa_assert_not_reached();
80 	}
81 
82 	return stream;
83 
84 error_errno:
85 	res = errno;
86 	free(stream);
87 	errno = res;
88 
89 	return NULL;
90 }
91 
stream_free(struct stream * stream)92 void stream_free(struct stream *stream)
93 {
94 	struct client *client = stream->client;
95 	struct impl *impl = client->impl;
96 
97 	pw_log_debug("client %p: stream %p channel:%d", client, stream, stream->channel);
98 
99 	if (stream->pending)
100 		spa_list_remove(&stream->link);
101 
102 	if (stream->drain_tag)
103 		reply_error(client, -1, stream->drain_tag, -ENOENT);
104 
105 	if (stream->killed)
106 		stream_send_killed(stream);
107 
108 	if (stream->stream) {
109 		spa_hook_remove(&stream->stream_listener);
110 		pw_stream_disconnect(stream->stream);
111 
112 		/* force processing of all pending messages before we destroy
113 		 * the stream */
114 		pw_loop_invoke(impl->loop, NULL, 0, NULL, 0, false, client);
115 
116 		pw_stream_destroy(stream->stream);
117 	}
118 	if (stream->channel != SPA_ID_INVALID)
119 		pw_map_remove(&client->streams, stream->channel);
120 
121 	pw_work_queue_cancel(impl->work_queue, stream, SPA_ID_INVALID);
122 
123 	if (stream->buffer)
124 		free(stream->buffer);
125 
126 	pw_properties_free(stream->props);
127 
128 	free(stream);
129 }
130 
stream_flush(struct stream * stream)131 void stream_flush(struct stream *stream)
132 {
133 	pw_stream_flush(stream->stream, false);
134 
135 	if (stream->type == STREAM_TYPE_PLAYBACK) {
136 		stream->ring.writeindex = stream->ring.readindex;
137 		stream->write_index = stream->read_index;
138 
139 		stream->missing = stream->attr.tlength -
140 			SPA_MIN(stream->requested, stream->attr.tlength);
141 
142 		if (stream->attr.prebuf > 0)
143 			stream->in_prebuf = true;
144 
145 		stream->playing_for = 0;
146 		stream->underrun_for = -1;
147 		stream->is_underrun = true;
148 
149 		stream_send_request(stream);
150 	} else {
151 		stream->ring.readindex = stream->ring.writeindex;
152 		stream->read_index = stream->write_index;
153 	}
154 }
155 
stream_prebuf_active(struct stream * stream)156 static bool stream_prebuf_active(struct stream *stream)
157 {
158 	uint32_t index;
159 	int32_t avail;
160 
161 	avail = spa_ringbuffer_get_write_index(&stream->ring, &index);
162 
163 	if (stream->in_prebuf) {
164 		if (avail >= (int32_t) stream->attr.prebuf)
165 			stream->in_prebuf = false;
166 	} else {
167 		if (stream->attr.prebuf > 0 && avail <= 0)
168 			stream->in_prebuf = true;
169 	}
170 	return stream->in_prebuf;
171 }
172 
stream_pop_missing(struct stream * stream)173 uint32_t stream_pop_missing(struct stream *stream)
174 {
175 	uint32_t missing;
176 
177 	if (stream->missing <= 0)
178 		return 0;
179 
180 	if (stream->missing < stream->attr.minreq && !stream_prebuf_active(stream))
181 		return 0;
182 
183 	missing = stream->missing;
184 	stream->requested += missing;
185 	stream->missing = 0;
186 
187 	return missing;
188 }
189 
stream_send_underflow(struct stream * stream,int64_t offset,uint32_t underrun_for)190 int stream_send_underflow(struct stream *stream, int64_t offset, uint32_t underrun_for)
191 {
192 	struct client *client = stream->client;
193 	struct impl *impl = client->impl;
194 	struct message *reply;
195 
196 	if (ratelimit_test(&impl->rate_limit, stream->timestamp, SPA_LOG_LEVEL_INFO)) {
197 		pw_log_info("[%s]: UNDERFLOW channel:%u offset:%" PRIi64 " underrun:%u",
198 			    client->name, stream->channel, offset, underrun_for);
199 	}
200 
201 	reply = message_alloc(impl, -1, 0);
202 	message_put(reply,
203 		TAG_U32, COMMAND_UNDERFLOW,
204 		TAG_U32, -1,
205 		TAG_U32, stream->channel,
206 		TAG_INVALID);
207 
208 	if (client->version >= 23) {
209 		message_put(reply,
210 			TAG_S64, offset,
211 			TAG_INVALID);
212 	}
213 
214 	return client_queue_message(client, reply);
215 }
216 
stream_send_overflow(struct stream * stream)217 int stream_send_overflow(struct stream *stream)
218 {
219 	struct client *client = stream->client;
220 	struct impl *impl = client->impl;
221 	struct message *reply;
222 
223 	pw_log_warn("client %p [%s]: stream %p OVERFLOW channel:%u",
224 		    client, client->name, stream, stream->channel);
225 
226 	reply = message_alloc(impl, -1, 0);
227 	message_put(reply,
228 		TAG_U32, COMMAND_OVERFLOW,
229 		TAG_U32, -1,
230 		TAG_U32, stream->channel,
231 		TAG_INVALID);
232 
233 	return client_queue_message(client, reply);
234 }
235 
stream_send_killed(struct stream * stream)236 int stream_send_killed(struct stream *stream)
237 {
238 	struct client *client = stream->client;
239 	struct impl *impl = client->impl;
240 	struct message *reply;
241 	uint32_t command;
242 
243 	command = stream->direction == PW_DIRECTION_OUTPUT ?
244 		COMMAND_PLAYBACK_STREAM_KILLED :
245 		COMMAND_RECORD_STREAM_KILLED;
246 
247 	pw_log_info("[%s]: %s channel:%u",
248 		    client->name, commands[command].name, stream->channel);
249 
250 	if (client->version < 23)
251 		return 0;
252 
253 	reply = message_alloc(impl, -1, 0);
254 	message_put(reply,
255 		TAG_U32, command,
256 		TAG_U32, -1,
257 		TAG_U32, stream->channel,
258 		TAG_INVALID);
259 
260 	return client_queue_message(client, reply);
261 }
262 
stream_send_started(struct stream * stream)263 int stream_send_started(struct stream *stream)
264 {
265 	struct client *client = stream->client;
266 	struct impl *impl = client->impl;
267 	struct message *reply;
268 
269 	pw_log_debug("client %p [%s]: stream %p STARTED channel:%u",
270 		     client, client->name, stream, stream->channel);
271 
272 	reply = message_alloc(impl, -1, 0);
273 	message_put(reply,
274 		TAG_U32, COMMAND_STARTED,
275 		TAG_U32, -1,
276 		TAG_U32, stream->channel,
277 		TAG_INVALID);
278 
279 	return client_queue_message(client, reply);
280 }
281 
stream_send_request(struct stream * stream)282 int stream_send_request(struct stream *stream)
283 {
284 	struct client *client = stream->client;
285 	struct impl *impl = client->impl;
286 	struct message *msg;
287 	uint32_t size;
288 
289 	size = stream_pop_missing(stream);
290 	pw_log_debug("stream %p: REQUEST channel:%d %u", stream, stream->channel, size);
291 
292 	if (size == 0)
293 		return 0;
294 
295 	msg = message_alloc(impl, -1, 0);
296 	message_put(msg,
297 		TAG_U32, COMMAND_REQUEST,
298 		TAG_U32, -1,
299 		TAG_U32, stream->channel,
300 		TAG_U32, size,
301 		TAG_INVALID);
302 
303 	return client_queue_message(client, msg);
304 }
305 
stream_update_minreq(struct stream * stream,uint32_t minreq)306 int stream_update_minreq(struct stream *stream, uint32_t minreq)
307 {
308 	struct client *client = stream->client;
309 	struct impl *impl = client->impl;
310 	uint32_t old_tlength = stream->attr.tlength;
311 	uint32_t new_tlength = minreq + 2 * stream->attr.minreq;
312 	uint64_t lat_usec;
313 
314 	if (new_tlength <= old_tlength)
315 		return 0;
316 
317 	stream->missing += new_tlength - old_tlength;
318 	stream->attr.tlength = new_tlength;
319 
320 	if (client->version >= 15) {
321 		struct message *msg;
322 
323 		lat_usec = minreq * SPA_USEC_PER_SEC / stream->ss.rate;
324 
325 		msg = message_alloc(impl, -1, 0);
326 		message_put(msg,
327 			TAG_U32, COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED,
328 			TAG_U32, -1,
329 			TAG_U32, stream->channel,
330 			TAG_U32, stream->attr.maxlength,
331 			TAG_U32, stream->attr.tlength,
332 			TAG_U32, stream->attr.prebuf,
333 			TAG_U32, stream->attr.minreq,
334 			TAG_USEC, lat_usec,
335 			TAG_INVALID);
336 		return client_queue_message(client, msg);
337 	}
338 	return 0;
339 }
340