1 /* PipeWire
2 *
3 * Copyright © 2018 Wim Taymans
4 *
5 * Permission is hereby granted, free of charge, to any person obtaining a
6 * copy of this software and associated documentation files (the "Software"),
7 * to deal in the Software without restriction, including without limitation
8 * the rights to use, copy, modify, merge, publish, distribute, sublicense,
9 * and/or sell copies of the Software, and to permit persons to whom the
10 * Software is furnished to do so, subject to the following conditions:
11 *
12 * The above copyright notice and this permission notice (including the next
13 * paragraph) shall be included in all copies or substantial portions of the
14 * Software.
15 *
16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
19 * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 * FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
22 * DEALINGS IN THE SOFTWARE.
23 */
24
25 #include <errno.h>
26 #include <stdio.h>
27 #include <math.h>
28 #include <sys/mman.h>
29 #include <time.h>
30
31 #include <spa/buffer/alloc.h>
32 #include <spa/param/props.h>
33 #include <spa/node/io.h>
34 #include <spa/node/utils.h>
35 #include <spa/utils/ringbuffer.h>
36 #include <spa/pod/filter.h>
37 #include <spa/debug/format.h>
38 #include <spa/debug/types.h>
39 #include <spa/debug/pod.h>
40
41 #include "pipewire/pipewire.h"
42 #include "pipewire/stream.h"
43 #include "pipewire/private.h"
44
/* Route the pw_log_*() calls in this file to the log_stream topic, which is
 * declared here and defined elsewhere. */
PW_LOG_TOPIC_EXTERN(log_stream);
#define PW_LOG_TOPIC_DEFAULT log_stream

/* Size of the per-stream buffer array and of each buffer-id queue. */
#define MAX_BUFFERS 64

/* Mask applied to ring indices before indexing queue::ids; this relies on
 * MAX_BUFFERS being a power of two. */
#define MASK_BUFFERS (MAX_BUFFERS-1)
#define MAX_PORTS 1

/* Set once an mlock() failure with ENOMEM has been reported by map_data(),
 * so that the RLIMIT_MEMLOCK hint is not logged again. */
static bool mlock_warned = false;

/* Bitmask of (1 << SPA_DATA_*) data types that get mmap()ed when the stream
 * was created with PW_STREAM_FLAG_MAP_BUFFERS. */
static uint32_t mappable_dataTypes = (1<<SPA_DATA_MemFd);
56
57 struct buffer {
58 struct pw_buffer this;
59 uint32_t id;
60 #define BUFFER_FLAG_MAPPED (1 << 0)
61 #define BUFFER_FLAG_QUEUED (1 << 1)
62 #define BUFFER_FLAG_ADDED (1 << 2)
63 uint32_t flags;
64 struct spa_meta_busy *busy;
65 };
66
67 struct queue {
68 uint32_t ids[MAX_BUFFERS];
69 struct spa_ringbuffer ring;
70 uint64_t incount;
71 uint64_t outcount;
72 };
73
/* Holder embedded in struct stream as stream::data.
 * NOTE(review): no code using these fields is visible in this part of the
 * file; confirm their purpose against the code that fills them in. */
struct data {
	struct pw_context *context;
	struct spa_hook stream_listener;
};
78
79 struct param {
80 uint32_t id;
81 #define PARAM_FLAG_LOCKED (1 << 0)
82 uint32_t flags;
83 struct spa_list link;
84 struct spa_pod *param;
85 };
86
87 struct control {
88 uint32_t id;
89 uint32_t type;
90 uint32_t container;
91 struct spa_list link;
92 struct pw_stream_control control;
93 struct spa_pod *info;
94 unsigned int emitted:1;
95 float values[64];
96 };
97
98 struct stream {
99 struct pw_stream this;
100
101 const char *path;
102
103 struct pw_context *context;
104 struct spa_hook context_listener;
105
106 enum spa_direction direction;
107 enum pw_stream_flags flags;
108
109 struct pw_impl_node *node;
110
111 struct spa_node impl_node;
112 struct spa_node_methods node_methods;
113 struct spa_hook_list hooks;
114 struct spa_callbacks callbacks;
115
116 struct spa_io_clock *clock;
117 struct spa_io_position *position;
118 struct spa_io_buffers *io;
119 struct {
120 struct spa_io_position *position;
121 } rt;
122
123 uint32_t port_change_mask_all;
124 struct spa_port_info port_info;
125 struct pw_properties *port_props;
126 #define IDX_EnumFormat 0
127 #define IDX_Meta 1
128 #define IDX_IO 2
129 #define IDX_Format 3
130 #define IDX_Buffers 4
131 #define IDX_Latency 5
132 #define N_PORT_PARAMS 6
133 struct spa_param_info port_params[N_PORT_PARAMS];
134
135 struct spa_list param_list;
136
137 uint32_t change_mask_all;
138 struct spa_node_info info;
139 #define IDX_Props 0
140 #define N_NODE_PARAMS 1
141 struct spa_param_info params[N_NODE_PARAMS];
142
143 uint32_t media_type;
144 uint32_t media_subtype;
145
146 struct buffer buffers[MAX_BUFFERS];
147 uint32_t n_buffers;
148
149 struct queue dequeued;
150 struct queue queued;
151
152 struct data data;
153 uintptr_t seq;
154 struct pw_time time;
155 uint64_t base_pos;
156 uint32_t clock_id;
157 struct spa_latency_info latency;
158 uint64_t quantum;
159
160 struct spa_callbacks rt_callbacks;
161
162 unsigned int disconnecting:1;
163 unsigned int disconnect_core:1;
164 unsigned int draining:1;
165 unsigned int drained:1;
166 unsigned int allow_mlock:1;
167 unsigned int warn_mlock:1;
168 unsigned int process_rt:1;
169 unsigned int driving:1;
170 unsigned int using_trigger:1;
171 unsigned int trigger:1;
172 };
173
get_param_index(uint32_t id)174 static int get_param_index(uint32_t id)
175 {
176 switch (id) {
177 case SPA_PARAM_Props:
178 return IDX_Props;
179 default:
180 return -1;
181 }
182 }
183
get_port_param_index(uint32_t id)184 static int get_port_param_index(uint32_t id)
185 {
186 switch (id) {
187 case SPA_PARAM_EnumFormat:
188 return IDX_EnumFormat;
189 case SPA_PARAM_Meta:
190 return IDX_Meta;
191 case SPA_PARAM_IO:
192 return IDX_IO;
193 case SPA_PARAM_Format:
194 return IDX_Format;
195 case SPA_PARAM_Buffers:
196 return IDX_Buffers;
197 case SPA_PARAM_Latency:
198 return IDX_Latency;
199 default:
200 return -1;
201 }
202 }
203
fix_datatype(const struct spa_pod * param)204 static void fix_datatype(const struct spa_pod *param)
205 {
206 const struct spa_pod_prop *pod_param;
207 const struct spa_pod *vals;
208 uint32_t dataType, n_vals, choice;
209
210 pod_param = spa_pod_find_prop(param, NULL, SPA_PARAM_BUFFERS_dataType);
211 if (pod_param == NULL)
212 return;
213
214 vals = spa_pod_get_values(&pod_param->value, &n_vals, &choice);
215 if (n_vals == 0)
216 return;
217
218 if (spa_pod_get_int(&vals[0], (int32_t*)&dataType) < 0)
219 return;
220
221 pw_log_debug("dataType: %u", dataType);
222 if (dataType & (1u << SPA_DATA_MemPtr)) {
223 SPA_POD_VALUE(struct spa_pod_int, &vals[0]) =
224 dataType | mappable_dataTypes;
225 pw_log_debug("Change dataType: %u -> %u", dataType,
226 SPA_POD_VALUE(struct spa_pod_int, &vals[0]));
227 }
228 }
229
add_param(struct stream * impl,uint32_t id,uint32_t flags,const struct spa_pod * param)230 static struct param *add_param(struct stream *impl,
231 uint32_t id, uint32_t flags, const struct spa_pod *param)
232 {
233 struct param *p;
234 int idx;
235
236 if (param == NULL || !spa_pod_is_object(param)) {
237 errno = EINVAL;
238 return NULL;
239 }
240 if (id == SPA_ID_INVALID)
241 id = SPA_POD_OBJECT_ID(param);
242
243 p = malloc(sizeof(struct param) + SPA_POD_SIZE(param));
244 if (p == NULL)
245 return NULL;
246
247 if (id == SPA_PARAM_Buffers &&
248 SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_MAP_BUFFERS) &&
249 impl->direction == SPA_DIRECTION_INPUT)
250 fix_datatype(param);
251
252 p->id = id;
253 p->flags = flags;
254 p->param = SPA_PTROFF(p, sizeof(struct param), struct spa_pod);
255 memcpy(p->param, param, SPA_POD_SIZE(param));
256 SPA_POD_OBJECT_ID(p->param) = id;
257
258 spa_list_append(&impl->param_list, &p->link);
259
260 if ((idx = get_param_index(id)) != -1) {
261 impl->info.change_mask |= SPA_NODE_CHANGE_MASK_PARAMS;
262 impl->params[idx].flags |= SPA_PARAM_INFO_READ;
263 impl->params[idx].user++;
264 } else if ((idx = get_port_param_index(id)) != -1) {
265 impl->port_info.change_mask |= SPA_PORT_CHANGE_MASK_PARAMS;
266 impl->port_params[idx].flags |= SPA_PARAM_INFO_READ;
267 impl->port_params[idx].user++;
268 }
269 return p;
270 }
271
clear_params(struct stream * impl,uint32_t id)272 static void clear_params(struct stream *impl, uint32_t id)
273 {
274 struct param *p, *t;
275
276 spa_list_for_each_safe(p, t, &impl->param_list, link) {
277 if (id == SPA_ID_INVALID ||
278 (p->id == id && !(p->flags & PARAM_FLAG_LOCKED))) {
279 spa_list_remove(&p->link);
280 free(p);
281 }
282 }
283 }
284
update_params(struct stream * impl,uint32_t id,const struct spa_pod ** params,uint32_t n_params)285 static int update_params(struct stream *impl, uint32_t id,
286 const struct spa_pod **params, uint32_t n_params)
287 {
288 uint32_t i;
289 int res = 0;
290
291 if (id != SPA_ID_INVALID) {
292 clear_params(impl, id);
293 } else {
294 for (i = 0; i < n_params; i++) {
295 if (!spa_pod_is_object(params[i]))
296 continue;
297 clear_params(impl, SPA_POD_OBJECT_ID(params[i]));
298 }
299 }
300 for (i = 0; i < n_params; i++) {
301 if (add_param(impl, id, 0, params[i]) == NULL) {
302 res = -errno;
303 break;
304 }
305 }
306 return res;
307 }
308
309
push_queue(struct stream * stream,struct queue * queue,struct buffer * buffer)310 static inline int push_queue(struct stream *stream, struct queue *queue, struct buffer *buffer)
311 {
312 uint32_t index;
313
314 if (SPA_FLAG_IS_SET(buffer->flags, BUFFER_FLAG_QUEUED))
315 return -EINVAL;
316
317 SPA_FLAG_SET(buffer->flags, BUFFER_FLAG_QUEUED);
318 queue->incount += buffer->this.size;
319
320 spa_ringbuffer_get_write_index(&queue->ring, &index);
321 queue->ids[index & MASK_BUFFERS] = buffer->id;
322 spa_ringbuffer_write_update(&queue->ring, index + 1);
323
324 return 0;
325 }
326
pop_queue(struct stream * stream,struct queue * queue)327 static inline struct buffer *pop_queue(struct stream *stream, struct queue *queue)
328 {
329 uint32_t index, id;
330 struct buffer *buffer;
331
332 if (spa_ringbuffer_get_read_index(&queue->ring, &index) < 1) {
333 errno = EPIPE;
334 return NULL;
335 }
336
337 id = queue->ids[index & MASK_BUFFERS];
338 spa_ringbuffer_read_update(&queue->ring, index + 1);
339
340 buffer = &stream->buffers[id];
341 queue->outcount += buffer->this.size;
342 SPA_FLAG_CLEAR(buffer->flags, BUFFER_FLAG_QUEUED);
343
344 return buffer;
345 }
clear_queue(struct stream * stream,struct queue * queue)346 static inline void clear_queue(struct stream *stream, struct queue *queue)
347 {
348 spa_ringbuffer_init(&queue->ring);
349 queue->incount = queue->outcount;
350 }
351
stream_set_state(struct pw_stream * stream,enum pw_stream_state state,const char * error)352 static bool stream_set_state(struct pw_stream *stream, enum pw_stream_state state, const char *error)
353 {
354 enum pw_stream_state old = stream->state;
355 bool res = old != state;
356
357 if (res) {
358 free(stream->error);
359 stream->error = error ? strdup(error) : NULL;
360
361 pw_log_debug("%p: update state from %s -> %s (%s)", stream,
362 pw_stream_state_as_string(old),
363 pw_stream_state_as_string(state), stream->error);
364
365 if (state == PW_STREAM_STATE_ERROR)
366 pw_log_error("%p: error %s", stream, error);
367
368 stream->state = state;
369 pw_stream_emit_state_changed(stream, old, state, error);
370 }
371 return res;
372 }
373
get_buffer(struct pw_stream * stream,uint32_t id)374 static struct buffer *get_buffer(struct pw_stream *stream, uint32_t id)
375 {
376 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
377 if (id < impl->n_buffers)
378 return &impl->buffers[id];
379
380 errno = EINVAL;
381 return NULL;
382 }
383
384 static int
do_call_process(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)385 do_call_process(struct spa_loop *loop,
386 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
387 {
388 struct stream *impl = user_data;
389 struct pw_stream *stream = &impl->this;
390 pw_log_trace("%p: do process", stream);
391 pw_stream_emit_process(stream);
392 return 0;
393 }
394
call_process(struct stream * impl)395 static void call_process(struct stream *impl)
396 {
397 pw_log_trace("%p: call process rt:%u", impl, impl->process_rt);
398 if (impl->process_rt)
399 spa_callbacks_call(&impl->rt_callbacks, struct pw_stream_events, process, 0);
400 else
401 pw_loop_invoke(impl->context->main_loop,
402 do_call_process, 1, NULL, 0, false, impl);
403 }
404
405 static int
do_call_drained(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)406 do_call_drained(struct spa_loop *loop,
407 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
408 {
409 struct stream *impl = user_data;
410 struct pw_stream *stream = &impl->this;
411 pw_log_trace("%p: drained", stream);
412 pw_stream_emit_drained(stream);
413 return 0;
414 }
415
/* Queue do_call_drained() on the context's main loop so the drained event
 * is emitted from there. */
static void call_drained(struct stream *impl)
{
	pw_loop_invoke(impl->context->main_loop,
		do_call_drained, 1, NULL, 0, false, impl);
}
421
422 static int
do_call_trigger_done(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)423 do_call_trigger_done(struct spa_loop *loop,
424 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
425 {
426 struct stream *impl = user_data;
427 struct pw_stream *stream = &impl->this;
428 pw_log_trace("%p: trigger_done", stream);
429 pw_stream_emit_trigger_done(stream);
430 return 0;
431 }
432
/* Queue do_call_trigger_done() on the context's main loop so the
 * trigger_done event is emitted from there. */
static void call_trigger_done(struct stream *impl)
{
	pw_loop_invoke(impl->context->main_loop,
		do_call_trigger_done, 1, NULL, 0, false, impl);
}
438
439 static int
do_set_position(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)440 do_set_position(struct spa_loop *loop,
441 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
442 {
443 struct stream *impl = user_data;
444 impl->rt.position = impl->position;
445 return 0;
446 }
447
impl_set_io(void * object,uint32_t id,void * data,size_t size)448 static int impl_set_io(void *object, uint32_t id, void *data, size_t size)
449 {
450 struct stream *impl = object;
451 struct pw_stream *stream = &impl->this;
452
453 pw_log_debug("%p: set io id %d (%s) %p %zd", impl, id,
454 spa_debug_type_find_name(spa_type_io, id), data, size);
455
456 switch(id) {
457 case SPA_IO_Clock:
458 if (data && size >= sizeof(struct spa_io_clock))
459 impl->clock = data;
460 else
461 impl->clock = NULL;
462 break;
463 case SPA_IO_Position:
464 if (data && size >= sizeof(struct spa_io_position))
465 impl->position = data;
466 else
467 impl->position = NULL;
468
469 pw_loop_invoke(impl->context->data_loop,
470 do_set_position, 1, NULL, 0, true, impl);
471 break;
472 default:
473 break;
474 }
475 impl->driving = impl->clock && impl->position && impl->position->clock.id == impl->clock->id;
476 pw_stream_emit_io_changed(stream, id, data, size);
477
478 return 0;
479 }
480
enum_params(void * object,bool is_port,int seq,uint32_t id,uint32_t start,uint32_t num,const struct spa_pod * filter)481 static int enum_params(void *object, bool is_port, int seq, uint32_t id, uint32_t start, uint32_t num,
482 const struct spa_pod *filter)
483 {
484 struct stream *d = object;
485 struct spa_result_node_params result;
486 uint8_t buffer[1024];
487 struct spa_pod_builder b = { 0 };
488 uint32_t count = 0;
489 struct param *p;
490 bool found = false;
491
492 spa_return_val_if_fail(num != 0, -EINVAL);
493
494 result.id = id;
495 result.next = 0;
496
497 pw_log_debug("%p: param id %d (%s) start:%d num:%d", d, id,
498 spa_debug_type_find_name(spa_type_param, id),
499 start, num);
500
501 spa_list_for_each(p, &d->param_list, link) {
502 struct spa_pod *param;
503
504 result.index = result.next++;
505 if (result.index < start)
506 continue;
507
508 param = p->param;
509 if (param == NULL || p->id != id)
510 continue;
511
512 found = true;
513
514 spa_pod_builder_init(&b, buffer, sizeof(buffer));
515 if (spa_pod_filter(&b, &result.param, param, filter) != 0)
516 continue;
517
518 spa_node_emit_result(&d->hooks, seq, 0, SPA_RESULT_TYPE_NODE_PARAMS, &result);
519
520 if (++count == num)
521 break;
522 }
523 return found ? 0 : -ENOENT;
524 }
525
/* spa_node enum_params: enumerate node params by forwarding to
 * enum_params() with is_port set to false. */
static int impl_enum_params(void *object, int seq, uint32_t id, uint32_t start, uint32_t num,
		const struct spa_pod *filter)
{
	return enum_params(object, false, seq, id, start, num, filter);
}
531
impl_set_param(void * object,uint32_t id,uint32_t flags,const struct spa_pod * param)532 static int impl_set_param(void *object, uint32_t id, uint32_t flags, const struct spa_pod *param)
533 {
534 struct stream *impl = object;
535 struct pw_stream *stream = &impl->this;
536
537 if (id != SPA_PARAM_Props)
538 return -ENOTSUP;
539
540 pw_stream_emit_param_changed(stream, id, param);
541 return 0;
542 }
543
impl_send_command(void * object,const struct spa_command * command)544 static int impl_send_command(void *object, const struct spa_command *command)
545 {
546 struct stream *impl = object;
547 struct pw_stream *stream = &impl->this;
548 uint32_t id = SPA_NODE_COMMAND_ID(command);
549
550 pw_log_info("%p: command %s", impl,
551 spa_debug_type_find_name(spa_type_node_command_id, id));
552
553 switch (id) {
554 case SPA_NODE_COMMAND_Suspend:
555 case SPA_NODE_COMMAND_Flush:
556 case SPA_NODE_COMMAND_Pause:
557 pw_loop_invoke(impl->context->main_loop,
558 NULL, 0, NULL, 0, false, impl);
559 if (stream->state == PW_STREAM_STATE_STREAMING) {
560
561 pw_log_debug("%p: pause", stream);
562 stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL);
563 }
564 break;
565 case SPA_NODE_COMMAND_Start:
566 if (stream->state == PW_STREAM_STATE_PAUSED) {
567 pw_log_debug("%p: start %d", stream, impl->direction);
568
569 if (impl->direction == SPA_DIRECTION_INPUT)
570 impl->io->status = SPA_STATUS_NEED_DATA;
571 else if (!impl->process_rt && !impl->driving)
572 call_process(impl);
573
574 stream_set_state(stream, PW_STREAM_STATE_STREAMING, NULL);
575 }
576 break;
577 default:
578 break;
579 }
580 pw_stream_emit_command(stream, command);
581 return 0;
582 }
583
emit_node_info(struct stream * d,bool full)584 static void emit_node_info(struct stream *d, bool full)
585 {
586 uint32_t i;
587 uint64_t old = full ? d->info.change_mask : 0;
588 if (full)
589 d->info.change_mask = d->change_mask_all;
590 if (d->info.change_mask != 0) {
591 if (d->info.change_mask & SPA_NODE_CHANGE_MASK_PARAMS) {
592 for (i = 0; i < d->info.n_params; i++) {
593 if (d->params[i].user > 0) {
594 d->params[i].flags ^= SPA_PARAM_INFO_SERIAL;
595 d->params[i].user = 0;
596 }
597 }
598 }
599 spa_node_emit_info(&d->hooks, &d->info);
600 }
601 d->info.change_mask = old;
602 }
603
emit_port_info(struct stream * d,bool full)604 static void emit_port_info(struct stream *d, bool full)
605 {
606 uint32_t i;
607 uint64_t old = full ? d->port_info.change_mask : 0;
608 if (full)
609 d->port_info.change_mask = d->port_change_mask_all;
610 if (d->port_info.change_mask != 0) {
611 if (d->port_info.change_mask & SPA_PORT_CHANGE_MASK_PARAMS) {
612 for (i = 0; i < d->port_info.n_params; i++) {
613 if (d->port_params[i].user > 0) {
614 d->port_params[i].flags ^= SPA_PARAM_INFO_SERIAL;
615 d->port_params[i].user = 0;
616 }
617 }
618 }
619 spa_node_emit_port_info(&d->hooks, d->direction, 0, &d->port_info);
620 }
621 d->port_info.change_mask = old;
622 }
623
impl_add_listener(void * object,struct spa_hook * listener,const struct spa_node_events * events,void * data)624 static int impl_add_listener(void *object,
625 struct spa_hook *listener,
626 const struct spa_node_events *events,
627 void *data)
628 {
629 struct stream *d = object;
630 struct spa_hook_list save;
631
632 spa_hook_list_isolate(&d->hooks, &save, listener, events, data);
633
634 emit_node_info(d, true);
635 emit_port_info(d, true);
636
637 spa_hook_list_join(&d->hooks, &save);
638
639 return 0;
640 }
641
impl_set_callbacks(void * object,const struct spa_node_callbacks * callbacks,void * data)642 static int impl_set_callbacks(void *object,
643 const struct spa_node_callbacks *callbacks, void *data)
644 {
645 struct stream *d = object;
646
647 d->callbacks = SPA_CALLBACKS_INIT(callbacks, data);
648
649 return 0;
650 }
651
impl_port_set_io(void * object,enum spa_direction direction,uint32_t port_id,uint32_t id,void * data,size_t size)652 static int impl_port_set_io(void *object, enum spa_direction direction, uint32_t port_id,
653 uint32_t id, void *data, size_t size)
654 {
655 struct stream *impl = object;
656 struct pw_stream *stream = &impl->this;
657
658 pw_log_debug("%p: id:%d (%s) %p %zd", impl, id,
659 spa_debug_type_find_name(spa_type_io, id), data, size);
660
661 switch (id) {
662 case SPA_IO_Buffers:
663 if (data && size >= sizeof(struct spa_io_buffers))
664 impl->io = data;
665 else
666 impl->io = NULL;
667 break;
668 }
669 pw_stream_emit_io_changed(stream, id, data, size);
670
671 return 0;
672 }
673
/* spa_node port_enum_params: enumerate port params by forwarding to
 * enum_params() with is_port set to true. direction and port_id are not
 * used. */
static int impl_port_enum_params(void *object, int seq,
		enum spa_direction direction, uint32_t port_id,
		uint32_t id, uint32_t start, uint32_t num,
		const struct spa_pod *filter)
{
	return enum_params(object, true, seq, id, start, num, filter);
}
681
map_data(struct stream * impl,struct spa_data * data,int prot)682 static int map_data(struct stream *impl, struct spa_data *data, int prot)
683 {
684 void *ptr;
685 struct pw_map_range range;
686
687 pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->context->sc_pagesize);
688
689 ptr = mmap(NULL, range.size, prot, MAP_SHARED, data->fd, range.offset);
690 if (ptr == MAP_FAILED) {
691 pw_log_error("%p: failed to mmap buffer mem: %m", impl);
692 return -errno;
693 }
694
695 data->data = SPA_PTROFF(ptr, range.start, void);
696 pw_log_debug("%p: fd %"PRIi64" mapped %d %d %p", impl, data->fd,
697 range.offset, range.size, data->data);
698
699 if (impl->allow_mlock && mlock(data->data, data->maxsize) < 0) {
700 if (errno != ENOMEM || !mlock_warned) {
701 pw_log(impl->warn_mlock ? SPA_LOG_LEVEL_WARN : SPA_LOG_LEVEL_DEBUG,
702 "%p: Failed to mlock memory %p %u: %s", impl,
703 data->data, data->maxsize,
704 errno == ENOMEM ?
705 "This is not a problem but for best performance, "
706 "consider increasing RLIMIT_MEMLOCK" : strerror(errno));
707 mlock_warned |= errno == ENOMEM;
708 }
709 }
710 return 0;
711 }
712
unmap_data(struct stream * impl,struct spa_data * data)713 static int unmap_data(struct stream *impl, struct spa_data *data)
714 {
715 struct pw_map_range range;
716
717 pw_map_range_init(&range, data->mapoffset, data->maxsize, impl->context->sc_pagesize);
718
719 if (munmap(SPA_PTROFF(data->data, -range.start, void), range.size) < 0)
720 pw_log_warn("%p: failed to unmap: %m", impl);
721
722 pw_log_debug("%p: fd %"PRIi64" unmapped", impl, data->fd);
723 return 0;
724 }
725
clear_buffers(struct pw_stream * stream)726 static void clear_buffers(struct pw_stream *stream)
727 {
728 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
729 uint32_t i, j;
730
731 pw_log_debug("%p: clear buffers %d", stream, impl->n_buffers);
732
733 for (i = 0; i < impl->n_buffers; i++) {
734 struct buffer *b = &impl->buffers[i];
735
736 if (SPA_FLAG_IS_SET(b->flags, BUFFER_FLAG_ADDED))
737 pw_stream_emit_remove_buffer(stream, &b->this);
738
739 if (SPA_FLAG_IS_SET(b->flags, BUFFER_FLAG_MAPPED)) {
740 for (j = 0; j < b->this.buffer->n_datas; j++) {
741 struct spa_data *d = &b->this.buffer->datas[j];
742 pw_log_debug("%p: clear buffer %d mem",
743 stream, b->id);
744 unmap_data(impl, d);
745 }
746 }
747 }
748 impl->n_buffers = 0;
749 if (impl->direction == SPA_DIRECTION_INPUT) {
750 struct buffer *b;
751
752 while ((b = pop_queue(impl, &impl->dequeued))) {
753 if (b->busy)
754 ATOMIC_DEC(b->busy->count);
755 }
756 } else
757 clear_queue(impl, &impl->dequeued);
758 clear_queue(impl, &impl->queued);
759 }
760
parse_latency(struct pw_stream * stream,const struct spa_pod * param)761 static int parse_latency(struct pw_stream *stream, const struct spa_pod *param)
762 {
763 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
764 struct spa_latency_info info;
765 int res;
766
767 if (param == NULL)
768 return 0;
769
770 if ((res = spa_latency_parse(param, &info)) < 0)
771 return res;
772
773 pw_log_info("stream %p: set %s latency %f-%f %d-%d %"PRIu64"-%"PRIu64, stream,
774 info.direction == SPA_DIRECTION_INPUT ? "input" : "output",
775 info.min_quantum, info.max_quantum,
776 info.min_rate, info.max_rate,
777 info.min_ns, info.max_ns);
778
779 if (info.direction == impl->direction)
780 return 0;
781
782 impl->latency = info;
783 return 0;
784 }
785
impl_port_set_param(void * object,enum spa_direction direction,uint32_t port_id,uint32_t id,uint32_t flags,const struct spa_pod * param)786 static int impl_port_set_param(void *object,
787 enum spa_direction direction, uint32_t port_id,
788 uint32_t id, uint32_t flags,
789 const struct spa_pod *param)
790 {
791 struct stream *impl = object;
792 struct pw_stream *stream = &impl->this;
793 int res;
794
795 pw_log_debug("%p: port:%d.%d id:%d (%s) param:%p disconnecting:%d", impl,
796 direction, port_id, id,
797 spa_debug_type_find_name(spa_type_param, id), param,
798 impl->disconnecting);
799
800 if (impl->disconnecting && param != NULL)
801 return -EIO;
802
803 if (param)
804 pw_log_pod(SPA_LOG_LEVEL_DEBUG, param);
805
806 if ((res = update_params(impl, id, ¶m, param ? 1 : 0)) < 0)
807 return res;
808
809 switch (id) {
810 case SPA_PARAM_Format:
811 clear_buffers(stream);
812 break;
813 case SPA_PARAM_Latency:
814 parse_latency(stream, param);
815 break;
816 default:
817 break;
818 }
819
820 pw_stream_emit_param_changed(stream, id, param);
821
822 if (stream->state == PW_STREAM_STATE_ERROR)
823 return -EIO;
824
825 emit_port_info(impl, false);
826
827 return 0;
828 }
829
impl_port_use_buffers(void * object,enum spa_direction direction,uint32_t port_id,uint32_t flags,struct spa_buffer ** buffers,uint32_t n_buffers)830 static int impl_port_use_buffers(void *object,
831 enum spa_direction direction, uint32_t port_id,
832 uint32_t flags,
833 struct spa_buffer **buffers, uint32_t n_buffers)
834 {
835 struct stream *impl = object;
836 struct pw_stream *stream = &impl->this;
837 uint32_t i, j, impl_flags = impl->flags;
838 int prot, res;
839 int size = 0;
840
841 pw_log_debug("%p: port:%d.%d buffers:%u disconnecting:%d", impl,
842 direction, port_id, n_buffers, impl->disconnecting);
843
844 if (impl->disconnecting && n_buffers > 0)
845 return -EIO;
846
847 prot = PROT_READ | (direction == SPA_DIRECTION_OUTPUT ? PROT_WRITE : 0);
848
849 clear_buffers(stream);
850
851 for (i = 0; i < n_buffers; i++) {
852 int buf_size = 0;
853 struct buffer *b = &impl->buffers[i];
854
855 b->flags = 0;
856 b->id = i;
857
858 if (SPA_FLAG_IS_SET(impl_flags, PW_STREAM_FLAG_MAP_BUFFERS)) {
859 for (j = 0; j < buffers[i]->n_datas; j++) {
860 struct spa_data *d = &buffers[i]->datas[j];
861 if ((mappable_dataTypes & (1<<d->type)) > 0) {
862 if ((res = map_data(impl, d, prot)) < 0)
863 return res;
864 SPA_FLAG_SET(b->flags, BUFFER_FLAG_MAPPED);
865 }
866 else if (d->type == SPA_DATA_MemPtr && d->data == NULL) {
867 pw_log_error("%p: invalid buffer mem", stream);
868 return -EINVAL;
869 }
870 buf_size += d->maxsize;
871 }
872
873 if (size > 0 && buf_size != size) {
874 pw_log_error("%p: invalid buffer size %d", stream, buf_size);
875 return -EINVAL;
876 } else
877 size = buf_size;
878 }
879 pw_log_debug("%p: got buffer id:%d datas:%d, mapped size %d", stream, i,
880 buffers[i]->n_datas, size);
881 }
882
883 for (i = 0; i < n_buffers; i++) {
884 struct buffer *b = &impl->buffers[i];
885
886 b->this.buffer = buffers[i];
887 b->busy = spa_buffer_find_meta_data(buffers[i], SPA_META_Busy, sizeof(*b->busy));
888
889 if (impl->direction == SPA_DIRECTION_OUTPUT) {
890 pw_log_trace("%p: recycle buffer %d", stream, b->id);
891 push_queue(impl, &impl->dequeued, b);
892 }
893
894 SPA_FLAG_SET(b->flags, BUFFER_FLAG_ADDED);
895
896 pw_stream_emit_add_buffer(stream, &b->this);
897 }
898
899 impl->n_buffers = n_buffers;
900
901 return 0;
902 }
903
impl_port_reuse_buffer(void * object,uint32_t port_id,uint32_t buffer_id)904 static int impl_port_reuse_buffer(void *object, uint32_t port_id, uint32_t buffer_id)
905 {
906 struct stream *d = object;
907 pw_log_trace("%p: recycle buffer %d", d, buffer_id);
908 if (buffer_id < d->n_buffers)
909 push_queue(d, &d->queued, &d->buffers[buffer_id]);
910 return 0;
911 }
912
copy_position(struct stream * impl,int64_t queued)913 static inline void copy_position(struct stream *impl, int64_t queued)
914 {
915 struct spa_io_position *p = impl->rt.position;
916 if (SPA_UNLIKELY(p != NULL)) {
917 SEQ_WRITE(impl->seq);
918 impl->time.now = p->clock.nsec;
919 impl->time.rate = p->clock.rate;
920 if (SPA_UNLIKELY(impl->clock_id != p->clock.id)) {
921 impl->base_pos = p->clock.position - impl->time.ticks;
922 impl->clock_id = p->clock.id;
923 }
924 impl->time.ticks = p->clock.position - impl->base_pos;
925 impl->time.delay = 0;
926 impl->time.queued = queued;
927 impl->quantum = p->clock.duration;
928 SEQ_WRITE(impl->seq);
929 }
930 }
931
impl_node_process_input(void * object)932 static int impl_node_process_input(void *object)
933 {
934 struct stream *impl = object;
935 struct pw_stream *stream = &impl->this;
936 struct spa_io_buffers *io = impl->io;
937 struct buffer *b;
938
939 pw_log_trace("%p: process in status:%d id:%d ticks:%"PRIu64" delay:%"PRIi64,
940 stream, io->status, io->buffer_id, impl->time.ticks, impl->time.delay);
941
942 if (io->status == SPA_STATUS_HAVE_DATA &&
943 (b = get_buffer(stream, io->buffer_id)) != NULL) {
944 /* push new buffer */
945 if (push_queue(impl, &impl->dequeued, b) == 0) {
946 copy_position(impl, impl->dequeued.incount);
947 if (b->busy)
948 ATOMIC_INC(b->busy->count);
949 call_process(impl);
950 }
951 }
952 if (io->status != SPA_STATUS_NEED_DATA) {
953 /* pop buffer to recycle */
954 if ((b = pop_queue(impl, &impl->queued))) {
955 pw_log_trace("%p: recycle buffer %d", stream, b->id);
956 } else if (io->status == -EPIPE)
957 return io->status;
958 io->buffer_id = b ? b->id : SPA_ID_INVALID;
959 io->status = SPA_STATUS_NEED_DATA;
960 }
961 if (impl->driving && impl->using_trigger)
962 call_trigger_done(impl);
963
964 return SPA_STATUS_NEED_DATA | SPA_STATUS_HAVE_DATA;
965 }
966
/* Process callback for an output stream.
 *
 * Unless the io area still holds unconsumed data, the buffer it refers to
 * is returned to the dequeued queue and the next buffer is taken from the
 * queued queue. Without a queued buffer the status becomes DRAINED (when
 * draining or already drained) or NEED_DATA.
 *
 * The time info is refreshed on every pass. A stream that is neither
 * draining nor driving then asks the user for more data: via the main loop
 * when a free buffer exists (non-rt), or directly, followed by a retry of
 * the whole sequence, when it processes in the realtime thread and ran out
 * of data.
 *
 * Returns the resulting io status. */
static int impl_node_process_output(void *object)
{
	struct stream *impl = object;
	struct pw_stream *stream = &impl->this;
	struct spa_io_buffers *io = impl->io;
	struct buffer *b;
	int res;
	uint32_t index;

again:
	pw_log_trace("%p: process out status:%d id:%d", stream,
			io->status, io->buffer_id);

	if ((res = io->status) != SPA_STATUS_HAVE_DATA) {
		/* recycle old buffer */
		if ((b = get_buffer(stream, io->buffer_id)) != NULL) {
			pw_log_trace("%p: recycle buffer %d", stream, b->id);
			push_queue(impl, &impl->dequeued, b);
		}

		/* pop new buffer */
		if ((b = pop_queue(impl, &impl->queued)) != NULL) {
			impl->drained = false;
			io->buffer_id = b->id;
			res = io->status = SPA_STATUS_HAVE_DATA;
			pw_log_trace("%p: pop %d %p", stream, b->id, io);
		} else if (impl->draining || impl->drained) {
			/* nothing left to play: report DRAINED and keep
			 * reporting it until a new buffer is popped */
			impl->draining = true;
			impl->drained = true;
			io->buffer_id = SPA_ID_INVALID;
			res = io->status = SPA_STATUS_DRAINED;
			pw_log_trace("%p: draining", stream);
		} else {
			io->buffer_id = SPA_ID_INVALID;
			res = io->status = SPA_STATUS_NEED_DATA;
			pw_log_trace("%p: no more buffers %p", stream, io);
		}
	}

	copy_position(impl, impl->queued.outcount);

	if (!impl->draining && !impl->driving) {
		/* we're not draining, not a driver check if we need to get
		 * more buffers */
		if (!impl->process_rt) {
			/* not realtime and we have a free buffer, trigger process so that we have
			 * data in the next round. */
			if (spa_ringbuffer_get_read_index(&impl->dequeued.ring, &index) > 0)
				call_process(impl);
		} else if (io->status == SPA_STATUS_NEED_DATA) {
			/* realtime and we don't have a buffer, trigger process and try
			 * again when there is something in the queue now */
			call_process(impl);
			/* the process callback ran synchronously, so it may
			 * have queued a buffer or started draining */
			if (impl->draining ||
			    spa_ringbuffer_get_read_index(&impl->queued.ring, &index) > 0)
				goto again;
		}
	}

	pw_log_trace("%p: res %d", stream, res);

	/* a driving stream using trigger reports completion here only when
	 * it has no data to offer */
	if (impl->driving && impl->using_trigger && res != SPA_STATUS_HAVE_DATA)
		call_trigger_done(impl);

	return res;
}
1033
/* Node method table for the stream. No .process entry is listed here;
 * pw_stream_connect() copies this table into impl->node_methods and sets
 * .process to the input or output variant depending on the direction. */
static const struct spa_node_methods impl_node = {
	SPA_VERSION_NODE_METHODS,
	/* node level methods */
	.add_listener = impl_add_listener,
	.set_callbacks = impl_set_callbacks,
	.enum_params = impl_enum_params,
	.set_param = impl_set_param,
	.set_io = impl_set_io,
	.send_command = impl_send_command,
	/* port level methods */
	.port_set_io = impl_port_set_io,
	.port_enum_params = impl_port_enum_params,
	.port_set_param = impl_port_set_param,
	.port_use_buffers = impl_port_use_buffers,
	.port_reuse_buffer = impl_port_reuse_buffer,
};
1048
proxy_removed(void * _data)1049 static void proxy_removed(void *_data)
1050 {
1051 struct pw_stream *stream = _data;
1052 pw_log_debug("%p: removed", stream);
1053 spa_hook_remove(&stream->proxy_listener);
1054 stream->node_id = SPA_ID_INVALID;
1055 stream_set_state(stream, PW_STREAM_STATE_UNCONNECTED, NULL);
1056 }
1057
/* Proxy "destroy" event: log it and perform the same cleanup as the
 * "removed" event. */
static void proxy_destroy(void *_data)
{
	struct pw_stream *s = _data;

	pw_log_debug("%p: destroy", s);

	proxy_removed(s);
}
1064
proxy_error(void * _data,int seq,int res,const char * message)1065 static void proxy_error(void *_data, int seq, int res, const char *message)
1066 {
1067 struct pw_stream *stream = _data;
1068 /* we just emit the state change here to inform the application.
1069 * If this is supposed to be a permanent error, the app should
1070 * do a pw_stream_set_error() */
1071 pw_stream_emit_state_changed(stream, stream->state,
1072 PW_STREAM_STATE_ERROR, message);
1073 }
1074
proxy_bound(void * data,uint32_t global_id)1075 static void proxy_bound(void *data, uint32_t global_id)
1076 {
1077 struct pw_stream *stream = data;
1078 stream->node_id = global_id;
1079 stream_set_state(stream, PW_STREAM_STATE_PAUSED, NULL);
1080 }
1081
/* Events listened for on the proxy of the exported node; installed with
 * pw_proxy_add_listener() in pw_stream_connect(). */
static const struct pw_proxy_events proxy_events = {
	PW_VERSION_PROXY_EVENTS,
	.removed = proxy_removed,
	.destroy = proxy_destroy,
	.error = proxy_error,
	.bound = proxy_bound,
};
1089
find_control(struct pw_stream * stream,uint32_t id)1090 static struct control *find_control(struct pw_stream *stream, uint32_t id)
1091 {
1092 struct control *c;
1093 spa_list_for_each(c, &stream->controls, link) {
1094 if (c->id == id)
1095 return c;
1096 }
1097 return NULL;
1098 }
1099
node_event_param(void * object,int seq,uint32_t id,uint32_t index,uint32_t next,struct spa_pod * param)1100 static int node_event_param(void *object, int seq,
1101 uint32_t id, uint32_t index, uint32_t next,
1102 struct spa_pod *param)
1103 {
1104 struct pw_stream *stream = object;
1105
1106 switch (id) {
1107 case SPA_PARAM_PropInfo:
1108 {
1109 struct control *c;
1110 const struct spa_pod *type, *pod;
1111 uint32_t iid, choice, n_vals, container = SPA_ID_INVALID;
1112 float *vals, bool_range[3] = { 1.0, 0.0, 1.0 };
1113
1114 if (spa_pod_parse_object(param,
1115 SPA_TYPE_OBJECT_PropInfo, NULL,
1116 SPA_PROP_INFO_id, SPA_POD_Id(&iid)) < 0)
1117 return -EINVAL;
1118
1119 c = find_control(stream, iid);
1120 if (c != NULL)
1121 return 0;
1122
1123 c = calloc(1, sizeof(*c) + SPA_POD_SIZE(param));
1124 c->info = SPA_PTROFF(c, sizeof(*c), struct spa_pod);
1125 memcpy(c->info, param, SPA_POD_SIZE(param));
1126 c->control.n_values = 0;
1127 c->control.max_values = 0;
1128 c->control.values = c->values;
1129
1130 if (spa_pod_parse_object(c->info,
1131 SPA_TYPE_OBJECT_PropInfo, NULL,
1132 SPA_PROP_INFO_name, SPA_POD_String(&c->control.name),
1133 SPA_PROP_INFO_type, SPA_POD_PodChoice(&type),
1134 SPA_PROP_INFO_container, SPA_POD_OPT_Id(&container)) < 0) {
1135 free(c);
1136 return -EINVAL;
1137 }
1138
1139 spa_list_append(&stream->controls, &c->link);
1140
1141 pod = spa_pod_get_values(type, &n_vals, &choice);
1142
1143 c->type = SPA_POD_TYPE(pod);
1144 if (spa_pod_is_float(pod))
1145 vals = SPA_POD_BODY(pod);
1146 else if (spa_pod_is_bool(pod) && n_vals > 0) {
1147 choice = SPA_CHOICE_Range;
1148 vals = bool_range;
1149 vals[0] = SPA_POD_VALUE(struct spa_pod_bool, pod);
1150 n_vals = 3;
1151 }
1152 else
1153 return -ENOTSUP;
1154
1155 c->container = container != SPA_ID_INVALID ? container : c->type;
1156
1157 switch (choice) {
1158 case SPA_CHOICE_None:
1159 if (n_vals < 1)
1160 return -EINVAL;
1161 c->control.n_values = 1;
1162 c->control.max_values = 1;
1163 c->control.values[0] = c->control.def = c->control.min = c->control.max = vals[0];
1164 break;
1165 case SPA_CHOICE_Range:
1166 if (n_vals < 3)
1167 return -EINVAL;
1168 c->control.n_values = 1;
1169 c->control.max_values = 1;
1170 c->control.values[0] = vals[0];
1171 c->control.def = vals[0];
1172 c->control.min = vals[1];
1173 c->control.max = vals[2];
1174 break;
1175 default:
1176 return -ENOTSUP;
1177 }
1178
1179 c->id = iid;
1180 pw_log_debug("%p: add control %d (%s) container:%d (def:%f min:%f max:%f)",
1181 stream, c->id, c->control.name, c->container,
1182 c->control.def, c->control.min, c->control.max);
1183 break;
1184 }
1185 case SPA_PARAM_Props:
1186 {
1187 struct spa_pod_prop *prop;
1188 struct spa_pod_object *obj = (struct spa_pod_object *) param;
1189 union {
1190 float f;
1191 bool b;
1192 } value;
1193 float *values;
1194 uint32_t i, n_values;
1195
1196 SPA_POD_OBJECT_FOREACH(obj, prop) {
1197 struct control *c;
1198
1199 c = find_control(stream, prop->key);
1200 if (c == NULL)
1201 continue;
1202
1203 switch (c->container) {
1204 case SPA_TYPE_Float:
1205 if (spa_pod_get_float(&prop->value, &value.f) < 0)
1206 continue;
1207 n_values = 1;
1208 values = &value.f;
1209 break;
1210 case SPA_TYPE_Bool:
1211 if (spa_pod_get_bool(&prop->value, &value.b) < 0)
1212 continue;
1213 value.f = value.b ? 1.0 : 0.0;
1214 n_values = 1;
1215 values = &value.f;
1216 break;
1217 case SPA_TYPE_Array:
1218 if ((values = spa_pod_get_array(&prop->value, &n_values)) == NULL ||
1219 !spa_pod_is_float(SPA_POD_ARRAY_CHILD(&prop->value)))
1220 continue;
1221 break;
1222 default:
1223 continue;
1224 }
1225
1226 if (c->emitted && c->control.n_values == n_values &&
1227 memcmp(c->control.values, values, sizeof(float) * n_values) == 0)
1228 continue;
1229
1230 memcpy(c->control.values, values, sizeof(float) * n_values);
1231 c->control.n_values = n_values;
1232 c->emitted = true;
1233
1234 pw_log_debug("%p: control %d (%s) changed %d:", stream,
1235 prop->key, c->control.name, n_values);
1236 for (i = 0; i < n_values; i++)
1237 pw_log_debug("%p: value %d %f", stream, i, values[i]);
1238
1239 pw_stream_emit_control_info(stream, prop->key, &c->control);
1240 }
1241 break;
1242 }
1243 default:
1244 break;
1245 }
1246 return 0;
1247 }
1248
node_event_info(void * object,const struct pw_node_info * info)1249 static void node_event_info(void *object, const struct pw_node_info *info)
1250 {
1251 struct pw_stream *stream = object;
1252 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1253 uint32_t i;
1254
1255 if (info->change_mask & PW_NODE_CHANGE_MASK_PARAMS) {
1256 for (i = 0; i < info->n_params; i++) {
1257 switch (info->params[i].id) {
1258 case SPA_PARAM_PropInfo:
1259 case SPA_PARAM_Props:
1260 pw_impl_node_for_each_param(impl->node,
1261 0, info->params[i].id,
1262 0, UINT32_MAX,
1263 NULL,
1264 node_event_param,
1265 stream);
1266 break;
1267 default:
1268 break;
1269 }
1270 }
1271 }
1272 }
1273
/* Events listened for on the stream's node; installed with
 * pw_impl_node_add_listener() in pw_stream_connect(). */
static const struct pw_impl_node_events node_events = {
	PW_VERSION_IMPL_NODE_EVENTS,
	.info_changed = node_event_info,
};
1278
on_core_error(void * object,uint32_t id,int seq,int res,const char * message)1279 static void on_core_error(void *object, uint32_t id, int seq, int res, const char *message)
1280 {
1281 struct pw_stream *stream = object;
1282
1283 pw_log_debug("%p: error id:%u seq:%d res:%d (%s): %s", stream,
1284 id, seq, res, spa_strerror(res), message);
1285
1286 if (id == PW_ID_CORE && res == -EPIPE) {
1287 stream_set_state(stream, PW_STREAM_STATE_UNCONNECTED, message);
1288 }
1289 }
1290
/* Events listened for on the core the stream is attached to. */
static const struct pw_core_events core_events = {
	PW_VERSION_CORE_EVENTS,
	.error = on_core_error,
};
1295
context_drained(void * data,struct pw_impl_node * node)1296 static void context_drained(void *data, struct pw_impl_node *node)
1297 {
1298 struct stream *impl = data;
1299 if (impl->node != node)
1300 return;
1301 if (impl->draining && impl->drained) {
1302 impl->draining = false;
1303 impl->io->status = SPA_STATUS_NEED_DATA;
1304 call_drained(impl);
1305 }
1306 }
1307
/* Driver events listened for on the context; appended to the context's
 * driver_listener_list in stream_new(). */
static const struct pw_context_driver_events context_events = {
	PW_VERSION_CONTEXT_DRIVER_EVENTS,
	.drained = context_drained,
};
1312
1313 static struct stream *
stream_new(struct pw_context * context,const char * name,struct pw_properties * props,const struct pw_properties * extra)1314 stream_new(struct pw_context *context, const char *name,
1315 struct pw_properties *props, const struct pw_properties *extra)
1316 {
1317 struct stream *impl;
1318 struct pw_stream *this;
1319 const char *str;
1320 int res;
1321
1322 impl = calloc(1, sizeof(struct stream));
1323 if (impl == NULL) {
1324 res = -errno;
1325 goto error_cleanup;
1326 }
1327 impl->port_props = pw_properties_new(NULL, NULL);
1328 if (impl->port_props == NULL) {
1329 res = -errno;
1330 goto error_properties;
1331 }
1332
1333 this = &impl->this;
1334 pw_log_debug("%p: new \"%s\"", impl, name);
1335
1336 if (props == NULL) {
1337 props = pw_properties_new(PW_KEY_MEDIA_NAME, name, NULL);
1338 } else if (pw_properties_get(props, PW_KEY_MEDIA_NAME) == NULL) {
1339 pw_properties_set(props, PW_KEY_MEDIA_NAME, name);
1340 }
1341 if (props == NULL) {
1342 res = -errno;
1343 goto error_properties;
1344 }
1345 if ((str = pw_context_get_conf_section(context, "stream.properties")) != NULL)
1346 pw_properties_update_string(props, str, strlen(str));
1347
1348 if (pw_properties_get(props, PW_KEY_STREAM_IS_LIVE) == NULL)
1349 pw_properties_set(props, PW_KEY_STREAM_IS_LIVE, "true");
1350
1351 if (pw_properties_get(props, PW_KEY_NODE_NAME) == NULL && extra) {
1352 str = pw_properties_get(extra, PW_KEY_APP_NAME);
1353 if (str == NULL)
1354 str = pw_properties_get(extra, PW_KEY_APP_PROCESS_BINARY);
1355 if (str == NULL)
1356 str = name;
1357 pw_properties_set(props, PW_KEY_NODE_NAME, str);
1358 }
1359 if ((str = getenv("PIPEWIRE_LATENCY")) != NULL)
1360 pw_properties_set(props, PW_KEY_NODE_LATENCY, str);
1361
1362 spa_hook_list_init(&impl->hooks);
1363 this->properties = props;
1364
1365 this->name = name ? strdup(name) : NULL;
1366 this->node_id = SPA_ID_INVALID;
1367
1368 spa_ringbuffer_init(&impl->dequeued.ring);
1369 spa_ringbuffer_init(&impl->queued.ring);
1370 spa_list_init(&impl->param_list);
1371
1372 spa_hook_list_init(&this->listener_list);
1373 spa_list_init(&this->controls);
1374
1375 this->state = PW_STREAM_STATE_UNCONNECTED;
1376
1377 impl->context = context;
1378 impl->allow_mlock = context->settings.mem_allow_mlock;
1379 impl->warn_mlock = context->settings.mem_warn_mlock;
1380
1381 spa_hook_list_append(&impl->context->driver_listener_list,
1382 &impl->context_listener,
1383 &context_events, impl);
1384 return impl;
1385
1386 error_properties:
1387 pw_properties_free(impl->port_props);
1388 free(impl);
1389 error_cleanup:
1390 pw_properties_free(props);
1391 errno = -res;
1392 return NULL;
1393 }
1394
1395 SPA_EXPORT
pw_stream_new(struct pw_core * core,const char * name,struct pw_properties * props)1396 struct pw_stream * pw_stream_new(struct pw_core *core, const char *name,
1397 struct pw_properties *props)
1398 {
1399 struct stream *impl;
1400 struct pw_stream *this;
1401 struct pw_context *context = core->context;
1402
1403 impl = stream_new(context, name, props, core->properties);
1404 if (impl == NULL)
1405 return NULL;
1406
1407 this = &impl->this;
1408 this->core = core;
1409 spa_list_append(&core->stream_list, &this->link);
1410 pw_core_add_listener(core,
1411 &this->core_listener, &core_events, this);
1412
1413 return this;
1414 }
1415
1416 SPA_EXPORT
1417 struct pw_stream *
pw_stream_new_simple(struct pw_loop * loop,const char * name,struct pw_properties * props,const struct pw_stream_events * events,void * data)1418 pw_stream_new_simple(struct pw_loop *loop,
1419 const char *name,
1420 struct pw_properties *props,
1421 const struct pw_stream_events *events,
1422 void *data)
1423 {
1424 struct pw_stream *this;
1425 struct stream *impl;
1426 struct pw_context *context;
1427 int res;
1428
1429 if (props == NULL)
1430 props = pw_properties_new(NULL, NULL);
1431 if (props == NULL)
1432 return NULL;
1433
1434 context = pw_context_new(loop, NULL, 0);
1435 if (context == NULL) {
1436 res = -errno;
1437 goto error_cleanup;
1438 }
1439
1440 impl = stream_new(context, name, props, NULL);
1441 if (impl == NULL) {
1442 res = -errno;
1443 props = NULL;
1444 goto error_cleanup;
1445 }
1446
1447 this = &impl->this;
1448 impl->data.context = context;
1449 pw_stream_add_listener(this, &impl->data.stream_listener, events, data);
1450
1451 return this;
1452
1453 error_cleanup:
1454 if (context)
1455 pw_context_destroy(context);
1456 pw_properties_free(props);
1457 errno = -res;
1458 return NULL;
1459 }
1460
1461 SPA_EXPORT
pw_stream_state_as_string(enum pw_stream_state state)1462 const char *pw_stream_state_as_string(enum pw_stream_state state)
1463 {
1464 switch (state) {
1465 case PW_STREAM_STATE_ERROR:
1466 return "error";
1467 case PW_STREAM_STATE_UNCONNECTED:
1468 return "unconnected";
1469 case PW_STREAM_STATE_CONNECTING:
1470 return "connecting";
1471 case PW_STREAM_STATE_PAUSED:
1472 return "paused";
1473 case PW_STREAM_STATE_STREAMING:
1474 return "streaming";
1475 }
1476 return "invalid-state";
1477 }
1478
1479 SPA_EXPORT
pw_stream_destroy(struct pw_stream * stream)1480 void pw_stream_destroy(struct pw_stream *stream)
1481 {
1482 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1483 struct control *c;
1484
1485 pw_log_debug("%p: destroy", stream);
1486
1487 pw_stream_emit_destroy(stream);
1488
1489 if (!impl->disconnecting)
1490 pw_stream_disconnect(stream);
1491
1492 if (stream->core) {
1493 spa_hook_remove(&stream->core_listener);
1494 spa_list_remove(&stream->link);
1495 stream->core = NULL;
1496 }
1497
1498 clear_params(impl, SPA_ID_INVALID);
1499
1500 pw_log_debug("%p: free", stream);
1501 free(stream->error);
1502
1503 pw_properties_free(stream->properties);
1504
1505 free(stream->name);
1506
1507 spa_list_consume(c, &stream->controls, link) {
1508 spa_list_remove(&c->link);
1509 free(c);
1510 }
1511
1512 spa_hook_list_clean(&impl->hooks);
1513 spa_hook_list_clean(&stream->listener_list);
1514
1515 spa_hook_remove(&impl->context_listener);
1516
1517 if (impl->data.context)
1518 pw_context_destroy(impl->data.context);
1519
1520 pw_properties_free(impl->port_props);
1521 free(impl);
1522 }
1523
hook_removed(struct spa_hook * hook)1524 static void hook_removed(struct spa_hook *hook)
1525 {
1526 struct stream *impl = hook->priv;
1527 spa_zero(impl->rt_callbacks);
1528 hook->priv = NULL;
1529 hook->removed = NULL;
1530 }
1531
1532 SPA_EXPORT
pw_stream_add_listener(struct pw_stream * stream,struct spa_hook * listener,const struct pw_stream_events * events,void * data)1533 void pw_stream_add_listener(struct pw_stream *stream,
1534 struct spa_hook *listener,
1535 const struct pw_stream_events *events,
1536 void *data)
1537 {
1538 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1539 spa_hook_list_append(&stream->listener_list, listener, events, data);
1540
1541 if (events->process && impl->rt_callbacks.funcs == NULL) {
1542 impl->rt_callbacks = SPA_CALLBACKS_INIT(events, data);
1543 listener->removed = hook_removed;
1544 listener->priv = impl;
1545 }
1546 }
1547
1548 SPA_EXPORT
pw_stream_get_state(struct pw_stream * stream,const char ** error)1549 enum pw_stream_state pw_stream_get_state(struct pw_stream *stream, const char **error)
1550 {
1551 if (error)
1552 *error = stream->error;
1553 return stream->state;
1554 }
1555
1556 SPA_EXPORT
pw_stream_get_name(struct pw_stream * stream)1557 const char *pw_stream_get_name(struct pw_stream *stream)
1558 {
1559 return stream->name;
1560 }
1561
1562 SPA_EXPORT
pw_stream_get_properties(struct pw_stream * stream)1563 const struct pw_properties *pw_stream_get_properties(struct pw_stream *stream)
1564 {
1565 return stream->properties;
1566 }
1567
1568 SPA_EXPORT
pw_stream_update_properties(struct pw_stream * stream,const struct spa_dict * dict)1569 int pw_stream_update_properties(struct pw_stream *stream, const struct spa_dict *dict)
1570 {
1571 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1572 int changed, res = 0;
1573
1574 changed = pw_properties_update(stream->properties, dict);
1575
1576 if (!changed)
1577 return 0;
1578
1579 if (impl->node)
1580 res = pw_impl_node_update_properties(impl->node, dict);
1581
1582 return res;
1583 }
1584
1585 SPA_EXPORT
pw_stream_get_core(struct pw_stream * stream)1586 struct pw_core *pw_stream_get_core(struct pw_stream *stream)
1587 {
1588 return stream->core;
1589 }
1590
add_params(struct stream * impl)1591 static void add_params(struct stream *impl)
1592 {
1593 uint8_t buffer[4096];
1594 struct spa_pod_builder b;
1595
1596 spa_pod_builder_init(&b, buffer, sizeof(buffer));
1597
1598 add_param(impl, SPA_PARAM_IO, PARAM_FLAG_LOCKED,
1599 spa_pod_builder_add_object(&b,
1600 SPA_TYPE_OBJECT_ParamIO, SPA_PARAM_IO,
1601 SPA_PARAM_IO_id, SPA_POD_Id(SPA_IO_Buffers),
1602 SPA_PARAM_IO_size, SPA_POD_Int(sizeof(struct spa_io_buffers))));
1603
1604 add_param(impl, SPA_PARAM_Meta, PARAM_FLAG_LOCKED,
1605 spa_pod_builder_add_object(&b,
1606 SPA_TYPE_OBJECT_ParamMeta, SPA_PARAM_Meta,
1607 SPA_PARAM_META_type, SPA_POD_Id(SPA_META_Busy),
1608 SPA_PARAM_META_size, SPA_POD_Int(sizeof(struct spa_meta_busy))));
1609 }
1610
find_format(struct stream * impl,enum pw_direction direction,uint32_t * media_type,uint32_t * media_subtype)1611 static int find_format(struct stream *impl, enum pw_direction direction,
1612 uint32_t *media_type, uint32_t *media_subtype)
1613 {
1614 uint32_t state = 0;
1615 uint8_t buffer[4096];
1616 struct spa_pod_builder b;
1617 int res;
1618 struct spa_pod *format;
1619
1620 spa_pod_builder_init(&b, buffer, sizeof(buffer));
1621 if (spa_node_port_enum_params_sync(&impl->impl_node,
1622 impl->direction, 0,
1623 SPA_PARAM_EnumFormat, &state,
1624 NULL, &format, &b) != 1) {
1625 pw_log_warn("%p: no format given", impl);
1626 return 0;
1627 }
1628
1629 if ((res = spa_format_parse(format, media_type, media_subtype)) < 0)
1630 return res;
1631
1632 pw_log_debug("%p: %s/%s", impl,
1633 spa_debug_type_find_name(spa_type_media_type, *media_type),
1634 spa_debug_type_find_name(spa_type_media_subtype, *media_subtype));
1635 return 0;
1636 }
1637
get_media_class(struct stream * impl)1638 static const char *get_media_class(struct stream *impl)
1639 {
1640 switch (impl->media_type) {
1641 case SPA_MEDIA_TYPE_audio:
1642 return "Audio";
1643 case SPA_MEDIA_TYPE_video:
1644 return "Video";
1645 case SPA_MEDIA_TYPE_application:
1646 switch(impl->media_subtype) {
1647 case SPA_MEDIA_SUBTYPE_control:
1648 return "Midi";
1649 }
1650 return "Data";
1651 case SPA_MEDIA_TYPE_stream:
1652 switch(impl->media_subtype) {
1653 case SPA_MEDIA_SUBTYPE_midi:
1654 return "Midi";
1655 }
1656 return "Data";
1657 default:
1658 return "Unknown";
1659 }
1660 }
1661
1662 SPA_EXPORT
1663 int
pw_stream_connect(struct pw_stream * stream,enum pw_direction direction,uint32_t target_id,enum pw_stream_flags flags,const struct spa_pod ** params,uint32_t n_params)1664 pw_stream_connect(struct pw_stream *stream,
1665 enum pw_direction direction,
1666 uint32_t target_id,
1667 enum pw_stream_flags flags,
1668 const struct spa_pod **params,
1669 uint32_t n_params)
1670 {
1671 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1672 struct pw_impl_factory *factory;
1673 struct pw_properties *props = NULL;
1674 struct pw_impl_node *follower;
1675 const char *str;
1676 uint32_t i;
1677 int res;
1678
1679 pw_log_debug("%p: connect target:%d", stream, target_id);
1680 impl->direction =
1681 direction == PW_DIRECTION_INPUT ? SPA_DIRECTION_INPUT : SPA_DIRECTION_OUTPUT;
1682 impl->flags = flags;
1683 impl->node_methods = impl_node;
1684
1685 if (impl->direction == SPA_DIRECTION_INPUT)
1686 impl->node_methods.process = impl_node_process_input;
1687 else
1688 impl->node_methods.process = impl_node_process_output;
1689
1690 impl->process_rt = SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_RT_PROCESS);
1691
1692 impl->impl_node.iface = SPA_INTERFACE_INIT(
1693 SPA_TYPE_INTERFACE_Node,
1694 SPA_VERSION_NODE,
1695 &impl->node_methods, impl);
1696
1697 impl->change_mask_all =
1698 SPA_NODE_CHANGE_MASK_FLAGS |
1699 SPA_NODE_CHANGE_MASK_PROPS |
1700 SPA_NODE_CHANGE_MASK_PARAMS;
1701
1702 impl->info = SPA_NODE_INFO_INIT();
1703 if (impl->direction == SPA_DIRECTION_INPUT) {
1704 impl->info.max_input_ports = 1;
1705 impl->info.max_output_ports = 0;
1706 } else {
1707 impl->info.max_input_ports = 0;
1708 impl->info.max_output_ports = 1;
1709 }
1710 /* we're always RT safe, if the stream was marked RT_PROCESS,
1711 * the callback must be RT safe */
1712 impl->info.flags = SPA_NODE_FLAG_RT;
1713 /* if the callback was not marked RT_PROCESS, we will offload
1714 * the process callback in the main thread and we are ASYNC */
1715 if (!impl->process_rt)
1716 impl->info.flags |= SPA_NODE_FLAG_ASYNC;
1717 impl->info.props = &stream->properties->dict;
1718 impl->params[IDX_Props] = SPA_PARAM_INFO(SPA_PARAM_Props, SPA_PARAM_INFO_WRITE);
1719 impl->info.params = impl->params;
1720 impl->info.n_params = N_NODE_PARAMS;
1721 impl->info.change_mask = impl->change_mask_all;
1722
1723 impl->port_change_mask_all =
1724 SPA_PORT_CHANGE_MASK_FLAGS |
1725 SPA_PORT_CHANGE_MASK_PROPS |
1726 SPA_PORT_CHANGE_MASK_PARAMS;
1727
1728 impl->port_info = SPA_PORT_INFO_INIT();
1729 impl->port_info.change_mask = impl->port_change_mask_all;
1730 impl->port_info.flags = 0;
1731 if (SPA_FLAG_IS_SET(flags, PW_STREAM_FLAG_ALLOC_BUFFERS))
1732 impl->port_info.flags |= SPA_PORT_FLAG_CAN_ALLOC_BUFFERS;
1733 impl->port_params[IDX_EnumFormat] = SPA_PARAM_INFO(SPA_PARAM_EnumFormat, 0);
1734 impl->port_params[IDX_Meta] = SPA_PARAM_INFO(SPA_PARAM_Meta, 0);
1735 impl->port_params[IDX_IO] = SPA_PARAM_INFO(SPA_PARAM_IO, 0);
1736 impl->port_params[IDX_Format] = SPA_PARAM_INFO(SPA_PARAM_Format, SPA_PARAM_INFO_WRITE);
1737 impl->port_params[IDX_Buffers] = SPA_PARAM_INFO(SPA_PARAM_Buffers, 0);
1738 impl->port_params[IDX_Latency] = SPA_PARAM_INFO(SPA_PARAM_Latency, SPA_PARAM_INFO_WRITE);
1739 impl->port_info.props = &impl->port_props->dict;
1740 impl->port_info.params = impl->port_params;
1741 impl->port_info.n_params = N_PORT_PARAMS;
1742
1743 clear_params(impl, SPA_ID_INVALID);
1744 for (i = 0; i < n_params; i++)
1745 add_param(impl, SPA_ID_INVALID, 0, params[i]);
1746
1747 add_params(impl);
1748
1749 if ((res = find_format(impl, direction, &impl->media_type, &impl->media_subtype)) < 0)
1750 return res;
1751
1752 impl->disconnecting = false;
1753 stream_set_state(stream, PW_STREAM_STATE_CONNECTING, NULL);
1754
1755 if (target_id != PW_ID_ANY)
1756 pw_properties_setf(stream->properties, PW_KEY_NODE_TARGET, "%d", target_id);
1757 else if ((str = getenv("PIPEWIRE_NODE")) != NULL)
1758 pw_properties_set(stream->properties, PW_KEY_NODE_TARGET, str);
1759 if ((flags & PW_STREAM_FLAG_AUTOCONNECT) &&
1760 pw_properties_get(stream->properties, PW_KEY_NODE_AUTOCONNECT) == NULL) {
1761 str = getenv("PIPEWIRE_AUTOCONNECT");
1762 pw_properties_set(stream->properties, PW_KEY_NODE_AUTOCONNECT, str ? str : "true");
1763 }
1764 if (flags & PW_STREAM_FLAG_DRIVER)
1765 pw_properties_set(stream->properties, PW_KEY_NODE_DRIVER, "true");
1766 if (flags & PW_STREAM_FLAG_EXCLUSIVE)
1767 pw_properties_set(stream->properties, PW_KEY_NODE_EXCLUSIVE, "true");
1768 if (flags & PW_STREAM_FLAG_DONT_RECONNECT)
1769 pw_properties_set(stream->properties, PW_KEY_NODE_DONT_RECONNECT, "true");
1770 if (flags & PW_STREAM_FLAG_TRIGGER) {
1771 pw_properties_set(stream->properties, PW_KEY_NODE_TRIGGER, "true");
1772 impl->trigger = true;
1773 }
1774
1775 if ((str = pw_properties_get(stream->properties, "mem.warn-mlock")) != NULL)
1776 impl->warn_mlock = pw_properties_parse_bool(str);
1777
1778 if ((pw_properties_get(stream->properties, PW_KEY_MEDIA_CLASS) == NULL)) {
1779 const char *media_type = pw_properties_get(stream->properties, PW_KEY_MEDIA_TYPE);
1780 pw_properties_setf(stream->properties, PW_KEY_MEDIA_CLASS, "Stream/%s/%s",
1781 direction == PW_DIRECTION_INPUT ? "Input" : "Output",
1782 media_type ? media_type : get_media_class(impl));
1783 }
1784 if ((str = pw_properties_get(stream->properties, PW_KEY_FORMAT_DSP)) != NULL)
1785 pw_properties_set(impl->port_props, PW_KEY_FORMAT_DSP, str);
1786 else if (impl->media_type == SPA_MEDIA_TYPE_application &&
1787 impl->media_subtype == SPA_MEDIA_SUBTYPE_control)
1788 pw_properties_set(impl->port_props, PW_KEY_FORMAT_DSP, "8 bit raw midi");
1789
1790 impl->port_info.props = &impl->port_props->dict;
1791
1792 if (stream->core == NULL) {
1793 stream->core = pw_context_connect(impl->context,
1794 pw_properties_copy(stream->properties), 0);
1795 if (stream->core == NULL) {
1796 res = -errno;
1797 goto error_connect;
1798 }
1799 spa_list_append(&stream->core->stream_list, &stream->link);
1800 pw_core_add_listener(stream->core,
1801 &stream->core_listener, &core_events, stream);
1802 impl->disconnect_core = true;
1803 }
1804
1805 pw_log_debug("%p: creating node", stream);
1806 props = pw_properties_copy(stream->properties);
1807 if (props == NULL) {
1808 res = -errno;
1809 goto error_node;
1810 }
1811
1812 if ((str = pw_properties_get(props, PW_KEY_STREAM_MONITOR)) &&
1813 pw_properties_parse_bool(str)) {
1814 pw_properties_set(props, "resample.peaks", "true");
1815 pw_properties_set(props, "channelmix.normalize", "true");
1816 }
1817
1818 follower = pw_context_create_node(impl->context, pw_properties_copy(props), 0);
1819 if (follower == NULL) {
1820 res = -errno;
1821 goto error_node;
1822 }
1823
1824 pw_impl_node_set_implementation(follower, &impl->impl_node);
1825
1826 if (impl->media_type == SPA_MEDIA_TYPE_audio) {
1827 factory = pw_context_find_factory(impl->context, "adapter");
1828 if (factory == NULL) {
1829 pw_log_error("%p: no adapter factory found", stream);
1830 res = -ENOENT;
1831 goto error_node;
1832 }
1833 pw_properties_setf(props, "adapt.follower.node", "pointer:%p", follower);
1834 pw_properties_set(props, "object.register", "false");
1835 impl->node = pw_impl_factory_create_object(factory,
1836 NULL,
1837 PW_TYPE_INTERFACE_Node,
1838 PW_VERSION_NODE,
1839 props,
1840 0);
1841 props = NULL;
1842 if (impl->node == NULL) {
1843 res = -errno;
1844 goto error_node;
1845 }
1846 } else {
1847 impl->node = follower;
1848 pw_properties_free(props);
1849 props = NULL;
1850 }
1851 pw_impl_node_set_active(impl->node,
1852 !SPA_FLAG_IS_SET(impl->flags, PW_STREAM_FLAG_INACTIVE));
1853
1854 pw_log_debug("%p: export node %p", stream, impl->node);
1855 stream->proxy = pw_core_export(stream->core,
1856 PW_TYPE_INTERFACE_Node, NULL, impl->node, 0);
1857 if (stream->proxy == NULL) {
1858 res = -errno;
1859 goto error_proxy;
1860 }
1861
1862 pw_proxy_add_listener(stream->proxy, &stream->proxy_listener, &proxy_events, stream);
1863
1864 pw_impl_node_add_listener(impl->node, &stream->node_listener, &node_events, stream);
1865
1866 return 0;
1867
1868 error_connect:
1869 pw_log_error("%p: can't connect: %s", stream, spa_strerror(res));
1870 goto exit_cleanup;
1871 error_node:
1872 pw_log_error("%p: can't make node: %s", stream, spa_strerror(res));
1873 goto exit_cleanup;
1874 error_proxy:
1875 pw_log_error("%p: can't make proxy: %s", stream, spa_strerror(res));
1876 goto exit_cleanup;
1877
1878 exit_cleanup:
1879 pw_properties_free(props);
1880 return res;
1881 }
1882
1883 SPA_EXPORT
pw_stream_get_node_id(struct pw_stream * stream)1884 uint32_t pw_stream_get_node_id(struct pw_stream *stream)
1885 {
1886 return stream->node_id;
1887 }
1888
1889 SPA_EXPORT
pw_stream_disconnect(struct pw_stream * stream)1890 int pw_stream_disconnect(struct pw_stream *stream)
1891 {
1892 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1893
1894 pw_log_debug("%p: disconnect", stream);
1895
1896 if (impl->disconnecting)
1897 return 0;
1898
1899 impl->disconnecting = true;
1900
1901 if (impl->node)
1902 pw_impl_node_set_active(impl->node, false);
1903
1904 if (stream->proxy) {
1905 pw_proxy_destroy(stream->proxy);
1906 stream->proxy = NULL;
1907 }
1908
1909 if (impl->node) {
1910 pw_impl_node_destroy(impl->node);
1911 impl->node = NULL;
1912 }
1913 if (impl->disconnect_core) {
1914 impl->disconnect_core = false;
1915 spa_hook_remove(&stream->core_listener);
1916 spa_list_remove(&stream->link);
1917 pw_core_disconnect(stream->core);
1918 stream->core = NULL;
1919 }
1920 return 0;
1921 }
1922
1923 SPA_EXPORT
pw_stream_set_error(struct pw_stream * stream,int res,const char * error,...)1924 int pw_stream_set_error(struct pw_stream *stream,
1925 int res, const char *error, ...)
1926 {
1927 if (res < 0) {
1928 va_list args;
1929 char *value;
1930 int r;
1931
1932 va_start(args, error);
1933 r = vasprintf(&value, error, args);
1934 va_end(args);
1935 if (r < 0)
1936 return -errno;
1937
1938 if (stream->proxy)
1939 pw_proxy_error(stream->proxy, res, value);
1940 stream_set_state(stream, PW_STREAM_STATE_ERROR, value);
1941
1942 free(value);
1943 }
1944 return res;
1945 }
1946
1947 SPA_EXPORT
pw_stream_update_params(struct pw_stream * stream,const struct spa_pod ** params,uint32_t n_params)1948 int pw_stream_update_params(struct pw_stream *stream,
1949 const struct spa_pod **params,
1950 uint32_t n_params)
1951 {
1952 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1953 int res;
1954
1955 pw_log_debug("%p: update params", stream);
1956 if ((res = update_params(impl, SPA_ID_INVALID, params, n_params)) < 0)
1957 return res;
1958
1959 emit_node_info(impl, false);
1960 emit_port_info(impl, false);
1961
1962 return res;
1963 }
1964
1965 SPA_EXPORT
pw_stream_set_control(struct pw_stream * stream,uint32_t id,uint32_t n_values,float * values,...)1966 int pw_stream_set_control(struct pw_stream *stream, uint32_t id, uint32_t n_values, float *values, ...)
1967 {
1968 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
1969 va_list varargs;
1970 char buf[1024];
1971 struct spa_pod_builder b = SPA_POD_BUILDER_INIT(buf, sizeof(buf));
1972 struct spa_pod_frame f[1];
1973 struct spa_pod *pod;
1974 struct control *c;
1975
1976 va_start(varargs, values);
1977
1978 spa_pod_builder_push_object(&b, &f[0], SPA_TYPE_OBJECT_Props, SPA_PARAM_Props);
1979 while (1) {
1980 pw_log_debug("%p: set control %d %d %f", stream, id, n_values, values[0]);
1981
1982 if ((c = find_control(stream, id))) {
1983 spa_pod_builder_prop(&b, id, 0);
1984 switch (c->container) {
1985 case SPA_TYPE_Float:
1986 spa_pod_builder_float(&b, values[0]);
1987 break;
1988 case SPA_TYPE_Bool:
1989 spa_pod_builder_bool(&b, values[0] < 0.5 ? false : true);
1990 break;
1991 case SPA_TYPE_Array:
1992 spa_pod_builder_array(&b,
1993 sizeof(float), SPA_TYPE_Float,
1994 n_values, values);
1995 break;
1996 default:
1997 spa_pod_builder_none(&b);
1998 break;
1999 }
2000 } else {
2001 pw_log_warn("%p: unknown control with id %d", stream, id);
2002 }
2003 if ((id = va_arg(varargs, uint32_t)) == 0)
2004 break;
2005 n_values = va_arg(varargs, uint32_t);
2006 values = va_arg(varargs, float *);
2007 }
2008 pod = spa_pod_builder_pop(&b, &f[0]);
2009
2010 va_end(varargs);
2011
2012 pw_impl_node_set_param(impl->node, SPA_PARAM_Props, 0, pod);
2013
2014 return 0;
2015 }
2016
2017 SPA_EXPORT
pw_stream_get_control(struct pw_stream * stream,uint32_t id)2018 const struct pw_stream_control *pw_stream_get_control(struct pw_stream *stream, uint32_t id)
2019 {
2020 struct control *c;
2021
2022 if (id == 0)
2023 return NULL;
2024
2025 if ((c = find_control(stream, id)))
2026 return &c->control;
2027
2028 return NULL;
2029 }
2030
2031 SPA_EXPORT
pw_stream_set_active(struct pw_stream * stream,bool active)2032 int pw_stream_set_active(struct pw_stream *stream, bool active)
2033 {
2034 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2035 pw_log_debug("%p: active:%d", stream, active);
2036 if (impl->node)
2037 pw_impl_node_set_active(impl->node, active);
2038
2039 if (!active || impl->drained)
2040 impl->drained = impl->draining = false;
2041 return 0;
2042 }
2043
2044 SPA_EXPORT
pw_stream_get_time(struct pw_stream * stream,struct pw_time * time)2045 int pw_stream_get_time(struct pw_stream *stream, struct pw_time *time)
2046 {
2047 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2048 uintptr_t seq1, seq2;
2049
2050 do {
2051 seq1 = SEQ_READ(impl->seq);
2052 *time = impl->time;
2053 seq2 = SEQ_READ(impl->seq);
2054 } while (!SEQ_READ_SUCCESS(seq1, seq2));
2055
2056 if (impl->direction == SPA_DIRECTION_INPUT)
2057 time->queued = (int64_t)(time->queued - impl->dequeued.outcount);
2058 else
2059 time->queued = (int64_t)(impl->queued.incount - time->queued);
2060
2061 time->delay += ((impl->latency.min_quantum + impl->latency.max_quantum) / 2) * impl->quantum;
2062 time->delay += (impl->latency.min_rate + impl->latency.max_rate) / 2;
2063 time->delay += ((impl->latency.min_ns + impl->latency.max_ns) / 2) * time->rate.denom / SPA_NSEC_PER_SEC;
2064
2065 pw_log_trace("%p: %"PRIi64" %"PRIi64" %"PRIu64" %d/%d %"PRIu64" %"
2066 PRIu64" %"PRIu64" %"PRIu64" %"PRIu64, stream,
2067 time->now, time->delay, time->ticks,
2068 time->rate.num, time->rate.denom, time->queued,
2069 impl->dequeued.outcount, impl->dequeued.incount,
2070 impl->queued.outcount, impl->queued.incount);
2071
2072 return 0;
2073 }
2074
2075 static int
do_trigger_deprecated(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)2076 do_trigger_deprecated(struct spa_loop *loop,
2077 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
2078 {
2079 struct stream *impl = user_data;
2080 int res = impl->node_methods.process(impl);
2081 return spa_node_call_ready(&impl->callbacks, res);
2082 }
2083
2084 SPA_EXPORT
pw_stream_dequeue_buffer(struct pw_stream * stream)2085 struct pw_buffer *pw_stream_dequeue_buffer(struct pw_stream *stream)
2086 {
2087 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2088 struct buffer *b;
2089 int res;
2090
2091 if ((b = pop_queue(impl, &impl->dequeued)) == NULL) {
2092 res = -errno;
2093 pw_log_trace("%p: no more buffers: %m", stream);
2094 errno = -res;
2095 return NULL;
2096 }
2097 pw_log_trace("%p: dequeue buffer %d", stream, b->id);
2098
2099 if (b->busy && impl->direction == SPA_DIRECTION_OUTPUT) {
2100 if (ATOMIC_INC(b->busy->count) > 1) {
2101 ATOMIC_DEC(b->busy->count);
2102 push_queue(impl, &impl->dequeued, b);
2103 pw_log_trace("%p: buffer busy", stream);
2104 errno = EBUSY;
2105 return NULL;
2106 }
2107 }
2108 return &b->this;
2109 }
2110
2111 SPA_EXPORT
pw_stream_queue_buffer(struct pw_stream * stream,struct pw_buffer * buffer)2112 int pw_stream_queue_buffer(struct pw_stream *stream, struct pw_buffer *buffer)
2113 {
2114 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2115 struct buffer *b = SPA_CONTAINER_OF(buffer, struct buffer, this);
2116 int res;
2117
2118 if (b->busy)
2119 ATOMIC_DEC(b->busy->count);
2120
2121 pw_log_trace("%p: queue buffer %d", stream, b->id);
2122 if ((res = push_queue(impl, &impl->queued, b)) < 0)
2123 return res;
2124
2125 if (impl->direction == SPA_DIRECTION_OUTPUT &&
2126 impl->driving && !impl->using_trigger) {
2127 pw_log_debug("deprecated: use pw_stream_trigger_process() to drive the stream.");
2128 res = pw_loop_invoke(impl->context->data_loop,
2129 do_trigger_deprecated, 1, NULL, 0, false, impl);
2130 }
2131 return res;
2132 }
2133
2134 static int
do_flush(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)2135 do_flush(struct spa_loop *loop,
2136 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
2137 {
2138 struct stream *impl = user_data;
2139 struct buffer *b;
2140
2141 pw_log_trace("%p: flush", impl);
2142 do {
2143 b = pop_queue(impl, &impl->queued);
2144 if (b != NULL)
2145 push_queue(impl, &impl->dequeued, b);
2146 }
2147 while (b);
2148
2149 impl->queued.outcount = impl->dequeued.incount =
2150 impl->dequeued.outcount = impl->queued.incount = 0;
2151
2152 return 0;
2153 }
2154 static int
do_drain(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)2155 do_drain(struct spa_loop *loop,
2156 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
2157 {
2158 struct stream *impl = user_data;
2159 pw_log_trace("%p", impl);
2160 impl->draining = true;
2161 impl->drained = false;
2162 return 0;
2163 }
2164
2165 SPA_EXPORT
pw_stream_flush(struct pw_stream * stream,bool drain)2166 int pw_stream_flush(struct pw_stream *stream, bool drain)
2167 {
2168 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2169 pw_loop_invoke(impl->context->data_loop,
2170 drain ? do_drain : do_flush, 1, NULL, 0, true, impl);
2171 if (!drain)
2172 spa_node_send_command(impl->node->node,
2173 &SPA_NODE_COMMAND_INIT(SPA_NODE_COMMAND_Flush));
2174 return 0;
2175 }
2176
2177 SPA_EXPORT
pw_stream_is_driving(struct pw_stream * stream)2178 bool pw_stream_is_driving(struct pw_stream *stream)
2179 {
2180 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2181 return impl->driving;
2182 }
2183
2184 static int
do_trigger_process(struct spa_loop * loop,bool async,uint32_t seq,const void * data,size_t size,void * user_data)2185 do_trigger_process(struct spa_loop *loop,
2186 bool async, uint32_t seq, const void *data, size_t size, void *user_data)
2187 {
2188 struct stream *impl = user_data;
2189 int res;
2190 if (impl->direction == SPA_DIRECTION_OUTPUT) {
2191 if (impl->process_rt)
2192 spa_callbacks_call(&impl->rt_callbacks, struct pw_stream_events, process, 0);
2193 res = impl->node_methods.process(impl);
2194 } else {
2195 res = SPA_STATUS_NEED_DATA;
2196 }
2197 return spa_node_call_ready(&impl->callbacks, res);
2198 }
2199
trigger_request_process(struct stream * impl)2200 static int trigger_request_process(struct stream *impl)
2201 {
2202 uint8_t buffer[1024];
2203 struct spa_pod_builder b = { 0 };
2204
2205 spa_pod_builder_init(&b, buffer, sizeof(buffer));
2206 spa_node_emit_event(&impl->hooks,
2207 spa_pod_builder_add_object(&b,
2208 SPA_TYPE_EVENT_Node, SPA_NODE_EVENT_RequestProcess));
2209 return 0;
2210 }
2211
2212 SPA_EXPORT
pw_stream_trigger_process(struct pw_stream * stream)2213 int pw_stream_trigger_process(struct pw_stream *stream)
2214 {
2215 struct stream *impl = SPA_CONTAINER_OF(stream, struct stream, this);
2216 int res = 0;
2217
2218 pw_log_trace("%p", impl);
2219
2220 /* flag to check for old or new behaviour */
2221 impl->using_trigger = true;
2222
2223 if (!impl->driving && !impl->trigger) {
2224 res = trigger_request_process(impl);
2225 } else {
2226 if (impl->direction == SPA_DIRECTION_OUTPUT &&
2227 !impl->process_rt) {
2228 pw_loop_invoke(impl->context->main_loop,
2229 do_call_process, 1, NULL, 0, false, impl);
2230 }
2231 res = pw_loop_invoke(impl->context->data_loop,
2232 do_trigger_process, 1, NULL, 0, false, impl);
2233 }
2234 return res;
2235 }
2236