1 /* PipeWire
2 *
3 * Copyright © 2020 Wim Taymans
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice (including the next
13 * paragraph) shall be included in all copies or substantial portions of the
14 * Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
19 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
23 */
24
25 #include <stdbool.h>
26 #include <stdint.h>
27 #include <stdlib.h>
28
29 #include <spa/utils/hook.h>
30 #include <spa/utils/ringbuffer.h>
31 #include <pipewire/log.h>
32 #include <pipewire/loop.h>
33 #include <pipewire/map.h>
34 #include <pipewire/private.h>
35 #include <pipewire/properties.h>
36 #include <pipewire/stream.h>
37 #include <pipewire/work-queue.h>
38
39 #include "client.h"
40 #include "commands.h"
41 #include "internal.h"
42 #include "log.h"
43 #include "message.h"
44 #include "reply.h"
45 #include "stream.h"
46
stream_new(struct client * client,enum stream_type type,uint32_t create_tag,const struct sample_spec * ss,const struct channel_map * map,const struct buffer_attr * attr)47 struct stream *stream_new(struct client *client, enum stream_type type, uint32_t create_tag,
48 const struct sample_spec *ss, const struct channel_map *map,
49 const struct buffer_attr *attr)
50 {
51 int res;
52
53 struct stream *stream = calloc(1, sizeof(*stream));
54 if (stream == NULL)
55 return NULL;
56
57 stream->channel = pw_map_insert_new(&client->streams, stream);
58 if (stream->channel == SPA_ID_INVALID)
59 goto error_errno;
60
61 stream->impl = client->impl;
62 stream->client = client;
63 stream->type = type;
64 stream->create_tag = create_tag;
65 stream->ss = *ss;
66 stream->map = *map;
67 stream->attr = *attr;
68 spa_ringbuffer_init(&stream->ring);
69
70 switch (type) {
71 case STREAM_TYPE_RECORD:
72 stream->direction = PW_DIRECTION_INPUT;
73 break;
74 case STREAM_TYPE_PLAYBACK:
75 case STREAM_TYPE_UPLOAD:
76 stream->direction = PW_DIRECTION_OUTPUT;
77 break;
78 default:
79 spa_assert_not_reached();
80 }
81
82 return stream;
83
84 error_errno:
85 res = errno;
86 free(stream);
87 errno = res;
88
89 return NULL;
90 }
91
stream_free(struct stream * stream)92 void stream_free(struct stream *stream)
93 {
94 struct client *client = stream->client;
95 struct impl *impl = client->impl;
96
97 pw_log_debug("client %p: stream %p channel:%d", client, stream, stream->channel);
98
99 if (stream->pending)
100 spa_list_remove(&stream->link);
101
102 if (stream->drain_tag)
103 reply_error(client, -1, stream->drain_tag, -ENOENT);
104
105 if (stream->killed)
106 stream_send_killed(stream);
107
108 if (stream->stream) {
109 spa_hook_remove(&stream->stream_listener);
110 pw_stream_disconnect(stream->stream);
111
112 /* force processing of all pending messages before we destroy
113 * the stream */
114 pw_loop_invoke(impl->loop, NULL, 0, NULL, 0, false, client);
115
116 pw_stream_destroy(stream->stream);
117 }
118 if (stream->channel != SPA_ID_INVALID)
119 pw_map_remove(&client->streams, stream->channel);
120
121 pw_work_queue_cancel(impl->work_queue, stream, SPA_ID_INVALID);
122
123 if (stream->buffer)
124 free(stream->buffer);
125
126 pw_properties_free(stream->props);
127
128 free(stream);
129 }
130
stream_flush(struct stream * stream)131 void stream_flush(struct stream *stream)
132 {
133 pw_stream_flush(stream->stream, false);
134
135 if (stream->type == STREAM_TYPE_PLAYBACK) {
136 stream->ring.writeindex = stream->ring.readindex;
137 stream->write_index = stream->read_index;
138
139 stream->missing = stream->attr.tlength -
140 SPA_MIN(stream->requested, stream->attr.tlength);
141
142 if (stream->attr.prebuf > 0)
143 stream->in_prebuf = true;
144
145 stream->playing_for = 0;
146 stream->underrun_for = -1;
147 stream->is_underrun = true;
148
149 stream_send_request(stream);
150 } else {
151 stream->ring.readindex = stream->ring.writeindex;
152 stream->read_index = stream->write_index;
153 }
154 }
155
stream_prebuf_active(struct stream * stream)156 static bool stream_prebuf_active(struct stream *stream)
157 {
158 uint32_t index;
159 int32_t avail;
160
161 avail = spa_ringbuffer_get_write_index(&stream->ring, &index);
162
163 if (stream->in_prebuf) {
164 if (avail >= (int32_t) stream->attr.prebuf)
165 stream->in_prebuf = false;
166 } else {
167 if (stream->attr.prebuf > 0 && avail <= 0)
168 stream->in_prebuf = true;
169 }
170 return stream->in_prebuf;
171 }
172
stream_pop_missing(struct stream * stream)173 uint32_t stream_pop_missing(struct stream *stream)
174 {
175 uint32_t missing;
176
177 if (stream->missing <= 0)
178 return 0;
179
180 if (stream->missing < stream->attr.minreq && !stream_prebuf_active(stream))
181 return 0;
182
183 missing = stream->missing;
184 stream->requested += missing;
185 stream->missing = 0;
186
187 return missing;
188 }
189
stream_send_underflow(struct stream * stream,int64_t offset,uint32_t underrun_for)190 int stream_send_underflow(struct stream *stream, int64_t offset, uint32_t underrun_for)
191 {
192 struct client *client = stream->client;
193 struct impl *impl = client->impl;
194 struct message *reply;
195
196 if (ratelimit_test(&impl->rate_limit, stream->timestamp, SPA_LOG_LEVEL_INFO)) {
197 pw_log_info("[%s]: UNDERFLOW channel:%u offset:%" PRIi64 " underrun:%u",
198 client->name, stream->channel, offset, underrun_for);
199 }
200
201 reply = message_alloc(impl, -1, 0);
202 message_put(reply,
203 TAG_U32, COMMAND_UNDERFLOW,
204 TAG_U32, -1,
205 TAG_U32, stream->channel,
206 TAG_INVALID);
207
208 if (client->version >= 23) {
209 message_put(reply,
210 TAG_S64, offset,
211 TAG_INVALID);
212 }
213
214 return client_queue_message(client, reply);
215 }
216
stream_send_overflow(struct stream * stream)217 int stream_send_overflow(struct stream *stream)
218 {
219 struct client *client = stream->client;
220 struct impl *impl = client->impl;
221 struct message *reply;
222
223 pw_log_warn("client %p [%s]: stream %p OVERFLOW channel:%u",
224 client, client->name, stream, stream->channel);
225
226 reply = message_alloc(impl, -1, 0);
227 message_put(reply,
228 TAG_U32, COMMAND_OVERFLOW,
229 TAG_U32, -1,
230 TAG_U32, stream->channel,
231 TAG_INVALID);
232
233 return client_queue_message(client, reply);
234 }
235
stream_send_killed(struct stream * stream)236 int stream_send_killed(struct stream *stream)
237 {
238 struct client *client = stream->client;
239 struct impl *impl = client->impl;
240 struct message *reply;
241 uint32_t command;
242
243 command = stream->direction == PW_DIRECTION_OUTPUT ?
244 COMMAND_PLAYBACK_STREAM_KILLED :
245 COMMAND_RECORD_STREAM_KILLED;
246
247 pw_log_info("[%s]: %s channel:%u",
248 client->name, commands[command].name, stream->channel);
249
250 if (client->version < 23)
251 return 0;
252
253 reply = message_alloc(impl, -1, 0);
254 message_put(reply,
255 TAG_U32, command,
256 TAG_U32, -1,
257 TAG_U32, stream->channel,
258 TAG_INVALID);
259
260 return client_queue_message(client, reply);
261 }
262
stream_send_started(struct stream * stream)263 int stream_send_started(struct stream *stream)
264 {
265 struct client *client = stream->client;
266 struct impl *impl = client->impl;
267 struct message *reply;
268
269 pw_log_debug("client %p [%s]: stream %p STARTED channel:%u",
270 client, client->name, stream, stream->channel);
271
272 reply = message_alloc(impl, -1, 0);
273 message_put(reply,
274 TAG_U32, COMMAND_STARTED,
275 TAG_U32, -1,
276 TAG_U32, stream->channel,
277 TAG_INVALID);
278
279 return client_queue_message(client, reply);
280 }
281
stream_send_request(struct stream * stream)282 int stream_send_request(struct stream *stream)
283 {
284 struct client *client = stream->client;
285 struct impl *impl = client->impl;
286 struct message *msg;
287 uint32_t size;
288
289 size = stream_pop_missing(stream);
290 pw_log_debug("stream %p: REQUEST channel:%d %u", stream, stream->channel, size);
291
292 if (size == 0)
293 return 0;
294
295 msg = message_alloc(impl, -1, 0);
296 message_put(msg,
297 TAG_U32, COMMAND_REQUEST,
298 TAG_U32, -1,
299 TAG_U32, stream->channel,
300 TAG_U32, size,
301 TAG_INVALID);
302
303 return client_queue_message(client, msg);
304 }
305
stream_update_minreq(struct stream * stream,uint32_t minreq)306 int stream_update_minreq(struct stream *stream, uint32_t minreq)
307 {
308 struct client *client = stream->client;
309 struct impl *impl = client->impl;
310 uint32_t old_tlength = stream->attr.tlength;
311 uint32_t new_tlength = minreq + 2 * stream->attr.minreq;
312 uint64_t lat_usec;
313
314 if (new_tlength <= old_tlength)
315 return 0;
316
317 stream->missing += new_tlength - old_tlength;
318 stream->attr.tlength = new_tlength;
319
320 if (client->version >= 15) {
321 struct message *msg;
322
323 lat_usec = minreq * SPA_USEC_PER_SEC / stream->ss.rate;
324
325 msg = message_alloc(impl, -1, 0);
326 message_put(msg,
327 TAG_U32, COMMAND_PLAYBACK_BUFFER_ATTR_CHANGED,
328 TAG_U32, -1,
329 TAG_U32, stream->channel,
330 TAG_U32, stream->attr.maxlength,
331 TAG_U32, stream->attr.tlength,
332 TAG_U32, stream->attr.prebuf,
333 TAG_U32, stream->attr.minreq,
334 TAG_USEC, lat_usec,
335 TAG_INVALID);
336 return client_queue_message(client, msg);
337 }
338 return 0;
339 }
340