1 /* 2 * Copyright © 2011 Mozilla Foundation 3 * 4 * This program is made available under an ISC-style license. See the 5 * accompanying file LICENSE for details. 6 */ 7 #undef NDEBUG 8 #include <assert.h> 9 #include <dlfcn.h> 10 #include <stdlib.h> 11 #include <pulse/pulseaudio.h> 12 #include <string.h> 13 #include "cubeb/cubeb.h" 14 #include "cubeb-internal.h" 15 #include <stdio.h> 16 17 #ifdef DISABLE_LIBPULSE_DLOPEN 18 #define WRAP(x) x 19 #else 20 #define WRAP(x) cubeb_##x 21 #define LIBPULSE_API_VISIT(X) \ 22 X(pa_channel_map_can_balance) \ 23 X(pa_channel_map_init_auto) \ 24 X(pa_context_connect) \ 25 X(pa_context_disconnect) \ 26 X(pa_context_drain) \ 27 X(pa_context_get_server_info) \ 28 X(pa_context_get_sink_info_by_name) \ 29 X(pa_context_get_sink_info_list) \ 30 X(pa_context_get_source_info_list) \ 31 X(pa_context_get_state) \ 32 X(pa_context_new) \ 33 X(pa_context_rttime_new) \ 34 X(pa_context_set_sink_input_volume) \ 35 X(pa_context_set_state_callback) \ 36 X(pa_context_unref) \ 37 X(pa_cvolume_set) \ 38 X(pa_cvolume_set_balance) \ 39 X(pa_frame_size) \ 40 X(pa_operation_get_state) \ 41 X(pa_operation_unref) \ 42 X(pa_proplist_gets) \ 43 X(pa_rtclock_now) \ 44 X(pa_stream_begin_write) \ 45 X(pa_stream_cancel_write) \ 46 X(pa_stream_connect_playback) \ 47 X(pa_stream_cork) \ 48 X(pa_stream_disconnect) \ 49 X(pa_stream_get_channel_map) \ 50 X(pa_stream_get_index) \ 51 X(pa_stream_get_latency) \ 52 X(pa_stream_get_sample_spec) \ 53 X(pa_stream_get_state) \ 54 X(pa_stream_get_time) \ 55 X(pa_stream_new) \ 56 X(pa_stream_set_state_callback) \ 57 X(pa_stream_set_write_callback) \ 58 X(pa_stream_unref) \ 59 X(pa_stream_update_timing_info) \ 60 X(pa_stream_write) \ 61 X(pa_sw_volume_from_linear) \ 62 X(pa_threaded_mainloop_free) \ 63 X(pa_threaded_mainloop_get_api) \ 64 X(pa_threaded_mainloop_in_thread) \ 65 X(pa_threaded_mainloop_lock) \ 66 X(pa_threaded_mainloop_new) \ 67 X(pa_threaded_mainloop_signal) \ 68 X(pa_threaded_mainloop_start) \ 69 X(pa_threaded_mainloop_stop) \ 70 X(pa_threaded_mainloop_unlock) \ 71 X(pa_threaded_mainloop_wait) \ 72 X(pa_usec_to_bytes) \ 73 X(pa_stream_set_read_callback) \ 74 X(pa_stream_connect_record) \ 75 X(pa_stream_readable_size) \ 76 X(pa_stream_writable_size) \ 77 X(pa_stream_peek) \ 78 X(pa_stream_drop) \ 79 X(pa_stream_get_buffer_attr) \ 80 X(pa_stream_get_device_name) \ 81 X(pa_context_set_subscribe_callback) \ 82 X(pa_context_subscribe) \ 83 X(pa_mainloop_api_once) \ 84 85 #define MAKE_TYPEDEF(x) static typeof(x) * cubeb_##x; 86 LIBPULSE_API_VISIT(MAKE_TYPEDEF); 87 #undef MAKE_TYPEDEF 88 #endif 89 90 static struct cubeb_ops const pulse_ops; 91 92 struct cubeb { 93 struct cubeb_ops const * ops; 94 void * libpulse; 95 pa_threaded_mainloop * mainloop; 96 pa_context * context; 97 pa_sink_info * default_sink_info; 98 char * context_name; 99 int error; 100 cubeb_device_collection_changed_callback collection_changed_callback; 101 void * collection_changed_user_ptr; 102 }; 103 104 struct cubeb_stream { 105 cubeb * context; 106 pa_stream * output_stream; 107 pa_stream * input_stream; 108 cubeb_data_callback data_callback; 109 cubeb_state_callback state_callback; 110 void * user_ptr; 111 pa_time_event * drain_timer; 112 pa_sample_spec output_sample_spec; 113 pa_sample_spec input_sample_spec; 114 int shutdown; 115 float volume; 116 cubeb_state state; 117 }; 118 119 static const float PULSE_NO_GAIN = -1.0; 120 121 enum cork_state { 122 UNCORK = 0, 123 CORK = 1 << 0, 124 NOTIFY = 1 << 1 125 }; 126 127 static void 128 sink_info_callback(pa_context * context, const pa_sink_info * info, int eol, void * u) 129 { 130 (void)context; 131 cubeb * ctx = u; 132 if (!eol) { 133 free(ctx->default_sink_info); 134 ctx->default_sink_info = malloc(sizeof(pa_sink_info)); 135 memcpy(ctx->default_sink_info, info, sizeof(pa_sink_info)); 136 } 137 WRAP(pa_threaded_mainloop_signal)(ctx->mainloop, 0); 138 } 139 140 static void 141 server_info_callback(pa_context * context, const pa_server_info * info, void * u) 142 { 143 WRAP(pa_context_get_sink_info_by_name)(context, info->default_sink_name, sink_info_callback, u); 144 } 145 146 static void 147 context_state_callback(pa_context * c, void * u) 148 { 149 cubeb * ctx = u; 150 if (!PA_CONTEXT_IS_GOOD(WRAP(pa_context_get_state)(c))) { 151 ctx->error = 1; 152 } 153 WRAP(pa_threaded_mainloop_signal)(ctx->mainloop, 0); 154 } 155 156 static void 157 context_notify_callback(pa_context * c, void * u) 158 { 159 (void)c; 160 cubeb * ctx = u; 161 WRAP(pa_threaded_mainloop_signal)(ctx->mainloop, 0); 162 } 163 164 static void 165 stream_success_callback(pa_stream * s, int success, void * u) 166 { 167 (void)s; 168 (void)success; 169 cubeb_stream * stm = u; 170 WRAP(pa_threaded_mainloop_signal)(stm->context->mainloop, 0); 171 } 172 173 static void 174 stream_state_change_callback(cubeb_stream * stm, cubeb_state s) 175 { 176 stm->state = s; 177 stm->state_callback(stm, stm->user_ptr, s); 178 } 179 180 static void 181 stream_drain_callback(pa_mainloop_api * a, pa_time_event * e, struct timeval const * tv, void * u) 182 { 183 (void)a; 184 (void)tv; 185 cubeb_stream * stm = u; 186 assert(stm->drain_timer == e); 187 stream_state_change_callback(stm, CUBEB_STATE_DRAINED); 188 /* there's no pa_rttime_free, so use this instead. */ 189 a->time_free(stm->drain_timer); 190 stm->drain_timer = NULL; 191 WRAP(pa_threaded_mainloop_signal)(stm->context->mainloop, 0); 192 } 193 194 static void 195 stream_state_callback(pa_stream * s, void * u) 196 { 197 cubeb_stream * stm = u; 198 if (!PA_STREAM_IS_GOOD(WRAP(pa_stream_get_state)(s))) { 199 stream_state_change_callback(stm, CUBEB_STATE_ERROR); 200 } 201 WRAP(pa_threaded_mainloop_signal)(stm->context->mainloop, 0); 202 } 203 204 static void 205 trigger_user_callback(pa_stream * s, void const * input_data, size_t nbytes, cubeb_stream * stm) 206 { 207 void * buffer; 208 size_t size; 209 int r; 210 long got; 211 size_t towrite, read_offset; 212 size_t frame_size; 213 214 frame_size = WRAP(pa_frame_size)(&stm->output_sample_spec); 215 assert(nbytes % frame_size == 0); 216 217 towrite = nbytes; 218 read_offset = 0; 219 while (towrite) { 220 size = towrite; 221 r = WRAP(pa_stream_begin_write)(s, &buffer, &size); 222 // Note: this has failed running under rr on occassion - needs investigation. 223 assert(r == 0); 224 assert(size > 0); 225 assert(size % frame_size == 0); 226 227 LOGV("Trigger user callback with output buffer size=%zd, read_offset=%zd", size, read_offset); 228 got = stm->data_callback(stm, stm->user_ptr, (uint8_t const *)input_data + read_offset, buffer, size / frame_size); 229 if (got < 0) { 230 WRAP(pa_stream_cancel_write)(s); 231 stm->shutdown = 1; 232 return; 233 } 234 // If more iterations move offset of read buffer 235 if (input_data) { 236 size_t in_frame_size = WRAP(pa_frame_size)(&stm->input_sample_spec); 237 read_offset += (size / frame_size) * in_frame_size; 238 } 239 240 if (stm->volume != PULSE_NO_GAIN) { 241 uint32_t samples = size * stm->output_sample_spec.channels / frame_size ; 242 243 if (stm->output_sample_spec.format == PA_SAMPLE_S16BE || 244 stm->output_sample_spec.format == PA_SAMPLE_S16LE) { 245 short * b = buffer; 246 for (uint32_t i = 0; i < samples; i++) { 247 b[i] *= stm->volume; 248 } 249 } else { 250 float * b = buffer; 251 for (uint32_t i = 0; i < samples; i++) { 252 b[i] *= stm->volume; 253 } 254 } 255 } 256 257 r = WRAP(pa_stream_write)(s, buffer, got * frame_size, NULL, 0, PA_SEEK_RELATIVE); 258 assert(r == 0); 259 260 if ((size_t) got < size / frame_size) { 261 pa_usec_t latency = 0; 262 r = WRAP(pa_stream_get_latency)(s, &latency, NULL); 263 if (r == -PA_ERR_NODATA) { 264 /* this needs a better guess. */ 265 latency = 100 * PA_USEC_PER_MSEC; 266 } 267 assert(r == 0 || r == -PA_ERR_NODATA); 268 /* pa_stream_drain is useless, see PA bug# 866. this is a workaround. */ 269 /* arbitrary safety margin: double the current latency. */ 270 assert(!stm->drain_timer); 271 stm->drain_timer = WRAP(pa_context_rttime_new)(stm->context->context, WRAP(pa_rtclock_now)() + 2 * latency, stream_drain_callback, stm); 272 stm->shutdown = 1; 273 return; 274 } 275 276 towrite -= size; 277 } 278 279 assert(towrite == 0); 280 } 281 282 static int 283 read_from_input(pa_stream * s, void const ** buffer, size_t * size) 284 { 285 size_t readable_size = WRAP(pa_stream_readable_size)(s); 286 if (readable_size > 0) { 287 if (WRAP(pa_stream_peek)(s, buffer, size) < 0) { 288 return -1; 289 } 290 } 291 return readable_size; 292 } 293 294 static void 295 stream_write_callback(pa_stream * s, size_t nbytes, void * u) 296 { 297 LOGV("Output callback to be written buffer size %zd", nbytes); 298 cubeb_stream * stm = u; 299 if (stm->shutdown || 300 stm->state != CUBEB_STATE_STARTED) { 301 return; 302 } 303 304 if (!stm->input_stream){ 305 // Output/playback only operation. 306 // Write directly to output 307 assert(!stm->input_stream && stm->output_stream); 308 trigger_user_callback(s, NULL, nbytes, stm); 309 } 310 } 311 312 static void 313 stream_read_callback(pa_stream * s, size_t nbytes, void * u) 314 { 315 LOGV("Input callback buffer size %zd", nbytes); 316 cubeb_stream * stm = u; 317 if (stm->shutdown) { 318 return; 319 } 320 321 void const * read_data = NULL; 322 size_t read_size; 323 while (read_from_input(s, &read_data, &read_size) > 0) { 324 /* read_data can be NULL in case of a hole. */ 325 if (read_data) { 326 size_t in_frame_size = WRAP(pa_frame_size)(&stm->input_sample_spec); 327 size_t read_frames = read_size / in_frame_size; 328 329 if (stm->output_stream) { 330 // input/capture + output/playback operation 331 size_t out_frame_size = WRAP(pa_frame_size)(&stm->output_sample_spec); 332 size_t write_size = read_frames * out_frame_size; 333 // Offer full duplex data for writing 334 trigger_user_callback(stm->output_stream, read_data, write_size, stm); 335 } else { 336 // input/capture only operation. Call callback directly 337 long got = stm->data_callback(stm, stm->user_ptr, read_data, NULL, read_frames); 338 if (got < 0 || (size_t) got != read_frames) { 339 WRAP(pa_stream_cancel_write)(s); 340 stm->shutdown = 1; 341 break; 342 } 343 } 344 } 345 if (read_size > 0) { 346 WRAP(pa_stream_drop)(s); 347 } 348 349 if (stm->shutdown) { 350 return; 351 } 352 } 353 } 354 355 static int 356 wait_until_context_ready(cubeb * ctx) 357 { 358 for (;;) { 359 pa_context_state_t state = WRAP(pa_context_get_state)(ctx->context); 360 if (!PA_CONTEXT_IS_GOOD(state)) 361 return -1; 362 if (state == PA_CONTEXT_READY) 363 break; 364 WRAP(pa_threaded_mainloop_wait)(ctx->mainloop); 365 } 366 return 0; 367 } 368 369 static int 370 wait_until_io_stream_ready(pa_stream * stream, pa_threaded_mainloop * mainloop) 371 { 372 if (!stream || !mainloop){ 373 return -1; 374 } 375 for (;;) { 376 pa_stream_state_t state = WRAP(pa_stream_get_state)(stream); 377 if (!PA_STREAM_IS_GOOD(state)) 378 return -1; 379 if (state == PA_STREAM_READY) 380 break; 381 WRAP(pa_threaded_mainloop_wait)(mainloop); 382 } 383 return 0; 384 } 385 386 static int 387 wait_until_stream_ready(cubeb_stream * stm) 388 { 389 if (stm->output_stream && 390 wait_until_io_stream_ready(stm->output_stream, stm->context->mainloop) == -1) { 391 return -1; 392 } 393 if(stm->input_stream && 394 wait_until_io_stream_ready(stm->input_stream, stm->context->mainloop) == -1) { 395 return -1; 396 } 397 return 0; 398 } 399 400 static int 401 operation_wait(cubeb * ctx, pa_stream * stream, pa_operation * o) 402 { 403 while (WRAP(pa_operation_get_state)(o) == PA_OPERATION_RUNNING) { 404 WRAP(pa_threaded_mainloop_wait)(ctx->mainloop); 405 if (!PA_CONTEXT_IS_GOOD(WRAP(pa_context_get_state)(ctx->context))) { 406 return -1; 407 } 408 if (stream && !PA_STREAM_IS_GOOD(WRAP(pa_stream_get_state)(stream))) { 409 return -1; 410 } 411 } 412 return 0; 413 } 414 415 static void 416 cork_io_stream(cubeb_stream * stm, pa_stream * io_stream, enum cork_state state) 417 { 418 pa_operation * o; 419 if (!io_stream) { 420 return; 421 } 422 o = WRAP(pa_stream_cork)(io_stream, state & CORK, stream_success_callback, stm); 423 if (o) { 424 operation_wait(stm->context, io_stream, o); 425 WRAP(pa_operation_unref)(o); 426 } 427 } 428 429 static void 430 stream_cork(cubeb_stream * stm, enum cork_state state) 431 { 432 WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); 433 cork_io_stream(stm, stm->output_stream, state); 434 cork_io_stream(stm, stm->input_stream, state); 435 WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); 436 437 if (state & NOTIFY) { 438 stream_state_change_callback(stm, state & CORK ? CUBEB_STATE_STOPPED 439 : CUBEB_STATE_STARTED); 440 } 441 } 442 443 static int 444 stream_update_timing_info(cubeb_stream * stm) 445 { 446 int r = -1; 447 pa_operation * o = NULL; 448 if (stm->output_stream) { 449 o = WRAP(pa_stream_update_timing_info)(stm->output_stream, stream_success_callback, stm); 450 if (o) { 451 r = operation_wait(stm->context, stm->output_stream, o); 452 WRAP(pa_operation_unref)(o); 453 } 454 if (r != 0) { 455 return r; 456 } 457 } 458 459 if (stm->input_stream) { 460 o = WRAP(pa_stream_update_timing_info)(stm->input_stream, stream_success_callback, stm); 461 if (o) { 462 r = operation_wait(stm->context, stm->input_stream, o); 463 WRAP(pa_operation_unref)(o); 464 } 465 } 466 467 return r; 468 } 469 470 static void pulse_context_destroy(cubeb * ctx); 471 static void pulse_destroy(cubeb * ctx); 472 473 static int 474 pulse_context_init(cubeb * ctx) 475 { 476 if (ctx->context) { 477 assert(ctx->error == 1); 478 pulse_context_destroy(ctx); 479 } 480 481 ctx->context = WRAP(pa_context_new)(WRAP(pa_threaded_mainloop_get_api)(ctx->mainloop), 482 ctx->context_name); 483 if (!ctx->context) { 484 return -1; 485 } 486 WRAP(pa_context_set_state_callback)(ctx->context, context_state_callback, ctx); 487 488 WRAP(pa_threaded_mainloop_lock)(ctx->mainloop); 489 WRAP(pa_context_connect)(ctx->context, NULL, 0, NULL); 490 491 if (wait_until_context_ready(ctx) != 0) { 492 WRAP(pa_threaded_mainloop_unlock)(ctx->mainloop); 493 pulse_context_destroy(ctx); 494 ctx->context = NULL; 495 return -1; 496 } 497 498 WRAP(pa_threaded_mainloop_unlock)(ctx->mainloop); 499 500 ctx->error = 0; 501 502 return 0; 503 } 504 505 /*static*/ int 506 pulse_init(cubeb ** context, char const * context_name) 507 { 508 void * libpulse = NULL; 509 cubeb * ctx; 510 511 *context = NULL; 512 513 #ifndef DISABLE_LIBPULSE_DLOPEN 514 libpulse = dlopen("libpulse.so.0", RTLD_LAZY); 515 if (!libpulse) { 516 return CUBEB_ERROR; 517 } 518 519 #define LOAD(x) { \ 520 cubeb_##x = dlsym(libpulse, #x); \ 521 if (!cubeb_##x) { \ 522 dlclose(libpulse); \ 523 return CUBEB_ERROR; \ 524 } \ 525 } 526 527 LIBPULSE_API_VISIT(LOAD); 528 #undef LOAD 529 #endif 530 531 ctx = calloc(1, sizeof(*ctx)); 532 assert(ctx); 533 534 ctx->ops = &pulse_ops; 535 ctx->libpulse = libpulse; 536 537 ctx->mainloop = WRAP(pa_threaded_mainloop_new)(); 538 ctx->default_sink_info = NULL; 539 540 WRAP(pa_threaded_mainloop_start)(ctx->mainloop); 541 542 ctx->context_name = context_name ? strdup(context_name) : NULL; 543 if (pulse_context_init(ctx) != 0) { 544 pulse_destroy(ctx); 545 return CUBEB_ERROR; 546 } 547 548 WRAP(pa_threaded_mainloop_lock)(ctx->mainloop); 549 WRAP(pa_context_get_server_info)(ctx->context, server_info_callback, ctx); 550 WRAP(pa_threaded_mainloop_unlock)(ctx->mainloop); 551 552 *context = ctx; 553 554 return CUBEB_OK; 555 } 556 557 static char const * 558 pulse_get_backend_id(cubeb * ctx) 559 { 560 (void)ctx; 561 return "pulse"; 562 } 563 564 static int 565 pulse_get_max_channel_count(cubeb * ctx, uint32_t * max_channels) 566 { 567 (void)ctx; 568 assert(ctx && max_channels); 569 570 WRAP(pa_threaded_mainloop_lock)(ctx->mainloop); 571 while (!ctx->default_sink_info) { 572 WRAP(pa_threaded_mainloop_wait)(ctx->mainloop); 573 } 574 WRAP(pa_threaded_mainloop_unlock)(ctx->mainloop); 575 576 *max_channels = ctx->default_sink_info->channel_map.channels; 577 578 return CUBEB_OK; 579 } 580 581 static int 582 pulse_get_preferred_sample_rate(cubeb * ctx, uint32_t * rate) 583 { 584 assert(ctx && rate); 585 (void)ctx; 586 587 WRAP(pa_threaded_mainloop_lock)(ctx->mainloop); 588 while (!ctx->default_sink_info) { 589 WRAP(pa_threaded_mainloop_wait)(ctx->mainloop); 590 } 591 WRAP(pa_threaded_mainloop_unlock)(ctx->mainloop); 592 593 *rate = ctx->default_sink_info->sample_spec.rate; 594 595 return CUBEB_OK; 596 } 597 598 static int 599 pulse_get_min_latency(cubeb * ctx, cubeb_stream_params params, uint32_t * latency_frames) 600 { 601 (void)ctx; 602 // According to PulseAudio developers, this is a safe minimum. 603 *latency_frames = 25 * params.rate / 1000; 604 605 return CUBEB_OK; 606 } 607 608 static void 609 pulse_context_destroy(cubeb * ctx) 610 { 611 pa_operation * o; 612 613 WRAP(pa_threaded_mainloop_lock)(ctx->mainloop); 614 o = WRAP(pa_context_drain)(ctx->context, context_notify_callback, ctx); 615 if (o) { 616 operation_wait(ctx, NULL, o); 617 WRAP(pa_operation_unref)(o); 618 } 619 WRAP(pa_context_set_state_callback)(ctx->context, NULL, NULL); 620 WRAP(pa_context_disconnect)(ctx->context); 621 WRAP(pa_context_unref)(ctx->context); 622 WRAP(pa_threaded_mainloop_unlock)(ctx->mainloop); 623 } 624 625 static void 626 pulse_destroy(cubeb * ctx) 627 { 628 if (ctx->context_name) { 629 free(ctx->context_name); 630 } 631 if (ctx->context) { 632 pulse_context_destroy(ctx); 633 } 634 635 if (ctx->mainloop) { 636 WRAP(pa_threaded_mainloop_stop)(ctx->mainloop); 637 WRAP(pa_threaded_mainloop_free)(ctx->mainloop); 638 } 639 640 if (ctx->libpulse) { 641 dlclose(ctx->libpulse); 642 } 643 if (ctx->default_sink_info) { 644 free(ctx->default_sink_info); 645 } 646 free(ctx); 647 } 648 649 static void pulse_stream_destroy(cubeb_stream * stm); 650 651 static pa_sample_format_t 652 to_pulse_format(cubeb_sample_format format) 653 { 654 switch (format) { 655 case CUBEB_SAMPLE_S16LE: 656 return PA_SAMPLE_S16LE; 657 case CUBEB_SAMPLE_S16BE: 658 return PA_SAMPLE_S16BE; 659 case CUBEB_SAMPLE_FLOAT32LE: 660 return PA_SAMPLE_FLOAT32LE; 661 case CUBEB_SAMPLE_FLOAT32BE: 662 return PA_SAMPLE_FLOAT32BE; 663 default: 664 return PA_SAMPLE_INVALID; 665 } 666 } 667 668 static int 669 create_pa_stream(cubeb_stream * stm, 670 pa_stream ** pa_stm, 671 cubeb_stream_params * stream_params, 672 char const * stream_name) 673 { 674 assert(stm && stream_params); 675 *pa_stm = NULL; 676 pa_sample_spec ss; 677 ss.format = to_pulse_format(stream_params->format); 678 if (ss.format == PA_SAMPLE_INVALID) 679 return CUBEB_ERROR_INVALID_FORMAT; 680 ss.rate = stream_params->rate; 681 ss.channels = stream_params->channels; 682 683 *pa_stm = WRAP(pa_stream_new)(stm->context->context, stream_name, &ss, NULL); 684 return (*pa_stm == NULL) ? CUBEB_ERROR : CUBEB_OK; 685 } 686 687 static pa_buffer_attr 688 set_buffering_attribute(unsigned int latency_frames, pa_sample_spec * sample_spec) 689 { 690 pa_buffer_attr battr; 691 battr.maxlength = -1; 692 battr.prebuf = -1; 693 battr.tlength = latency_frames * WRAP(pa_frame_size)(sample_spec); 694 battr.minreq = battr.tlength / 4; 695 battr.fragsize = battr.minreq; 696 697 LOG("Requested buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u", 698 battr.maxlength, battr.tlength, battr.prebuf, battr.minreq, battr.fragsize); 699 700 return battr; 701 } 702 703 static int 704 pulse_stream_init(cubeb * context, 705 cubeb_stream ** stream, 706 char const * stream_name, 707 cubeb_devid input_device, 708 cubeb_stream_params * input_stream_params, 709 cubeb_devid output_device, 710 cubeb_stream_params * output_stream_params, 711 unsigned int latency_frames, 712 cubeb_data_callback data_callback, 713 cubeb_state_callback state_callback, 714 void * user_ptr) 715 { 716 cubeb_stream * stm; 717 pa_buffer_attr battr; 718 int r; 719 720 assert(context); 721 722 // If the connection failed for some reason, try to reconnect 723 if (context->error == 1 && pulse_context_init(context) != 0) { 724 return CUBEB_ERROR; 725 } 726 727 *stream = NULL; 728 729 stm = calloc(1, sizeof(*stm)); 730 assert(stm); 731 732 stm->context = context; 733 stm->data_callback = data_callback; 734 stm->state_callback = state_callback; 735 stm->user_ptr = user_ptr; 736 stm->volume = PULSE_NO_GAIN; 737 stm->state = -1; 738 assert(stm->shutdown == 0); 739 740 WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); 741 if (output_stream_params) { 742 r = create_pa_stream(stm, &stm->output_stream, output_stream_params, stream_name); 743 if (r != CUBEB_OK) { 744 WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); 745 pulse_stream_destroy(stm); 746 return r; 747 } 748 749 stm->output_sample_spec = *(WRAP(pa_stream_get_sample_spec)(stm->output_stream)); 750 751 WRAP(pa_stream_set_state_callback)(stm->output_stream, stream_state_callback, stm); 752 WRAP(pa_stream_set_write_callback)(stm->output_stream, stream_write_callback, stm); 753 754 battr = set_buffering_attribute(latency_frames, &stm->output_sample_spec); 755 WRAP(pa_stream_connect_playback)(stm->output_stream, 756 output_device, 757 &battr, 758 PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING | 759 PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY, 760 NULL, NULL); 761 } 762 763 // Set up input stream 764 if (input_stream_params) { 765 r = create_pa_stream(stm, &stm->input_stream, input_stream_params, stream_name); 766 if (r != CUBEB_OK) { 767 WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); 768 pulse_stream_destroy(stm); 769 return r; 770 } 771 772 stm->input_sample_spec = *(WRAP(pa_stream_get_sample_spec)(stm->input_stream)); 773 774 WRAP(pa_stream_set_state_callback)(stm->input_stream, stream_state_callback, stm); 775 WRAP(pa_stream_set_read_callback)(stm->input_stream, stream_read_callback, stm); 776 777 battr = set_buffering_attribute(latency_frames, &stm->input_sample_spec); 778 WRAP(pa_stream_connect_record)(stm->input_stream, 779 input_device, 780 &battr, 781 PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING | 782 PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY); 783 } 784 785 r = wait_until_stream_ready(stm); 786 if (r == 0) { 787 /* force a timing update now, otherwise timing info does not become valid 788 until some point after initialization has completed. */ 789 r = stream_update_timing_info(stm); 790 } 791 792 WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); 793 794 if (r != 0) { 795 pulse_stream_destroy(stm); 796 return CUBEB_ERROR; 797 } 798 799 if (g_log_level) { 800 if (output_stream_params){ 801 const pa_buffer_attr * output_att; 802 output_att = WRAP(pa_stream_get_buffer_attr)(stm->output_stream); 803 LOG("Output buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u",output_att->maxlength, output_att->tlength, 804 output_att->prebuf, output_att->minreq, output_att->fragsize); 805 } 806 807 if (input_stream_params){ 808 const pa_buffer_attr * input_att; 809 input_att = WRAP(pa_stream_get_buffer_attr)(stm->input_stream); 810 LOG("Input buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u",input_att->maxlength, input_att->tlength, 811 input_att->prebuf, input_att->minreq, input_att->fragsize); 812 } 813 } 814 815 *stream = stm; 816 817 return CUBEB_OK; 818 } 819 820 static void 821 pulse_stream_destroy(cubeb_stream * stm) 822 { 823 stream_cork(stm, CORK); 824 825 WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); 826 if (stm->output_stream) { 827 828 if (stm->drain_timer) { 829 /* there's no pa_rttime_free, so use this instead. */ 830 WRAP(pa_threaded_mainloop_get_api)(stm->context->mainloop)->time_free(stm->drain_timer); 831 } 832 833 WRAP(pa_stream_set_state_callback)(stm->output_stream, NULL, NULL); 834 WRAP(pa_stream_set_write_callback)(stm->output_stream, NULL, NULL); 835 WRAP(pa_stream_disconnect)(stm->output_stream); 836 WRAP(pa_stream_unref)(stm->output_stream); 837 } 838 839 if (stm->input_stream) { 840 WRAP(pa_stream_set_state_callback)(stm->input_stream, NULL, NULL); 841 WRAP(pa_stream_set_read_callback)(stm->input_stream, NULL, NULL); 842 WRAP(pa_stream_disconnect)(stm->input_stream); 843 WRAP(pa_stream_unref)(stm->input_stream); 844 } 845 WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); 846 847 free(stm); 848 } 849 850 static void 851 pulse_defer_event_cb(pa_mainloop_api * a, void * userdata) 852 { 853 (void)a; 854 cubeb_stream * stm = userdata; 855 if (stm->shutdown) { 856 return; 857 } 858 size_t writable_size = WRAP(pa_stream_writable_size)(stm->output_stream); 859 trigger_user_callback(stm->output_stream, NULL, writable_size, stm); 860 } 861 862 static int 863 pulse_stream_start(cubeb_stream * stm) 864 { 865 stm->shutdown = 0; 866 stream_cork(stm, UNCORK | NOTIFY); 867 868 if (stm->output_stream && !stm->input_stream) { 869 /* On output only case need to manually call user cb once in order to make 870 * things roll. This is done via a defer event in order to execute it 871 * from PA server thread. */ 872 WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); 873 WRAP(pa_mainloop_api_once)(WRAP(pa_threaded_mainloop_get_api)(stm->context->mainloop), 874 pulse_defer_event_cb, stm); 875 WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); 876 } 877 878 return CUBEB_OK; 879 } 880 881 static int 882 pulse_stream_stop(cubeb_stream * stm) 883 { 884 WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); 885 stm->shutdown = 1; 886 // If draining is taking place wait to finish 887 while (stm->drain_timer) { 888 WRAP(pa_threaded_mainloop_wait)(stm->context->mainloop); 889 } 890 WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); 891 892 stream_cork(stm, CORK | NOTIFY); 893 return CUBEB_OK; 894 } 895 896 static int 897 pulse_stream_get_position(cubeb_stream * stm, uint64_t * position) 898 { 899 int r, in_thread; 900 pa_usec_t r_usec; 901 uint64_t bytes; 902 903 if (!stm || !stm->output_stream) { 904 return CUBEB_ERROR; 905 } 906 907 in_thread = WRAP(pa_threaded_mainloop_in_thread)(stm->context->mainloop); 908 909 if (!in_thread) { 910 WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); 911 } 912 r = WRAP(pa_stream_get_time)(stm->output_stream, &r_usec); 913 if (!in_thread) { 914 WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); 915 } 916 917 if (r != 0) { 918 return CUBEB_ERROR; 919 } 920 921 bytes = WRAP(pa_usec_to_bytes)(r_usec, &stm->output_sample_spec); 922 *position = bytes / WRAP(pa_frame_size)(&stm->output_sample_spec); 923 924 return CUBEB_OK; 925 } 926 927 static int 928 pulse_stream_get_latency(cubeb_stream * stm, uint32_t * latency) 929 { 930 pa_usec_t r_usec; 931 int negative, r; 932 933 if (!stm || !stm->output_stream) { 934 return CUBEB_ERROR; 935 } 936 937 r = WRAP(pa_stream_get_latency)(stm->output_stream, &r_usec, &negative); 938 assert(!negative); 939 if (r) { 940 return CUBEB_ERROR; 941 } 942 943 *latency = r_usec * stm->output_sample_spec.rate / PA_USEC_PER_SEC; 944 return CUBEB_OK; 945 } 946 947 static void 948 volume_success(pa_context *c, int success, void *userdata) 949 { 950 (void)success; 951 (void)c; 952 cubeb_stream * stream = userdata; 953 assert(success); 954 WRAP(pa_threaded_mainloop_signal)(stream->context->mainloop, 0); 955 } 956 957 static int 958 pulse_stream_set_volume(cubeb_stream * stm, float volume) 959 { 960 uint32_t index; 961 pa_operation * op; 962 pa_volume_t vol; 963 pa_cvolume cvol; 964 const pa_sample_spec * ss; 965 966 if (!stm->output_stream) { 967 return CUBEB_ERROR; 968 } 969 970 WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop); 971 972 while (!stm->context->default_sink_info) { 973 WRAP(pa_threaded_mainloop_wait)(stm->context->mainloop); 974 } 975 976 /* if the pulse daemon is configured to use flat volumes, 977 * apply our own gain instead of changing the input volume on the sink. */ 978 if (stm->context->default_sink_info->flags & PA_SINK_FLAT_VOLUME) { 979 stm->volume = volume; 980 } else { 981 ss = WRAP(pa_stream_get_sample_spec)(stm->output_stream); 982 983 vol = WRAP(pa_sw_volume_from_linear)(volume); 984 WRAP(pa_cvolume_set)(&cvol, ss->channels, vol); 985 986 index = WRAP(pa_stream_get_index)(stm->output_stream); 987 988 op = WRAP(pa_context_set_sink_input_volume)(stm->context->context, 989 index, &cvol, volume_success, 990 stm); 991 if (op) { 992 operation_wait(stm->context, stm->output_stream, op); 993 WRAP(pa_operation_unref)(op); 994 } 995 } 996 997 WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop); 998 999 return CUBEB_OK; 1000 } 1001 1002 static int 1003 pulse_stream_set_panning(cubeb_stream * stream, float panning) 1004 { 1005 const pa_channel_map * map; 1006 pa_cvolume vol; 1007 1008 if (!stream->output_stream) { 1009 return CUBEB_ERROR; 1010 } 1011 1012 map = WRAP(pa_stream_get_channel_map)(stream->output_stream); 1013 1014 if (!WRAP(pa_channel_map_can_balance)(map)) { 1015 return CUBEB_ERROR; 1016 } 1017 1018 WRAP(pa_cvolume_set_balance)(&vol, map, panning); 1019 1020 return CUBEB_OK; 1021 } 1022 1023 typedef struct { 1024 char * default_sink_name; 1025 char * default_source_name; 1026 1027 cubeb_device_info ** devinfo; 1028 uint32_t max; 1029 uint32_t count; 1030 cubeb * context; 1031 } pulse_dev_list_data; 1032 1033 static cubeb_device_fmt 1034 pulse_format_to_cubeb_format(pa_sample_format_t format) 1035 { 1036 switch (format) { 1037 case PA_SAMPLE_S16LE: 1038 return CUBEB_DEVICE_FMT_S16LE; 1039 case PA_SAMPLE_S16BE: 1040 return CUBEB_DEVICE_FMT_S16BE; 1041 case PA_SAMPLE_FLOAT32LE: 1042 return CUBEB_DEVICE_FMT_F32LE; 1043 case PA_SAMPLE_FLOAT32BE: 1044 return CUBEB_DEVICE_FMT_F32BE; 1045 default: 1046 return CUBEB_DEVICE_FMT_F32NE; 1047 } 1048 } 1049 1050 static void 1051 pulse_ensure_dev_list_data_list_size (pulse_dev_list_data * list_data) 1052 { 1053 if (list_data->count == list_data->max) { 1054 list_data->max += 8; 1055 list_data->devinfo = realloc(list_data->devinfo, 1056 sizeof(cubeb_device_info *) * list_data->max); 1057 } 1058 } 1059 1060 static cubeb_device_state 1061 pulse_get_state_from_sink_port(pa_sink_port_info * info) 1062 { 1063 if (info != NULL) { 1064 #if PA_CHECK_VERSION(2, 0, 0) 1065 if (info->available == PA_PORT_AVAILABLE_NO) 1066 return CUBEB_DEVICE_STATE_UNPLUGGED; 1067 else /*if (info->available == PA_PORT_AVAILABLE_YES) + UNKNOWN */ 1068 #endif 1069 return CUBEB_DEVICE_STATE_ENABLED; 1070 } 1071 1072 return CUBEB_DEVICE_STATE_DISABLED; 1073 } 1074 1075 static void 1076 pulse_sink_info_cb(pa_context * context, const pa_sink_info * info, 1077 int eol, void * user_data) 1078 { 1079 pulse_dev_list_data * list_data = user_data; 1080 cubeb_device_info * devinfo; 1081 const char * prop; 1082 1083 (void)context; 1084 1085 if (eol || info == NULL) 1086 return; 1087 1088 devinfo = calloc(1, sizeof(cubeb_device_info)); 1089 1090 devinfo->device_id = strdup(info->name); 1091 devinfo->devid = devinfo->device_id; 1092 devinfo->friendly_name = strdup(info->description); 1093 prop = WRAP(pa_proplist_gets)(info->proplist, "sysfs.path"); 1094 if (prop) 1095 devinfo->group_id = strdup(prop); 1096 prop = WRAP(pa_proplist_gets)(info->proplist, "device.vendor.name"); 1097 if (prop) 1098 devinfo->vendor_name = strdup(prop); 1099 1100 devinfo->type = CUBEB_DEVICE_TYPE_OUTPUT; 1101 devinfo->state = pulse_get_state_from_sink_port(info->active_port); 1102 devinfo->preferred = strcmp(info->name, list_data->default_sink_name) == 0; 1103 1104 devinfo->format = CUBEB_DEVICE_FMT_ALL; 1105 devinfo->default_format = pulse_format_to_cubeb_format(info->sample_spec.format); 1106 devinfo->max_channels = info->channel_map.channels; 1107 devinfo->min_rate = 1; 1108 devinfo->max_rate = PA_RATE_MAX; 1109 devinfo->default_rate = info->sample_spec.rate; 1110 1111 devinfo->latency_lo = 0; 1112 devinfo->latency_hi = 0; 1113 1114 pulse_ensure_dev_list_data_list_size (list_data); 1115 list_data->devinfo[list_data->count++] = devinfo; 1116 1117 WRAP(pa_threaded_mainloop_signal)(list_data->context->mainloop, 0); 1118 } 1119 1120 static cubeb_device_state 1121 pulse_get_state_from_source_port(pa_source_port_info * info) 1122 { 1123 if (info != NULL) { 1124 #if PA_CHECK_VERSION(2, 0, 0) 1125 if (info->available == PA_PORT_AVAILABLE_NO) 1126 return CUBEB_DEVICE_STATE_UNPLUGGED; 1127 else /*if (info->available == PA_PORT_AVAILABLE_YES) + UNKNOWN */ 1128 #endif 1129 return CUBEB_DEVICE_STATE_ENABLED; 1130 } 1131 1132 return CUBEB_DEVICE_STATE_DISABLED; 1133 } 1134 1135 static void 1136 pulse_source_info_cb(pa_context * context, const pa_source_info * info, 1137 int eol, void * user_data) 1138 { 1139 pulse_dev_list_data * list_data = user_data; 1140 cubeb_device_info * devinfo; 1141 const char * prop; 1142 1143 (void)context; 1144 1145 if (eol) 1146 return; 1147 1148 devinfo = calloc(1, sizeof(cubeb_device_info)); 1149 1150 devinfo->device_id = strdup(info->name); 1151 devinfo->devid = devinfo->device_id; 1152 devinfo->friendly_name = strdup(info->description); 1153 prop = WRAP(pa_proplist_gets)(info->proplist, "sysfs.path"); 1154 if (prop) 1155 devinfo->group_id = strdup(prop); 1156 prop = WRAP(pa_proplist_gets)(info->proplist, "device.vendor.name"); 1157 if (prop) 1158 devinfo->vendor_name = strdup(prop); 1159 1160 devinfo->type = CUBEB_DEVICE_TYPE_INPUT; 1161 devinfo->state = pulse_get_state_from_source_port(info->active_port); 1162 devinfo->preferred = strcmp(info->name, list_data->default_source_name) == 0; 1163 1164 devinfo->format = CUBEB_DEVICE_FMT_ALL; 1165 devinfo->default_format = pulse_format_to_cubeb_format(info->sample_spec.format); 1166 devinfo->max_channels = info->channel_map.channels; 1167 devinfo->min_rate = 1; 1168 devinfo->max_rate = PA_RATE_MAX; 1169 devinfo->default_rate = info->sample_spec.rate; 1170 1171 devinfo->latency_lo = 0; 1172 devinfo->latency_hi = 0; 1173 1174 pulse_ensure_dev_list_data_list_size (list_data); 1175 list_data->devinfo[list_data->count++] = devinfo; 1176 1177 WRAP(pa_threaded_mainloop_signal)(list_data->context->mainloop, 0); 1178 } 1179 1180 static void 1181 pulse_server_info_cb(pa_context * c, const pa_server_info * i, void * userdata) 1182 { 1183 pulse_dev_list_data * list_data = userdata; 1184 1185 (void)c; 1186 1187 free(list_data->default_sink_name); 1188 free(list_data->default_source_name); 1189 list_data->default_sink_name = strdup(i->default_sink_name); 1190 list_data->default_source_name = strdup(i->default_source_name); 1191 1192 WRAP(pa_threaded_mainloop_signal)(list_data->context->mainloop, 0); 1193 } 1194 1195 static int 1196 pulse_enumerate_devices(cubeb * context, cubeb_device_type type, 1197 cubeb_device_collection ** collection) 1198 { 1199 pulse_dev_list_data user_data = { NULL, NULL, NULL, 0, 0, context }; 1200 pa_operation * o; 1201 uint32_t i; 1202 1203 WRAP(pa_threaded_mainloop_lock)(context->mainloop); 1204 1205 o = WRAP(pa_context_get_server_info)(context->context, 1206 pulse_server_info_cb, &user_data); 1207 if (o) { 1208 operation_wait(context, NULL, o); 1209 WRAP(pa_operation_unref)(o); 1210 } 1211 1212 if (type & CUBEB_DEVICE_TYPE_OUTPUT) { 1213 o = WRAP(pa_context_get_sink_info_list)(context->context, 1214 pulse_sink_info_cb, &user_data); 1215 if (o) { 1216 operation_wait(context, NULL, o); 1217 WRAP(pa_operation_unref)(o); 1218 } 1219 } 1220 1221 if (type & CUBEB_DEVICE_TYPE_INPUT) { 1222 o = WRAP(pa_context_get_source_info_list)(context->context, 1223 pulse_source_info_cb, &user_data); 1224 if (o) { 1225 operation_wait(context, NULL, o); 1226 WRAP(pa_operation_unref)(o); 1227 } 1228 } 1229 1230 WRAP(pa_threaded_mainloop_unlock)(context->mainloop); 1231 1232 *collection = malloc(sizeof(cubeb_device_collection) + 1233 sizeof(cubeb_device_info *) * (user_data.count > 0 ? user_data.count - 1 : 0)); 1234 (*collection)->count = user_data.count; 1235 for (i = 0; i < user_data.count; i++) 1236 (*collection)->device[i] = user_data.devinfo[i]; 1237 1238 free(user_data.default_sink_name); 1239 free(user_data.default_source_name); 1240 free(user_data.devinfo); 1241 return CUBEB_OK; 1242 } 1243 1244 static int 1245 pulse_stream_get_current_device(cubeb_stream * stm, cubeb_device ** const device) 1246 { 1247 #if PA_CHECK_VERSION(0, 9, 8) 1248 *device = calloc(1, sizeof(cubeb_device)); 1249 if (*device == NULL) 1250 return CUBEB_ERROR; 1251 1252 if (stm->input_stream) { 1253 const char * name = WRAP(pa_stream_get_device_name)(stm->input_stream); 1254 (*device)->input_name = (name == NULL) ? NULL : strdup(name); 1255 } 1256 1257 if (stm->output_stream) { 1258 const char * name = WRAP(pa_stream_get_device_name)(stm->output_stream); 1259 (*device)->output_name = (name == NULL) ? NULL : strdup(name); 1260 } 1261 1262 return CUBEB_OK; 1263 #else 1264 return CUBEB_ERROR_NOT_SUPPORTED; 1265 #endif 1266 } 1267 1268 static int 1269 pulse_stream_device_destroy(cubeb_stream * stream, 1270 cubeb_device * device) 1271 { 1272 (void)stream; 1273 free(device->input_name); 1274 free(device->output_name); 1275 free(device); 1276 return CUBEB_OK; 1277 } 1278 1279 static void 1280 pulse_subscribe_callback(pa_context * ctx, 1281 pa_subscription_event_type_t t, 1282 uint32_t index, void * userdata) 1283 { 1284 (void)ctx; 1285 cubeb * context = userdata; 1286 1287 switch (t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK) { 1288 case PA_SUBSCRIPTION_EVENT_SOURCE: 1289 case PA_SUBSCRIPTION_EVENT_SINK: 1290 1291 if (g_log_level) { 1292 if ((t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK) == PA_SUBSCRIPTION_EVENT_SOURCE && 1293 (t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_REMOVE) { 1294 LOG("Removing sink index %d", index); 1295 } else if ((t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK) == PA_SUBSCRIPTION_EVENT_SOURCE && 1296 (t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_NEW) { 1297 LOG("Adding sink index %d", index); 1298 } 1299 if ((t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK) == PA_SUBSCRIPTION_EVENT_SINK && 1300 (t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_REMOVE) { 1301 LOG("Removing source index %d", index); 1302 } else if ((t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK) == PA_SUBSCRIPTION_EVENT_SINK && 1303 (t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_NEW) { 1304 LOG("Adding source index %d", index); 1305 } 1306 } 1307 1308 if ((t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_REMOVE || 1309 (t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_NEW) { 1310 context->collection_changed_callback(context, context->collection_changed_user_ptr); 1311 } 1312 break; 1313 } 1314 } 1315 1316 static void 1317 subscribe_success(pa_context *c, int success, void *userdata) 1318 { 1319 (void)c; 1320 cubeb * context = userdata; 1321 assert(success); 1322 WRAP(pa_threaded_mainloop_signal)(context->mainloop, 0); 1323 } 1324 1325 static int 1326 pulse_register_device_collection_changed(cubeb * context, 1327 cubeb_device_type devtype, 1328 cubeb_device_collection_changed_callback collection_changed_callback, 1329 void * user_ptr) 1330 { 1331 context->collection_changed_callback = collection_changed_callback; 1332 context->collection_changed_user_ptr = user_ptr; 1333 1334 WRAP(pa_threaded_mainloop_lock)(context->mainloop); 1335 1336 pa_subscription_mask_t mask; 1337 if (context->collection_changed_callback == NULL) { 1338 // Unregister subscription 1339 WRAP(pa_context_set_subscribe_callback)(context->context, NULL, NULL); 1340 mask = PA_SUBSCRIPTION_MASK_NULL; 1341 } else { 1342 WRAP(pa_context_set_subscribe_callback)(context->context, pulse_subscribe_callback, context); 1343 if (devtype == CUBEB_DEVICE_TYPE_INPUT) 1344 mask = PA_SUBSCRIPTION_MASK_SOURCE; 1345 else if (devtype == CUBEB_DEVICE_TYPE_OUTPUT) 1346 mask = PA_SUBSCRIPTION_MASK_SINK; 1347 else 1348 mask = PA_SUBSCRIPTION_MASK_SINK | PA_SUBSCRIPTION_MASK_SOURCE; 1349 } 1350 1351 pa_operation * o; 1352 o = WRAP(pa_context_subscribe)(context->context, mask, subscribe_success, context); 1353 if (o == NULL) { 1354 LOG("Context subscribe failed"); 1355 return CUBEB_ERROR; 1356 } 1357 operation_wait(context, NULL, o); 1358 WRAP(pa_operation_unref)(o); 1359 1360 WRAP(pa_threaded_mainloop_unlock)(context->mainloop); 1361 1362 return CUBEB_OK; 1363 } 1364 1365 static struct cubeb_ops const pulse_ops = { 1366 .init = pulse_init, 1367 .get_backend_id = pulse_get_backend_id, 1368 .get_max_channel_count = pulse_get_max_channel_count, 1369 .get_min_latency = pulse_get_min_latency, 1370 .get_preferred_sample_rate = pulse_get_preferred_sample_rate, 1371 .enumerate_devices = pulse_enumerate_devices, 1372 .destroy = pulse_destroy, 1373 .stream_init = pulse_stream_init, 1374 .stream_destroy = pulse_stream_destroy, 1375 .stream_start = pulse_stream_start, 1376 .stream_stop = pulse_stream_stop, 1377 .stream_get_position = pulse_stream_get_position, 1378 .stream_get_latency = pulse_stream_get_latency, 1379 .stream_set_volume = pulse_stream_set_volume, 1380 .stream_set_panning = pulse_stream_set_panning, 1381 .stream_get_current_device = pulse_stream_get_current_device, 1382 .stream_device_destroy = pulse_stream_device_destroy, 1383 .stream_register_device_changed_callback = NULL, 1384 .register_device_collection_changed = pulse_register_device_collection_changed 1385 }; 1386