1 /*
2  * Copyright © 2011 Mozilla Foundation
3  *
4  * This program is made available under an ISC-style license.  See the
5  * accompanying file LICENSE for details.
6  */
7 #undef NDEBUG
8 #include <assert.h>
9 #include <dlfcn.h>
10 #include <stdlib.h>
11 #include <pulse/pulseaudio.h>
12 #include <string.h>
13 #include "cubeb/cubeb.h"
14 #include "cubeb-internal.h"
15 #include <stdio.h>
16 
17 #ifdef DISABLE_LIBPULSE_DLOPEN
18 #define WRAP(x) x
19 #else
20 #define WRAP(x) cubeb_##x
21 #define LIBPULSE_API_VISIT(X)                   \
22   X(pa_channel_map_can_balance)                 \
23   X(pa_channel_map_init_auto)                   \
24   X(pa_context_connect)                         \
25   X(pa_context_disconnect)                      \
26   X(pa_context_drain)                           \
27   X(pa_context_get_server_info)                 \
28   X(pa_context_get_sink_info_by_name)           \
29   X(pa_context_get_sink_info_list)              \
30   X(pa_context_get_source_info_list)            \
31   X(pa_context_get_state)                       \
32   X(pa_context_new)                             \
33   X(pa_context_rttime_new)                      \
34   X(pa_context_set_sink_input_volume)           \
35   X(pa_context_set_state_callback)              \
36   X(pa_context_unref)                           \
37   X(pa_cvolume_set)                             \
38   X(pa_cvolume_set_balance)                     \
39   X(pa_frame_size)                              \
40   X(pa_operation_get_state)                     \
41   X(pa_operation_unref)                         \
42   X(pa_proplist_gets)                           \
43   X(pa_rtclock_now)                             \
44   X(pa_stream_begin_write)                      \
45   X(pa_stream_cancel_write)                     \
46   X(pa_stream_connect_playback)                 \
47   X(pa_stream_cork)                             \
48   X(pa_stream_disconnect)                       \
49   X(pa_stream_get_channel_map)                  \
50   X(pa_stream_get_index)                        \
51   X(pa_stream_get_latency)                      \
52   X(pa_stream_get_sample_spec)                  \
53   X(pa_stream_get_state)                        \
54   X(pa_stream_get_time)                         \
55   X(pa_stream_new)                              \
56   X(pa_stream_set_state_callback)               \
57   X(pa_stream_set_write_callback)               \
58   X(pa_stream_unref)                            \
59   X(pa_stream_update_timing_info)               \
60   X(pa_stream_write)                            \
61   X(pa_sw_volume_from_linear)                   \
62   X(pa_threaded_mainloop_free)                  \
63   X(pa_threaded_mainloop_get_api)               \
64   X(pa_threaded_mainloop_in_thread)             \
65   X(pa_threaded_mainloop_lock)                  \
66   X(pa_threaded_mainloop_new)                   \
67   X(pa_threaded_mainloop_signal)                \
68   X(pa_threaded_mainloop_start)                 \
69   X(pa_threaded_mainloop_stop)                  \
70   X(pa_threaded_mainloop_unlock)                \
71   X(pa_threaded_mainloop_wait)                  \
72   X(pa_usec_to_bytes)                           \
73   X(pa_stream_set_read_callback)                \
74   X(pa_stream_connect_record)                   \
75   X(pa_stream_readable_size)                    \
76   X(pa_stream_writable_size)                    \
77   X(pa_stream_peek)                             \
78   X(pa_stream_drop)                             \
79   X(pa_stream_get_buffer_attr)                  \
80   X(pa_stream_get_device_name)                  \
81   X(pa_context_set_subscribe_callback)          \
82   X(pa_context_subscribe)                       \
83   X(pa_mainloop_api_once)                       \
84 
85 #define MAKE_TYPEDEF(x) static typeof(x) * cubeb_##x;
86 LIBPULSE_API_VISIT(MAKE_TYPEDEF);
87 #undef MAKE_TYPEDEF
88 #endif
89 
90 static struct cubeb_ops const pulse_ops;
91 
92 struct cubeb {
93   struct cubeb_ops const * ops;
94   void * libpulse;
95   pa_threaded_mainloop * mainloop;
96   pa_context * context;
97   pa_sink_info * default_sink_info;
98   char * context_name;
99   int error;
100   cubeb_device_collection_changed_callback collection_changed_callback;
101   void * collection_changed_user_ptr;
102 };
103 
104 struct cubeb_stream {
105   cubeb * context;
106   pa_stream * output_stream;
107   pa_stream * input_stream;
108   cubeb_data_callback data_callback;
109   cubeb_state_callback state_callback;
110   void * user_ptr;
111   pa_time_event * drain_timer;
112   pa_sample_spec output_sample_spec;
113   pa_sample_spec input_sample_spec;
114   int shutdown;
115   float volume;
116   cubeb_state state;
117 };
118 
119 static const float PULSE_NO_GAIN = -1.0;
120 
121 enum cork_state {
122   UNCORK = 0,
123   CORK = 1 << 0,
124   NOTIFY = 1 << 1
125 };
126 
127 static void
128 sink_info_callback(pa_context * context, const pa_sink_info * info, int eol, void * u)
129 {
130   (void)context;
131   cubeb * ctx = u;
132   if (!eol) {
133     free(ctx->default_sink_info);
134     ctx->default_sink_info = malloc(sizeof(pa_sink_info));
135     memcpy(ctx->default_sink_info, info, sizeof(pa_sink_info));
136   }
137   WRAP(pa_threaded_mainloop_signal)(ctx->mainloop, 0);
138 }
139 
140 static void
141 server_info_callback(pa_context * context, const pa_server_info * info, void * u)
142 {
143   WRAP(pa_context_get_sink_info_by_name)(context, info->default_sink_name, sink_info_callback, u);
144 }
145 
146 static void
147 context_state_callback(pa_context * c, void * u)
148 {
149   cubeb * ctx = u;
150   if (!PA_CONTEXT_IS_GOOD(WRAP(pa_context_get_state)(c))) {
151     ctx->error = 1;
152   }
153   WRAP(pa_threaded_mainloop_signal)(ctx->mainloop, 0);
154 }
155 
156 static void
157 context_notify_callback(pa_context * c, void * u)
158 {
159   (void)c;
160   cubeb * ctx = u;
161   WRAP(pa_threaded_mainloop_signal)(ctx->mainloop, 0);
162 }
163 
164 static void
165 stream_success_callback(pa_stream * s, int success, void * u)
166 {
167   (void)s;
168   (void)success;
169   cubeb_stream * stm = u;
170   WRAP(pa_threaded_mainloop_signal)(stm->context->mainloop, 0);
171 }
172 
173 static void
174 stream_state_change_callback(cubeb_stream * stm, cubeb_state s)
175 {
176   stm->state = s;
177   stm->state_callback(stm, stm->user_ptr, s);
178 }
179 
180 static void
181 stream_drain_callback(pa_mainloop_api * a, pa_time_event * e, struct timeval const * tv, void * u)
182 {
183   (void)a;
184   (void)tv;
185   cubeb_stream * stm = u;
186   assert(stm->drain_timer == e);
187   stream_state_change_callback(stm, CUBEB_STATE_DRAINED);
188   /* there's no pa_rttime_free, so use this instead. */
189   a->time_free(stm->drain_timer);
190   stm->drain_timer = NULL;
191   WRAP(pa_threaded_mainloop_signal)(stm->context->mainloop, 0);
192 }
193 
194 static void
195 stream_state_callback(pa_stream * s, void * u)
196 {
197   cubeb_stream * stm = u;
198   if (!PA_STREAM_IS_GOOD(WRAP(pa_stream_get_state)(s))) {
199     stream_state_change_callback(stm, CUBEB_STATE_ERROR);
200   }
201   WRAP(pa_threaded_mainloop_signal)(stm->context->mainloop, 0);
202 }
203 
204 static void
205 trigger_user_callback(pa_stream * s, void const * input_data, size_t nbytes, cubeb_stream * stm)
206 {
207   void * buffer;
208   size_t size;
209   int r;
210   long got;
211   size_t towrite, read_offset;
212   size_t frame_size;
213 
214   frame_size = WRAP(pa_frame_size)(&stm->output_sample_spec);
215   assert(nbytes % frame_size == 0);
216 
217   towrite = nbytes;
218   read_offset = 0;
219   while (towrite) {
220     size = towrite;
221     r = WRAP(pa_stream_begin_write)(s, &buffer, &size);
222     // Note: this has failed running under rr on occassion - needs investigation.
223     assert(r == 0);
224     assert(size > 0);
225     assert(size % frame_size == 0);
226 
227     LOGV("Trigger user callback with output buffer size=%zd, read_offset=%zd", size, read_offset);
228     got = stm->data_callback(stm, stm->user_ptr, (uint8_t const *)input_data + read_offset, buffer, size / frame_size);
229     if (got < 0) {
230       WRAP(pa_stream_cancel_write)(s);
231       stm->shutdown = 1;
232       return;
233     }
234     // If more iterations move offset of read buffer
235     if (input_data) {
236       size_t in_frame_size = WRAP(pa_frame_size)(&stm->input_sample_spec);
237       read_offset += (size / frame_size) * in_frame_size;
238     }
239 
240     if (stm->volume != PULSE_NO_GAIN) {
241       uint32_t samples =  size * stm->output_sample_spec.channels / frame_size ;
242 
243       if (stm->output_sample_spec.format == PA_SAMPLE_S16BE ||
244           stm->output_sample_spec.format == PA_SAMPLE_S16LE) {
245         short * b = buffer;
246         for (uint32_t i = 0; i < samples; i++) {
247           b[i] *= stm->volume;
248         }
249       } else {
250         float * b = buffer;
251         for (uint32_t i = 0; i < samples; i++) {
252           b[i] *= stm->volume;
253         }
254       }
255     }
256 
257     r = WRAP(pa_stream_write)(s, buffer, got * frame_size, NULL, 0, PA_SEEK_RELATIVE);
258     assert(r == 0);
259 
260     if ((size_t) got < size / frame_size) {
261       pa_usec_t latency = 0;
262       r = WRAP(pa_stream_get_latency)(s, &latency, NULL);
263       if (r == -PA_ERR_NODATA) {
264         /* this needs a better guess. */
265         latency = 100 * PA_USEC_PER_MSEC;
266       }
267       assert(r == 0 || r == -PA_ERR_NODATA);
268       /* pa_stream_drain is useless, see PA bug# 866. this is a workaround. */
269       /* arbitrary safety margin: double the current latency. */
270       assert(!stm->drain_timer);
271       stm->drain_timer = WRAP(pa_context_rttime_new)(stm->context->context, WRAP(pa_rtclock_now)() + 2 * latency, stream_drain_callback, stm);
272       stm->shutdown = 1;
273       return;
274     }
275 
276     towrite -= size;
277   }
278 
279   assert(towrite == 0);
280 }
281 
282 static int
283 read_from_input(pa_stream * s, void const ** buffer, size_t * size)
284 {
285   size_t readable_size = WRAP(pa_stream_readable_size)(s);
286   if (readable_size > 0) {
287     if (WRAP(pa_stream_peek)(s, buffer, size) < 0) {
288       return -1;
289     }
290   }
291   return readable_size;
292 }
293 
294 static void
295 stream_write_callback(pa_stream * s, size_t nbytes, void * u)
296 {
297   LOGV("Output callback to be written buffer size %zd", nbytes);
298   cubeb_stream * stm = u;
299   if (stm->shutdown ||
300       stm->state != CUBEB_STATE_STARTED) {
301     return;
302   }
303 
304   if (!stm->input_stream){
305     // Output/playback only operation.
306     // Write directly to output
307     assert(!stm->input_stream && stm->output_stream);
308     trigger_user_callback(s, NULL, nbytes, stm);
309   }
310 }
311 
312 static void
313 stream_read_callback(pa_stream * s, size_t nbytes, void * u)
314 {
315   LOGV("Input callback buffer size %zd", nbytes);
316   cubeb_stream * stm = u;
317   if (stm->shutdown) {
318     return;
319   }
320 
321   void const * read_data = NULL;
322   size_t read_size;
323   while (read_from_input(s, &read_data, &read_size) > 0) {
324     /* read_data can be NULL in case of a hole. */
325     if (read_data) {
326       size_t in_frame_size = WRAP(pa_frame_size)(&stm->input_sample_spec);
327       size_t read_frames = read_size / in_frame_size;
328 
329       if (stm->output_stream) {
330         // input/capture + output/playback operation
331         size_t out_frame_size = WRAP(pa_frame_size)(&stm->output_sample_spec);
332         size_t write_size = read_frames * out_frame_size;
333         // Offer full duplex data for writing
334         trigger_user_callback(stm->output_stream, read_data, write_size, stm);
335       } else {
336         // input/capture only operation. Call callback directly
337         long got = stm->data_callback(stm, stm->user_ptr, read_data, NULL, read_frames);
338         if (got < 0 || (size_t) got != read_frames) {
339           WRAP(pa_stream_cancel_write)(s);
340           stm->shutdown = 1;
341           break;
342         }
343       }
344     }
345     if (read_size > 0) {
346       WRAP(pa_stream_drop)(s);
347     }
348 
349     if (stm->shutdown) {
350       return;
351     }
352   }
353 }
354 
355 static int
356 wait_until_context_ready(cubeb * ctx)
357 {
358   for (;;) {
359     pa_context_state_t state = WRAP(pa_context_get_state)(ctx->context);
360     if (!PA_CONTEXT_IS_GOOD(state))
361       return -1;
362     if (state == PA_CONTEXT_READY)
363       break;
364     WRAP(pa_threaded_mainloop_wait)(ctx->mainloop);
365   }
366   return 0;
367 }
368 
369 static int
370 wait_until_io_stream_ready(pa_stream * stream, pa_threaded_mainloop * mainloop)
371 {
372   if (!stream || !mainloop){
373     return -1;
374   }
375   for (;;) {
376     pa_stream_state_t state = WRAP(pa_stream_get_state)(stream);
377     if (!PA_STREAM_IS_GOOD(state))
378       return -1;
379     if (state == PA_STREAM_READY)
380       break;
381     WRAP(pa_threaded_mainloop_wait)(mainloop);
382   }
383   return 0;
384 }
385 
386 static int
387 wait_until_stream_ready(cubeb_stream * stm)
388 {
389   if (stm->output_stream &&
390       wait_until_io_stream_ready(stm->output_stream, stm->context->mainloop) == -1) {
391     return -1;
392   }
393   if(stm->input_stream &&
394      wait_until_io_stream_ready(stm->input_stream, stm->context->mainloop) == -1) {
395     return -1;
396   }
397   return 0;
398 }
399 
400 static int
401 operation_wait(cubeb * ctx, pa_stream * stream, pa_operation * o)
402 {
403   while (WRAP(pa_operation_get_state)(o) == PA_OPERATION_RUNNING) {
404     WRAP(pa_threaded_mainloop_wait)(ctx->mainloop);
405     if (!PA_CONTEXT_IS_GOOD(WRAP(pa_context_get_state)(ctx->context))) {
406       return -1;
407     }
408     if (stream && !PA_STREAM_IS_GOOD(WRAP(pa_stream_get_state)(stream))) {
409       return -1;
410     }
411   }
412   return 0;
413 }
414 
415 static void
416 cork_io_stream(cubeb_stream * stm, pa_stream * io_stream, enum cork_state state)
417 {
418   pa_operation * o;
419   if (!io_stream) {
420     return;
421   }
422   o = WRAP(pa_stream_cork)(io_stream, state & CORK, stream_success_callback, stm);
423   if (o) {
424     operation_wait(stm->context, io_stream, o);
425     WRAP(pa_operation_unref)(o);
426   }
427 }
428 
429 static void
430 stream_cork(cubeb_stream * stm, enum cork_state state)
431 {
432   WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
433   cork_io_stream(stm, stm->output_stream, state);
434   cork_io_stream(stm, stm->input_stream, state);
435   WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
436 
437   if (state & NOTIFY) {
438     stream_state_change_callback(stm, state & CORK ? CUBEB_STATE_STOPPED
439                                                    : CUBEB_STATE_STARTED);
440   }
441 }
442 
443 static int
444 stream_update_timing_info(cubeb_stream * stm)
445 {
446   int r = -1;
447   pa_operation * o = NULL;
448   if (stm->output_stream) {
449     o = WRAP(pa_stream_update_timing_info)(stm->output_stream, stream_success_callback, stm);
450     if (o) {
451       r = operation_wait(stm->context, stm->output_stream, o);
452       WRAP(pa_operation_unref)(o);
453     }
454     if (r != 0) {
455       return r;
456     }
457   }
458 
459   if (stm->input_stream) {
460     o = WRAP(pa_stream_update_timing_info)(stm->input_stream, stream_success_callback, stm);
461     if (o) {
462       r = operation_wait(stm->context, stm->input_stream, o);
463       WRAP(pa_operation_unref)(o);
464     }
465   }
466 
467   return r;
468 }
469 
470 static void pulse_context_destroy(cubeb * ctx);
471 static void pulse_destroy(cubeb * ctx);
472 
473 static int
474 pulse_context_init(cubeb * ctx)
475 {
476   if (ctx->context) {
477     assert(ctx->error == 1);
478     pulse_context_destroy(ctx);
479   }
480 
481   ctx->context = WRAP(pa_context_new)(WRAP(pa_threaded_mainloop_get_api)(ctx->mainloop),
482                                       ctx->context_name);
483   if (!ctx->context) {
484     return -1;
485   }
486   WRAP(pa_context_set_state_callback)(ctx->context, context_state_callback, ctx);
487 
488   WRAP(pa_threaded_mainloop_lock)(ctx->mainloop);
489   WRAP(pa_context_connect)(ctx->context, NULL, 0, NULL);
490 
491   if (wait_until_context_ready(ctx) != 0) {
492     WRAP(pa_threaded_mainloop_unlock)(ctx->mainloop);
493     pulse_context_destroy(ctx);
494     ctx->context = NULL;
495     return -1;
496   }
497 
498   WRAP(pa_threaded_mainloop_unlock)(ctx->mainloop);
499 
500   ctx->error = 0;
501 
502   return 0;
503 }
504 
505 /*static*/ int
506 pulse_init(cubeb ** context, char const * context_name)
507 {
508   void * libpulse = NULL;
509   cubeb * ctx;
510 
511   *context = NULL;
512 
513 #ifndef DISABLE_LIBPULSE_DLOPEN
514   libpulse = dlopen("libpulse.so.0", RTLD_LAZY);
515   if (!libpulse) {
516     return CUBEB_ERROR;
517   }
518 
519 #define LOAD(x) {                               \
520     cubeb_##x = dlsym(libpulse, #x);            \
521     if (!cubeb_##x) {                           \
522       dlclose(libpulse);                        \
523       return CUBEB_ERROR;                       \
524     }                                           \
525   }
526 
527   LIBPULSE_API_VISIT(LOAD);
528 #undef LOAD
529 #endif
530 
531   ctx = calloc(1, sizeof(*ctx));
532   assert(ctx);
533 
534   ctx->ops = &pulse_ops;
535   ctx->libpulse = libpulse;
536 
537   ctx->mainloop = WRAP(pa_threaded_mainloop_new)();
538   ctx->default_sink_info = NULL;
539 
540   WRAP(pa_threaded_mainloop_start)(ctx->mainloop);
541 
542   ctx->context_name = context_name ? strdup(context_name) : NULL;
543   if (pulse_context_init(ctx) != 0) {
544     pulse_destroy(ctx);
545     return CUBEB_ERROR;
546   }
547 
548   WRAP(pa_threaded_mainloop_lock)(ctx->mainloop);
549   WRAP(pa_context_get_server_info)(ctx->context, server_info_callback, ctx);
550   WRAP(pa_threaded_mainloop_unlock)(ctx->mainloop);
551 
552   *context = ctx;
553 
554   return CUBEB_OK;
555 }
556 
557 static char const *
558 pulse_get_backend_id(cubeb * ctx)
559 {
560   (void)ctx;
561   return "pulse";
562 }
563 
564 static int
565 pulse_get_max_channel_count(cubeb * ctx, uint32_t * max_channels)
566 {
567   (void)ctx;
568   assert(ctx && max_channels);
569 
570   WRAP(pa_threaded_mainloop_lock)(ctx->mainloop);
571   while (!ctx->default_sink_info) {
572     WRAP(pa_threaded_mainloop_wait)(ctx->mainloop);
573   }
574   WRAP(pa_threaded_mainloop_unlock)(ctx->mainloop);
575 
576   *max_channels = ctx->default_sink_info->channel_map.channels;
577 
578   return CUBEB_OK;
579 }
580 
581 static int
582 pulse_get_preferred_sample_rate(cubeb * ctx, uint32_t * rate)
583 {
584   assert(ctx && rate);
585   (void)ctx;
586 
587   WRAP(pa_threaded_mainloop_lock)(ctx->mainloop);
588   while (!ctx->default_sink_info) {
589     WRAP(pa_threaded_mainloop_wait)(ctx->mainloop);
590   }
591   WRAP(pa_threaded_mainloop_unlock)(ctx->mainloop);
592 
593   *rate = ctx->default_sink_info->sample_spec.rate;
594 
595   return CUBEB_OK;
596 }
597 
598 static int
599 pulse_get_min_latency(cubeb * ctx, cubeb_stream_params params, uint32_t * latency_frames)
600 {
601   (void)ctx;
602   // According to PulseAudio developers, this is a safe minimum.
603   *latency_frames = 25 * params.rate / 1000;
604 
605   return CUBEB_OK;
606 }
607 
608 static void
609 pulse_context_destroy(cubeb * ctx)
610 {
611   pa_operation * o;
612 
613   WRAP(pa_threaded_mainloop_lock)(ctx->mainloop);
614   o = WRAP(pa_context_drain)(ctx->context, context_notify_callback, ctx);
615   if (o) {
616     operation_wait(ctx, NULL, o);
617     WRAP(pa_operation_unref)(o);
618   }
619   WRAP(pa_context_set_state_callback)(ctx->context, NULL, NULL);
620   WRAP(pa_context_disconnect)(ctx->context);
621   WRAP(pa_context_unref)(ctx->context);
622   WRAP(pa_threaded_mainloop_unlock)(ctx->mainloop);
623 }
624 
625 static void
626 pulse_destroy(cubeb * ctx)
627 {
628   if (ctx->context_name) {
629     free(ctx->context_name);
630   }
631   if (ctx->context) {
632     pulse_context_destroy(ctx);
633   }
634 
635   if (ctx->mainloop) {
636     WRAP(pa_threaded_mainloop_stop)(ctx->mainloop);
637     WRAP(pa_threaded_mainloop_free)(ctx->mainloop);
638   }
639 
640   if (ctx->libpulse) {
641     dlclose(ctx->libpulse);
642   }
643   if (ctx->default_sink_info) {
644     free(ctx->default_sink_info);
645   }
646   free(ctx);
647 }
648 
649 static void pulse_stream_destroy(cubeb_stream * stm);
650 
651 static pa_sample_format_t
652 to_pulse_format(cubeb_sample_format format)
653 {
654   switch (format) {
655   case CUBEB_SAMPLE_S16LE:
656     return PA_SAMPLE_S16LE;
657   case CUBEB_SAMPLE_S16BE:
658     return PA_SAMPLE_S16BE;
659   case CUBEB_SAMPLE_FLOAT32LE:
660     return PA_SAMPLE_FLOAT32LE;
661   case CUBEB_SAMPLE_FLOAT32BE:
662     return PA_SAMPLE_FLOAT32BE;
663   default:
664     return PA_SAMPLE_INVALID;
665   }
666 }
667 
668 static int
669 create_pa_stream(cubeb_stream * stm,
670                  pa_stream ** pa_stm,
671                  cubeb_stream_params * stream_params,
672                  char const * stream_name)
673 {
674   assert(stm && stream_params);
675   *pa_stm = NULL;
676   pa_sample_spec ss;
677   ss.format = to_pulse_format(stream_params->format);
678   if (ss.format == PA_SAMPLE_INVALID)
679     return CUBEB_ERROR_INVALID_FORMAT;
680   ss.rate = stream_params->rate;
681   ss.channels = stream_params->channels;
682 
683   *pa_stm = WRAP(pa_stream_new)(stm->context->context, stream_name, &ss, NULL);
684   return (*pa_stm == NULL) ? CUBEB_ERROR : CUBEB_OK;
685 }
686 
687 static pa_buffer_attr
688 set_buffering_attribute(unsigned int latency_frames, pa_sample_spec * sample_spec)
689 {
690   pa_buffer_attr battr;
691   battr.maxlength = -1;
692   battr.prebuf    = -1;
693   battr.tlength   = latency_frames * WRAP(pa_frame_size)(sample_spec);
694   battr.minreq    = battr.tlength / 4;
695   battr.fragsize  = battr.minreq;
696 
697   LOG("Requested buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u",
698       battr.maxlength, battr.tlength, battr.prebuf, battr.minreq, battr.fragsize);
699 
700   return battr;
701 }
702 
703 static int
704 pulse_stream_init(cubeb * context,
705                   cubeb_stream ** stream,
706                   char const * stream_name,
707                   cubeb_devid input_device,
708                   cubeb_stream_params * input_stream_params,
709                   cubeb_devid output_device,
710                   cubeb_stream_params * output_stream_params,
711                   unsigned int latency_frames,
712                   cubeb_data_callback data_callback,
713                   cubeb_state_callback state_callback,
714                   void * user_ptr)
715 {
716   cubeb_stream * stm;
717   pa_buffer_attr battr;
718   int r;
719 
720   assert(context);
721 
722   // If the connection failed for some reason, try to reconnect
723   if (context->error == 1 && pulse_context_init(context) != 0) {
724     return CUBEB_ERROR;
725   }
726 
727   *stream = NULL;
728 
729   stm = calloc(1, sizeof(*stm));
730   assert(stm);
731 
732   stm->context = context;
733   stm->data_callback = data_callback;
734   stm->state_callback = state_callback;
735   stm->user_ptr = user_ptr;
736   stm->volume = PULSE_NO_GAIN;
737   stm->state = -1;
738   assert(stm->shutdown == 0);
739 
740   WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
741   if (output_stream_params) {
742     r = create_pa_stream(stm, &stm->output_stream, output_stream_params, stream_name);
743     if (r != CUBEB_OK) {
744       WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
745       pulse_stream_destroy(stm);
746       return r;
747     }
748 
749     stm->output_sample_spec = *(WRAP(pa_stream_get_sample_spec)(stm->output_stream));
750 
751     WRAP(pa_stream_set_state_callback)(stm->output_stream, stream_state_callback, stm);
752     WRAP(pa_stream_set_write_callback)(stm->output_stream, stream_write_callback, stm);
753 
754     battr = set_buffering_attribute(latency_frames, &stm->output_sample_spec);
755     WRAP(pa_stream_connect_playback)(stm->output_stream,
756                                      output_device,
757                                      &battr,
758                                      PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING |
759                                      PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY,
760                                      NULL, NULL);
761   }
762 
763   // Set up input stream
764   if (input_stream_params) {
765     r = create_pa_stream(stm, &stm->input_stream, input_stream_params, stream_name);
766     if (r != CUBEB_OK) {
767       WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
768       pulse_stream_destroy(stm);
769       return r;
770     }
771 
772     stm->input_sample_spec = *(WRAP(pa_stream_get_sample_spec)(stm->input_stream));
773 
774     WRAP(pa_stream_set_state_callback)(stm->input_stream, stream_state_callback, stm);
775     WRAP(pa_stream_set_read_callback)(stm->input_stream, stream_read_callback, stm);
776 
777     battr = set_buffering_attribute(latency_frames, &stm->input_sample_spec);
778     WRAP(pa_stream_connect_record)(stm->input_stream,
779                                    input_device,
780                                    &battr,
781                                    PA_STREAM_AUTO_TIMING_UPDATE | PA_STREAM_INTERPOLATE_TIMING |
782                                    PA_STREAM_START_CORKED | PA_STREAM_ADJUST_LATENCY);
783   }
784 
785   r = wait_until_stream_ready(stm);
786   if (r == 0) {
787     /* force a timing update now, otherwise timing info does not become valid
788        until some point after initialization has completed. */
789     r = stream_update_timing_info(stm);
790   }
791 
792   WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
793 
794   if (r != 0) {
795     pulse_stream_destroy(stm);
796     return CUBEB_ERROR;
797   }
798 
799   if (g_log_level) {
800     if (output_stream_params){
801       const pa_buffer_attr * output_att;
802       output_att = WRAP(pa_stream_get_buffer_attr)(stm->output_stream);
803       LOG("Output buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u",output_att->maxlength, output_att->tlength,
804           output_att->prebuf, output_att->minreq, output_att->fragsize);
805     }
806 
807     if (input_stream_params){
808       const pa_buffer_attr * input_att;
809       input_att = WRAP(pa_stream_get_buffer_attr)(stm->input_stream);
810       LOG("Input buffer attributes maxlength %u, tlength %u, prebuf %u, minreq %u, fragsize %u",input_att->maxlength, input_att->tlength,
811           input_att->prebuf, input_att->minreq, input_att->fragsize);
812     }
813   }
814 
815   *stream = stm;
816 
817   return CUBEB_OK;
818 }
819 
820 static void
821 pulse_stream_destroy(cubeb_stream * stm)
822 {
823   stream_cork(stm, CORK);
824 
825   WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
826   if (stm->output_stream) {
827 
828     if (stm->drain_timer) {
829       /* there's no pa_rttime_free, so use this instead. */
830       WRAP(pa_threaded_mainloop_get_api)(stm->context->mainloop)->time_free(stm->drain_timer);
831     }
832 
833     WRAP(pa_stream_set_state_callback)(stm->output_stream, NULL, NULL);
834     WRAP(pa_stream_set_write_callback)(stm->output_stream, NULL, NULL);
835     WRAP(pa_stream_disconnect)(stm->output_stream);
836     WRAP(pa_stream_unref)(stm->output_stream);
837   }
838 
839   if (stm->input_stream) {
840     WRAP(pa_stream_set_state_callback)(stm->input_stream, NULL, NULL);
841     WRAP(pa_stream_set_read_callback)(stm->input_stream, NULL, NULL);
842     WRAP(pa_stream_disconnect)(stm->input_stream);
843     WRAP(pa_stream_unref)(stm->input_stream);
844   }
845   WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
846 
847   free(stm);
848 }
849 
850 static void
851 pulse_defer_event_cb(pa_mainloop_api * a, void * userdata)
852 {
853   (void)a;
854   cubeb_stream * stm = userdata;
855   if (stm->shutdown) {
856     return;
857   }
858   size_t writable_size = WRAP(pa_stream_writable_size)(stm->output_stream);
859   trigger_user_callback(stm->output_stream, NULL, writable_size, stm);
860 }
861 
862 static int
863 pulse_stream_start(cubeb_stream * stm)
864 {
865   stm->shutdown = 0;
866   stream_cork(stm, UNCORK | NOTIFY);
867 
868   if (stm->output_stream && !stm->input_stream) {
869     /* On output only case need to manually call user cb once in order to make
870      * things roll. This is done via a defer event in order to execute it
871      * from PA server thread. */
872     WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
873     WRAP(pa_mainloop_api_once)(WRAP(pa_threaded_mainloop_get_api)(stm->context->mainloop),
874                                pulse_defer_event_cb, stm);
875     WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
876   }
877 
878   return CUBEB_OK;
879 }
880 
881 static int
882 pulse_stream_stop(cubeb_stream * stm)
883 {
884   WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
885   stm->shutdown = 1;
886   // If draining is taking place wait to finish
887   while (stm->drain_timer) {
888     WRAP(pa_threaded_mainloop_wait)(stm->context->mainloop);
889   }
890   WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
891 
892   stream_cork(stm, CORK | NOTIFY);
893   return CUBEB_OK;
894 }
895 
896 static int
897 pulse_stream_get_position(cubeb_stream * stm, uint64_t * position)
898 {
899   int r, in_thread;
900   pa_usec_t r_usec;
901   uint64_t bytes;
902 
903   if (!stm || !stm->output_stream) {
904     return CUBEB_ERROR;
905   }
906 
907   in_thread = WRAP(pa_threaded_mainloop_in_thread)(stm->context->mainloop);
908 
909   if (!in_thread) {
910     WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
911   }
912   r = WRAP(pa_stream_get_time)(stm->output_stream, &r_usec);
913   if (!in_thread) {
914     WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
915   }
916 
917   if (r != 0) {
918     return CUBEB_ERROR;
919   }
920 
921   bytes = WRAP(pa_usec_to_bytes)(r_usec, &stm->output_sample_spec);
922   *position = bytes / WRAP(pa_frame_size)(&stm->output_sample_spec);
923 
924   return CUBEB_OK;
925 }
926 
927 static int
928 pulse_stream_get_latency(cubeb_stream * stm, uint32_t * latency)
929 {
930   pa_usec_t r_usec;
931   int negative, r;
932 
933   if (!stm || !stm->output_stream) {
934     return CUBEB_ERROR;
935   }
936 
937   r = WRAP(pa_stream_get_latency)(stm->output_stream, &r_usec, &negative);
938   assert(!negative);
939   if (r) {
940     return CUBEB_ERROR;
941   }
942 
943   *latency = r_usec * stm->output_sample_spec.rate / PA_USEC_PER_SEC;
944   return CUBEB_OK;
945 }
946 
947 static void
948 volume_success(pa_context *c, int success, void *userdata)
949 {
950   (void)success;
951   (void)c;
952   cubeb_stream * stream = userdata;
953   assert(success);
954   WRAP(pa_threaded_mainloop_signal)(stream->context->mainloop, 0);
955 }
956 
957 static int
958 pulse_stream_set_volume(cubeb_stream * stm, float volume)
959 {
960   uint32_t index;
961   pa_operation * op;
962   pa_volume_t vol;
963   pa_cvolume cvol;
964   const pa_sample_spec * ss;
965 
966   if (!stm->output_stream) {
967     return CUBEB_ERROR;
968   }
969 
970   WRAP(pa_threaded_mainloop_lock)(stm->context->mainloop);
971 
972   while (!stm->context->default_sink_info) {
973     WRAP(pa_threaded_mainloop_wait)(stm->context->mainloop);
974   }
975 
976   /* if the pulse daemon is configured to use flat volumes,
977    * apply our own gain instead of changing the input volume on the sink. */
978   if (stm->context->default_sink_info->flags & PA_SINK_FLAT_VOLUME) {
979     stm->volume = volume;
980   } else {
981     ss = WRAP(pa_stream_get_sample_spec)(stm->output_stream);
982 
983     vol = WRAP(pa_sw_volume_from_linear)(volume);
984     WRAP(pa_cvolume_set)(&cvol, ss->channels, vol);
985 
986     index = WRAP(pa_stream_get_index)(stm->output_stream);
987 
988     op = WRAP(pa_context_set_sink_input_volume)(stm->context->context,
989                                                 index, &cvol, volume_success,
990                                                 stm);
991     if (op) {
992       operation_wait(stm->context, stm->output_stream, op);
993       WRAP(pa_operation_unref)(op);
994     }
995   }
996 
997   WRAP(pa_threaded_mainloop_unlock)(stm->context->mainloop);
998 
999   return CUBEB_OK;
1000 }
1001 
1002 static int
1003 pulse_stream_set_panning(cubeb_stream * stream, float panning)
1004 {
1005   const pa_channel_map * map;
1006   pa_cvolume vol;
1007 
1008   if (!stream->output_stream) {
1009     return CUBEB_ERROR;
1010   }
1011 
1012   map = WRAP(pa_stream_get_channel_map)(stream->output_stream);
1013 
1014   if (!WRAP(pa_channel_map_can_balance)(map)) {
1015     return CUBEB_ERROR;
1016   }
1017 
1018   WRAP(pa_cvolume_set_balance)(&vol, map, panning);
1019 
1020   return CUBEB_OK;
1021 }
1022 
1023 typedef struct {
1024   char * default_sink_name;
1025   char * default_source_name;
1026 
1027   cubeb_device_info ** devinfo;
1028   uint32_t max;
1029   uint32_t count;
1030   cubeb * context;
1031 } pulse_dev_list_data;
1032 
1033 static cubeb_device_fmt
1034 pulse_format_to_cubeb_format(pa_sample_format_t format)
1035 {
1036   switch (format) {
1037   case PA_SAMPLE_S16LE:
1038     return CUBEB_DEVICE_FMT_S16LE;
1039   case PA_SAMPLE_S16BE:
1040     return CUBEB_DEVICE_FMT_S16BE;
1041   case PA_SAMPLE_FLOAT32LE:
1042     return CUBEB_DEVICE_FMT_F32LE;
1043   case PA_SAMPLE_FLOAT32BE:
1044     return CUBEB_DEVICE_FMT_F32BE;
1045   default:
1046     return CUBEB_DEVICE_FMT_F32NE;
1047   }
1048 }
1049 
1050 static void
1051 pulse_ensure_dev_list_data_list_size (pulse_dev_list_data * list_data)
1052 {
1053   if (list_data->count == list_data->max) {
1054     list_data->max += 8;
1055     list_data->devinfo = realloc(list_data->devinfo,
1056         sizeof(cubeb_device_info *) * list_data->max);
1057   }
1058 }
1059 
1060 static cubeb_device_state
1061 pulse_get_state_from_sink_port(pa_sink_port_info * info)
1062 {
1063   if (info != NULL) {
1064 #if PA_CHECK_VERSION(2, 0, 0)
1065     if (info->available == PA_PORT_AVAILABLE_NO)
1066       return CUBEB_DEVICE_STATE_UNPLUGGED;
1067     else /*if (info->available == PA_PORT_AVAILABLE_YES) + UNKNOWN */
1068 #endif
1069       return CUBEB_DEVICE_STATE_ENABLED;
1070   }
1071 
1072   return CUBEB_DEVICE_STATE_DISABLED;
1073 }
1074 
1075 static void
1076 pulse_sink_info_cb(pa_context * context, const pa_sink_info * info,
1077     int eol, void * user_data)
1078 {
1079   pulse_dev_list_data * list_data = user_data;
1080   cubeb_device_info * devinfo;
1081   const char * prop;
1082 
1083   (void)context;
1084 
1085   if (eol || info == NULL)
1086     return;
1087 
1088   devinfo = calloc(1, sizeof(cubeb_device_info));
1089 
1090   devinfo->device_id = strdup(info->name);
1091   devinfo->devid = devinfo->device_id;
1092   devinfo->friendly_name = strdup(info->description);
1093   prop = WRAP(pa_proplist_gets)(info->proplist, "sysfs.path");
1094   if (prop)
1095     devinfo->group_id = strdup(prop);
1096   prop = WRAP(pa_proplist_gets)(info->proplist, "device.vendor.name");
1097   if (prop)
1098     devinfo->vendor_name = strdup(prop);
1099 
1100   devinfo->type = CUBEB_DEVICE_TYPE_OUTPUT;
1101   devinfo->state = pulse_get_state_from_sink_port(info->active_port);
1102   devinfo->preferred = strcmp(info->name, list_data->default_sink_name) == 0;
1103 
1104   devinfo->format = CUBEB_DEVICE_FMT_ALL;
1105   devinfo->default_format = pulse_format_to_cubeb_format(info->sample_spec.format);
1106   devinfo->max_channels = info->channel_map.channels;
1107   devinfo->min_rate = 1;
1108   devinfo->max_rate = PA_RATE_MAX;
1109   devinfo->default_rate = info->sample_spec.rate;
1110 
1111   devinfo->latency_lo = 0;
1112   devinfo->latency_hi = 0;
1113 
1114   pulse_ensure_dev_list_data_list_size (list_data);
1115   list_data->devinfo[list_data->count++] = devinfo;
1116 
1117   WRAP(pa_threaded_mainloop_signal)(list_data->context->mainloop, 0);
1118 }
1119 
1120 static cubeb_device_state
1121 pulse_get_state_from_source_port(pa_source_port_info * info)
1122 {
1123   if (info != NULL) {
1124 #if PA_CHECK_VERSION(2, 0, 0)
1125     if (info->available == PA_PORT_AVAILABLE_NO)
1126       return CUBEB_DEVICE_STATE_UNPLUGGED;
1127     else /*if (info->available == PA_PORT_AVAILABLE_YES) + UNKNOWN */
1128 #endif
1129       return CUBEB_DEVICE_STATE_ENABLED;
1130   }
1131 
1132   return CUBEB_DEVICE_STATE_DISABLED;
1133 }
1134 
1135 static void
1136 pulse_source_info_cb(pa_context * context, const pa_source_info * info,
1137     int eol, void * user_data)
1138 {
1139   pulse_dev_list_data * list_data = user_data;
1140   cubeb_device_info * devinfo;
1141   const char * prop;
1142 
1143   (void)context;
1144 
1145   if (eol)
1146     return;
1147 
1148   devinfo = calloc(1, sizeof(cubeb_device_info));
1149 
1150   devinfo->device_id = strdup(info->name);
1151   devinfo->devid = devinfo->device_id;
1152   devinfo->friendly_name = strdup(info->description);
1153   prop = WRAP(pa_proplist_gets)(info->proplist, "sysfs.path");
1154   if (prop)
1155     devinfo->group_id = strdup(prop);
1156   prop = WRAP(pa_proplist_gets)(info->proplist, "device.vendor.name");
1157   if (prop)
1158     devinfo->vendor_name = strdup(prop);
1159 
1160   devinfo->type = CUBEB_DEVICE_TYPE_INPUT;
1161   devinfo->state = pulse_get_state_from_source_port(info->active_port);
1162   devinfo->preferred = strcmp(info->name, list_data->default_source_name) == 0;
1163 
1164   devinfo->format = CUBEB_DEVICE_FMT_ALL;
1165   devinfo->default_format = pulse_format_to_cubeb_format(info->sample_spec.format);
1166   devinfo->max_channels = info->channel_map.channels;
1167   devinfo->min_rate = 1;
1168   devinfo->max_rate = PA_RATE_MAX;
1169   devinfo->default_rate = info->sample_spec.rate;
1170 
1171   devinfo->latency_lo = 0;
1172   devinfo->latency_hi = 0;
1173 
1174   pulse_ensure_dev_list_data_list_size (list_data);
1175   list_data->devinfo[list_data->count++] = devinfo;
1176 
1177   WRAP(pa_threaded_mainloop_signal)(list_data->context->mainloop, 0);
1178 }
1179 
1180 static void
1181 pulse_server_info_cb(pa_context * c, const pa_server_info * i, void * userdata)
1182 {
1183   pulse_dev_list_data * list_data = userdata;
1184 
1185   (void)c;
1186 
1187   free(list_data->default_sink_name);
1188   free(list_data->default_source_name);
1189   list_data->default_sink_name = strdup(i->default_sink_name);
1190   list_data->default_source_name = strdup(i->default_source_name);
1191 
1192   WRAP(pa_threaded_mainloop_signal)(list_data->context->mainloop, 0);
1193 }
1194 
1195 static int
1196 pulse_enumerate_devices(cubeb * context, cubeb_device_type type,
1197                         cubeb_device_collection ** collection)
1198 {
1199   pulse_dev_list_data user_data = { NULL, NULL, NULL, 0, 0, context };
1200   pa_operation * o;
1201   uint32_t i;
1202 
1203   WRAP(pa_threaded_mainloop_lock)(context->mainloop);
1204 
1205   o = WRAP(pa_context_get_server_info)(context->context,
1206       pulse_server_info_cb, &user_data);
1207   if (o) {
1208     operation_wait(context, NULL, o);
1209     WRAP(pa_operation_unref)(o);
1210   }
1211 
1212   if (type & CUBEB_DEVICE_TYPE_OUTPUT) {
1213     o = WRAP(pa_context_get_sink_info_list)(context->context,
1214         pulse_sink_info_cb, &user_data);
1215     if (o) {
1216       operation_wait(context, NULL, o);
1217       WRAP(pa_operation_unref)(o);
1218     }
1219   }
1220 
1221   if (type & CUBEB_DEVICE_TYPE_INPUT) {
1222     o = WRAP(pa_context_get_source_info_list)(context->context,
1223         pulse_source_info_cb, &user_data);
1224     if (o) {
1225       operation_wait(context, NULL, o);
1226       WRAP(pa_operation_unref)(o);
1227     }
1228   }
1229 
1230   WRAP(pa_threaded_mainloop_unlock)(context->mainloop);
1231 
1232   *collection = malloc(sizeof(cubeb_device_collection) +
1233       sizeof(cubeb_device_info *) * (user_data.count > 0 ? user_data.count - 1 : 0));
1234   (*collection)->count = user_data.count;
1235   for (i = 0; i < user_data.count; i++)
1236     (*collection)->device[i] = user_data.devinfo[i];
1237 
1238   free(user_data.default_sink_name);
1239   free(user_data.default_source_name);
1240   free(user_data.devinfo);
1241   return CUBEB_OK;
1242 }
1243 
1244 static int
1245 pulse_stream_get_current_device(cubeb_stream * stm, cubeb_device ** const device)
1246 {
1247 #if PA_CHECK_VERSION(0, 9, 8)
1248   *device = calloc(1, sizeof(cubeb_device));
1249   if (*device == NULL)
1250     return CUBEB_ERROR;
1251 
1252   if (stm->input_stream) {
1253     const char * name = WRAP(pa_stream_get_device_name)(stm->input_stream);
1254     (*device)->input_name = (name == NULL) ? NULL : strdup(name);
1255   }
1256 
1257   if (stm->output_stream) {
1258     const char * name = WRAP(pa_stream_get_device_name)(stm->output_stream);
1259     (*device)->output_name = (name == NULL) ? NULL : strdup(name);
1260   }
1261 
1262   return CUBEB_OK;
1263 #else
1264   return CUBEB_ERROR_NOT_SUPPORTED;
1265 #endif
1266 }
1267 
1268 static int
1269 pulse_stream_device_destroy(cubeb_stream * stream,
1270                             cubeb_device * device)
1271 {
1272   (void)stream;
1273   free(device->input_name);
1274   free(device->output_name);
1275   free(device);
1276   return CUBEB_OK;
1277 }
1278 
1279 static void
1280 pulse_subscribe_callback(pa_context * ctx,
1281                          pa_subscription_event_type_t t,
1282                          uint32_t index, void * userdata)
1283 {
1284   (void)ctx;
1285   cubeb * context = userdata;
1286 
1287   switch (t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK) {
1288   case PA_SUBSCRIPTION_EVENT_SOURCE:
1289   case PA_SUBSCRIPTION_EVENT_SINK:
1290 
1291     if (g_log_level) {
1292       if ((t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK) == PA_SUBSCRIPTION_EVENT_SOURCE &&
1293           (t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_REMOVE) {
1294         LOG("Removing sink index %d", index);
1295       } else if ((t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK) == PA_SUBSCRIPTION_EVENT_SOURCE &&
1296           (t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_NEW) {
1297         LOG("Adding sink index %d", index);
1298       }
1299       if ((t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK) == PA_SUBSCRIPTION_EVENT_SINK &&
1300           (t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_REMOVE) {
1301         LOG("Removing source index %d", index);
1302       } else if ((t & PA_SUBSCRIPTION_EVENT_FACILITY_MASK) == PA_SUBSCRIPTION_EVENT_SINK &&
1303           (t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_NEW) {
1304         LOG("Adding source index %d", index);
1305       }
1306     }
1307 
1308     if ((t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_REMOVE ||
1309         (t & PA_SUBSCRIPTION_EVENT_TYPE_MASK) == PA_SUBSCRIPTION_EVENT_NEW) {
1310       context->collection_changed_callback(context, context->collection_changed_user_ptr);
1311     }
1312     break;
1313   }
1314 }
1315 
1316 static void
1317 subscribe_success(pa_context *c, int success, void *userdata)
1318 {
1319   (void)c;
1320   cubeb * context = userdata;
1321   assert(success);
1322   WRAP(pa_threaded_mainloop_signal)(context->mainloop, 0);
1323 }
1324 
1325 static int
1326 pulse_register_device_collection_changed(cubeb * context,
1327                                          cubeb_device_type devtype,
1328                                          cubeb_device_collection_changed_callback collection_changed_callback,
1329                                          void * user_ptr)
1330 {
1331   context->collection_changed_callback = collection_changed_callback;
1332   context->collection_changed_user_ptr = user_ptr;
1333 
1334   WRAP(pa_threaded_mainloop_lock)(context->mainloop);
1335 
1336   pa_subscription_mask_t mask;
1337   if (context->collection_changed_callback == NULL) {
1338     // Unregister subscription
1339     WRAP(pa_context_set_subscribe_callback)(context->context, NULL, NULL);
1340     mask = PA_SUBSCRIPTION_MASK_NULL;
1341   } else {
1342     WRAP(pa_context_set_subscribe_callback)(context->context, pulse_subscribe_callback, context);
1343     if (devtype == CUBEB_DEVICE_TYPE_INPUT)
1344       mask = PA_SUBSCRIPTION_MASK_SOURCE;
1345     else if (devtype == CUBEB_DEVICE_TYPE_OUTPUT)
1346       mask = PA_SUBSCRIPTION_MASK_SINK;
1347     else
1348       mask = PA_SUBSCRIPTION_MASK_SINK | PA_SUBSCRIPTION_MASK_SOURCE;
1349   }
1350 
1351   pa_operation * o;
1352   o = WRAP(pa_context_subscribe)(context->context, mask, subscribe_success, context);
1353   if (o == NULL) {
1354     LOG("Context subscribe failed");
1355     return CUBEB_ERROR;
1356   }
1357   operation_wait(context, NULL, o);
1358   WRAP(pa_operation_unref)(o);
1359 
1360   WRAP(pa_threaded_mainloop_unlock)(context->mainloop);
1361 
1362   return CUBEB_OK;
1363 }
1364 
1365 static struct cubeb_ops const pulse_ops = {
1366   .init = pulse_init,
1367   .get_backend_id = pulse_get_backend_id,
1368   .get_max_channel_count = pulse_get_max_channel_count,
1369   .get_min_latency = pulse_get_min_latency,
1370   .get_preferred_sample_rate = pulse_get_preferred_sample_rate,
1371   .enumerate_devices = pulse_enumerate_devices,
1372   .destroy = pulse_destroy,
1373   .stream_init = pulse_stream_init,
1374   .stream_destroy = pulse_stream_destroy,
1375   .stream_start = pulse_stream_start,
1376   .stream_stop = pulse_stream_stop,
1377   .stream_get_position = pulse_stream_get_position,
1378   .stream_get_latency = pulse_stream_get_latency,
1379   .stream_set_volume = pulse_stream_set_volume,
1380   .stream_set_panning = pulse_stream_set_panning,
1381   .stream_get_current_device = pulse_stream_get_current_device,
1382   .stream_device_destroy = pulse_stream_device_destroy,
1383   .stream_register_device_changed_callback = NULL,
1384   .register_device_collection_changed = pulse_register_device_collection_changed
1385 };
1386