1 /*
2 * MOC - music on console
3 * Copyright (C) 2005 Damian Pietras <daper@daper.net>
4 *
5 * This program is free software; you can redistribute it and/or modify
6 * it under the terms of the GNU General Public License as published by
7 * the Free Software Foundation; either version 2 of the License, or
8 * (at your option) any later version.
9 *
10 */
11
12 /* TODO:
13 * - handle SIGBUS (mmap() read error)
14 */
15
16 #ifdef HAVE_CONFIG_H
17 # include "config.h"
18 #endif
19
20 #include <stdlib.h>
21 #include <sys/types.h>
22 #include <sys/stat.h>
23 #include <unistd.h>
24 #include <fcntl.h>
25 #include <errno.h>
26 #include <string.h>
27 #include <strings.h>
28 #include <assert.h>
29 #include <pthread.h>
30 #include <inttypes.h>
31
32 #ifdef HAVE_MMAP
33 # include <sys/mman.h>
34 #endif
35
36 /*#define DEBUG*/
37
38 #include "common.h"
39 #include "log.h"
40 #include "io.h"
41 #include "options.h"
42 #include "files.h"
43 #ifdef HAVE_CURL
44 # include "io_curl.h"
45 #endif
46 #include "compat.h"
47
48 #ifdef HAVE_MMAP
io_read_mmap(struct io_stream * s,const int dont_move,void * buf,size_t count)49 static ssize_t io_read_mmap (struct io_stream *s, const int dont_move,
50 void *buf, size_t count)
51 {
52 struct stat file_stat;
53 size_t to_read;
54
55 assert (s->mem != NULL);
56
57 if (fstat(s->fd, &file_stat) == -1) {
58 logit ("fstat() failed: %s", strerror(errno));
59 return -1;
60 }
61
62 if (s->size != file_stat.st_size) {
63 logit ("File size has changed");
64
65 if (munmap (s->mem, (size_t)s->size)) {
66 logit ("munmap() failed: %s", strerror(errno));
67 return -1;
68 }
69
70 s->size = file_stat.st_size;
71 s->mem = NULL;
72
73 if (s->size < 1 || (uint64_t)s->size > SIZE_MAX) {
74 logit ("File size unsuitable for mmap()");
75 return -1;
76 }
77
78 s->mem = mmap (0, (size_t)s->size, PROT_READ, MAP_SHARED, s->fd, 0);
79 if (s->mem == MAP_FAILED) {
80 s->mem = NULL;
81 logit ("mmap() failed: %s", strerror(errno));
82 return -1;
83 }
84
85 logit ("mmap()ed %"PRId64" bytes", s->size);
86 if (s->mem_pos > s->size) {
87 logit ("File shrunk");
88 return 0;
89 }
90 }
91
92 if (s->mem_pos >= s->size)
93 return 0;
94
95 to_read = MIN(count, (size_t) (s->size - s->mem_pos));
96 memcpy (buf, (char *)s->mem + s->mem_pos, to_read);
97
98 if (!dont_move)
99 s->mem_pos += to_read;
100
101 return to_read;
102 }
103 #endif
104
io_read_fd(struct io_stream * s,const int dont_move,void * buf,size_t count)105 static ssize_t io_read_fd (struct io_stream *s, const int dont_move, void *buf,
106 size_t count)
107 {
108 ssize_t res;
109
110 res = read (s->fd, buf, count);
111
112 if (res < 0)
113 return -1;
114
115 if (dont_move && lseek(s->fd, -res, SEEK_CUR) < 0)
116 return -1;
117
118 return res;
119 }
120
121 /* Read the data from the stream resource. If dont_move was set, the stream
122 * position is unchanged. */
io_internal_read(struct io_stream * s,const int dont_move,char * buf,size_t count)123 static ssize_t io_internal_read (struct io_stream *s, const int dont_move,
124 char *buf, size_t count)
125 {
126 ssize_t res = 0;
127
128 assert (s != NULL);
129 assert (buf != NULL);
130
131 #ifdef HAVE_MMAP
132 if (s->source == IO_SOURCE_MMAP)
133 res = io_read_mmap (s, dont_move, buf, count);
134 else
135 #endif
136 #ifdef HAVE_CURL
137 if (s->source == IO_SOURCE_CURL) {
138 if (dont_move)
139 fatal ("You can't peek data directly from CURL!");
140 res = io_curl_read (s, buf, count);
141 }
142 else
143 #endif
144 if (s->source == IO_SOURCE_FD)
145 res = io_read_fd (s, dont_move, buf, count);
146 else
147 fatal ("Unknown io_stream->source: %d", s->source);
148
149 return res;
150 }
151
152 #ifdef HAVE_MMAP
io_seek_mmap(struct io_stream * s,const off_t where)153 static off_t io_seek_mmap (struct io_stream *s, const off_t where)
154 {
155 return (s->mem_pos = where);
156 }
157 #endif
158
io_seek_fd(struct io_stream * s,const off_t where)159 static off_t io_seek_fd (struct io_stream *s, const off_t where)
160 {
161 return lseek (s->fd, where, SEEK_SET);
162 }
163
io_seek_buffered(struct io_stream * s,const off_t where)164 static off_t io_seek_buffered (struct io_stream *s, const off_t where)
165 {
166 off_t res = -1;
167
168 logit ("Seeking...");
169
170 #ifdef HAVE_MMAP
171 if (s->source == IO_SOURCE_MMAP)
172 res = io_seek_mmap (s, where);
173 else
174 #endif
175 if (s->source == IO_SOURCE_FD)
176 res = io_seek_fd (s, where);
177 else
178 fatal ("Unknown io_stream->source: %d", s->source);
179
180 LOCK (s->buf_mutex);
181 fifo_buf_clear (&s->buf);
182 pthread_cond_signal (&s->buf_free_cond);
183 s->after_seek = 1;
184 s->eof = 0;
185 UNLOCK (s->buf_mutex);
186
187 return res;
188 }
189
io_seek_unbuffered(struct io_stream * s,const off_t where)190 static off_t io_seek_unbuffered (struct io_stream *s, const off_t where)
191 {
192 off_t res = -1;
193
194 #ifdef HAVE_MMAP
195 if (s->source == IO_SOURCE_MMAP)
196 res = io_seek_mmap (s, where);
197 #endif
198 if (s->source == IO_SOURCE_FD)
199 res = io_seek_fd (s, where);
200
201 return res;
202 }
203
io_seek(struct io_stream * s,off_t offset,int whence)204 off_t io_seek (struct io_stream *s, off_t offset, int whence)
205 {
206 off_t res, new_pos = 0;
207
208 assert (s != NULL);
209 assert (s->opened);
210
211 if (s->source == IO_SOURCE_CURL || !io_ok(s))
212 return -1;
213
214 LOCK (s->io_mutex);
215 switch (whence) {
216 case SEEK_SET:
217 new_pos = offset;
218 break;
219 case SEEK_CUR:
220 new_pos = s->pos + offset;
221 break;
222 case SEEK_END:
223 new_pos = s->size + offset;
224 break;
225 default:
226 fatal ("Bad whence value: %d", whence);
227 }
228
229 new_pos = CLAMP(0, new_pos, s->size);
230
231 if (s->buffered)
232 res = io_seek_buffered (s, new_pos);
233 else
234 res = io_seek_unbuffered (s, new_pos);
235
236 if (res != -1)
237 s->pos = res;
238 UNLOCK (s->io_mutex);
239
240 if (res != -1)
241 debug ("Seek to: %"PRId64, res);
242 else
243 logit ("Seek error");
244
245 return res;
246 }
247
248 /* Wake up the IO reading thread. */
io_wake_up(struct io_stream * s ATTR_UNUSED)249 static void io_wake_up (struct io_stream *s ATTR_UNUSED)
250 {
251 #ifdef HAVE_CURL
252 if (s->source == IO_SOURCE_CURL)
253 io_curl_wake_up (s);
254 #endif
255 }
256
257 /* Abort an IO operation from another thread. */
io_abort(struct io_stream * s)258 void io_abort (struct io_stream *s)
259 {
260 assert (s != NULL);
261
262 if (s->buffered && !s->stop_read_thread) {
263 logit ("Aborting...");
264 LOCK (s->buf_mutex);
265 s->stop_read_thread = 1;
266 io_wake_up (s);
267 pthread_cond_broadcast (&s->buf_fill_cond);
268 pthread_cond_broadcast (&s->buf_free_cond);
269 UNLOCK (s->buf_mutex);
270 logit ("done");
271 }
272 }
273
274 /* Close the stream and free all resources associated with it. */
io_close(struct io_stream * s)275 void io_close (struct io_stream *s)
276 {
277 int rc;
278
279 assert (s != NULL);
280
281 logit ("Closing stream...");
282
283 if (s->opened) {
284
285 if (s->buffered) {
286 io_abort (s);
287
288 logit ("Waiting for io_read_thread()...");
289 pthread_join (s->read_thread, NULL);
290 logit ("IO read thread exited");
291 }
292
293 #ifdef HAVE_MMAP
294 if (s->source == IO_SOURCE_MMAP) {
295 if (s->mem && munmap (s->mem, (size_t)s->size))
296 logit ("munmap() failed: %s", strerror(errno));
297 close (s->fd);
298 }
299 #endif
300
301 #ifdef HAVE_CURL
302 if (s->source == IO_SOURCE_CURL)
303 io_curl_close (s);
304 #endif
305
306 if (s->source == IO_SOURCE_FD)
307 close (s->fd);
308
309 s->opened = 0;
310
311 if (s->buffered) {
312 fifo_buf_destroy (&s->buf);
313 rc = pthread_cond_destroy (&s->buf_free_cond);
314 if (rc != 0)
315 logit ("Destroying buf_free_cond failed: %s", strerror (rc));
316 rc = pthread_cond_destroy (&s->buf_fill_cond);
317 if (rc != 0)
318 logit ("Destroying buf_fill_cond failed: %s", strerror (rc));
319 }
320
321 if (s->metadata.title)
322 free (s->metadata.title);
323 if (s->metadata.url)
324 free (s->metadata.url);
325 }
326
327 rc = pthread_mutex_destroy (&s->buf_mutex);
328 if (rc != 0)
329 logit ("Destroying buf_mutex failed: %s", strerror (rc));
330 rc = pthread_mutex_destroy (&s->io_mutex);
331 if (rc != 0)
332 logit ("Destroying io_mutex failed: %s", strerror (rc));
333 rc = pthread_mutex_destroy (&s->metadata.mutex);
334 if (rc != 0)
335 logit ("Destroying metadata mutex failed: %s", strerror (rc));
336
337 if (s->strerror)
338 free (s->strerror);
339 free (s);
340
341 logit ("done");
342 }
343
io_read_thread(void * data)344 static void *io_read_thread (void *data)
345 {
346 struct io_stream *s = (struct io_stream *)data;
347
348 logit ("IO read thread created");
349
350 while (!s->stop_read_thread) {
351 char read_buf[8096];
352 int read_buf_fill = 0;
353 int read_buf_pos = 0;
354
355 LOCK (s->io_mutex);
356 debug ("Reading...");
357
358 LOCK (s->buf_mutex);
359 s->after_seek = 0;
360 UNLOCK (s->buf_mutex);
361
362 read_buf_fill = io_internal_read (s, 0, read_buf, sizeof(read_buf));
363 UNLOCK (s->io_mutex);
364 debug ("Read %d bytes", read_buf_fill);
365
366 LOCK (s->buf_mutex);
367
368 if (s->stop_read_thread) {
369 UNLOCK (s->buf_mutex);
370 break;
371 }
372
373 if (read_buf_fill < 0) {
374 s->errno_val = errno;
375 s->read_error = 1;
376 logit ("Exiting due to read error.");
377 pthread_cond_broadcast (&s->buf_fill_cond);
378 UNLOCK (s->buf_mutex);
379 break;
380 }
381
382 if (read_buf_fill == 0) {
383 s->eof = 1;
384 debug ("EOF, waiting");
385 pthread_cond_broadcast (&s->buf_fill_cond);
386 pthread_cond_wait (&s->buf_free_cond, &s->buf_mutex);
387 debug ("Got signal");
388 UNLOCK (s->buf_mutex);
389 continue;
390 }
391
392 s->eof = 0;
393
394 while (read_buf_pos < read_buf_fill && !s->after_seek) {
395 size_t put;
396
397 debug ("Buffer fill: %zu", fifo_buf_get_fill (&s->buf));
398
399 put = fifo_buf_put (&s->buf,
400 read_buf + read_buf_pos,
401 read_buf_fill - read_buf_pos);
402
403 if (s->stop_read_thread)
404 break;
405
406 if (put > 0) {
407 debug ("Put %zu bytes into the buffer", put);
408 if (s->buf_fill_callback) {
409 UNLOCK (s->buf_mutex);
410 s->buf_fill_callback (s,
411 fifo_buf_get_fill(&s->buf),
412 fifo_buf_get_size(&s->buf),
413 s->buf_fill_callback_data);
414 LOCK (s->buf_mutex);
415 }
416 pthread_cond_broadcast (&s->buf_fill_cond);
417 read_buf_pos += put;
418 }
419 else {
420 debug ("The buffer is full, waiting.");
421 pthread_cond_wait (&s->buf_free_cond,
422 &s->buf_mutex);
423 debug ("Some space in the buffer was freed");
424 }
425 }
426
427 UNLOCK (s->buf_mutex);
428 }
429
430 if (s->stop_read_thread)
431 logit ("Stop request");
432
433 logit ("Exiting IO read thread");
434
435 return NULL;
436 }
437
io_open_file(struct io_stream * s,const char * file)438 static void io_open_file (struct io_stream *s, const char *file)
439 {
440 struct stat file_stat;
441
442 s->source = IO_SOURCE_FD;
443
444 if ((s->fd = open(file, O_RDONLY)) == -1)
445 s->errno_val = errno;
446 else if (fstat(s->fd, &file_stat) == -1) {
447 s->errno_val = errno;
448 close(s->fd);
449 }
450 else {
451
452 s->size = file_stat.st_size;
453
454 #ifdef HAVE_MMAP
455 if (options_get_int ("UseMMap") &&
456 s->size > 0 && (uint64_t)s->size <= SIZE_MAX) {
457 s->mem = mmap (0, (size_t)s->size, PROT_READ, MAP_SHARED, s->fd, 0);
458 if (s->mem == MAP_FAILED) {
459 s->mem = NULL;
460 logit ("mmap() failed: %s", strerror(errno));
461 }
462 else {
463 logit ("mmap()ed %"PRId64" bytes", s->size);
464 s->source = IO_SOURCE_MMAP;
465 s->mem_pos = 0;
466 }
467 }
468 else {
469 logit ("Not using mmap()");
470 }
471 #endif
472
473 s->opened = 1;
474 }
475 }
476
477 /* Open the file. */
io_open(const char * file,const int buffered)478 struct io_stream *io_open (const char *file, const int buffered)
479 {
480 int rc;
481 struct io_stream *s;
482
483 assert (file != NULL);
484
485 s = xmalloc (sizeof(struct io_stream));
486 s->errno_val = 0;
487 s->read_error = 0;
488 s->strerror = NULL;
489 s->opened = 0;
490 s->size = -1;
491 s->buf_fill_callback = NULL;
492 memset (&s->metadata, 0, sizeof(s->metadata));
493
494 #ifdef HAVE_CURL
495 s->curl.mime_type = NULL;
496 if (is_url (file))
497 io_curl_open (s, file);
498 else
499 #endif
500 io_open_file (s, file);
501
502 pthread_mutex_init (&s->buf_mutex, NULL);
503 pthread_mutex_init (&s->io_mutex, NULL);
504 pthread_mutex_init (&s->metadata.mutex, NULL);
505
506 if (!s->opened)
507 return s;
508
509 s->stop_read_thread = 0;
510 s->eof = 0;
511 s->after_seek = 0;
512 s->buffered = buffered;
513 s->pos = 0;
514
515 if (buffered) {
516 fifo_buf_init (&s->buf, options_get_int("InputBuffer") * 1024);
517 s->prebuffer = options_get_int("Prebuffering") * 1024;
518
519 pthread_cond_init (&s->buf_free_cond, NULL);
520 pthread_cond_init (&s->buf_fill_cond, NULL);
521
522 rc = pthread_create (&s->read_thread, NULL, io_read_thread, s);
523 if (rc != 0)
524 fatal ("Can't create read thread: %s", strerror (rc));
525 }
526
527 return s;
528 }
529
530 /* Return non-zero if the stream was free of errors. */
io_ok_nolock(struct io_stream * s)531 static int io_ok_nolock (struct io_stream *s)
532 {
533 return !s->read_error && s->errno_val == 0;
534 }
535
536 /* Return non-zero if the stream was free of errors. */
io_ok(struct io_stream * s)537 int io_ok (struct io_stream *s)
538 {
539 int res;
540
541 LOCK (s->buf_mutex);
542 res = io_ok_nolock (s);
543 UNLOCK (s->buf_mutex);
544
545 return res;
546 }
547
548 /* Read data from the buffer without removing them, so stream position is
549 * unchanged. You can't peek more data than the buffer size. */
io_peek_internal(struct io_stream * s,void * buf,size_t count)550 static ssize_t io_peek_internal (struct io_stream *s, void *buf, size_t count)
551 {
552 ssize_t received = 0;
553
554 debug ("Peeking data...");
555
556 LOCK (s->buf_mutex);
557
558 /* Wait until enough data will be available */
559 while (io_ok_nolock(s) && !s->stop_read_thread
560 && count > fifo_buf_get_fill(&s->buf)
561 && fifo_buf_get_space (&s->buf)
562 && !s->eof) {
563 debug ("waiting...");
564 pthread_cond_wait (&s->buf_fill_cond, &s->buf_mutex);
565 }
566
567 received = fifo_buf_peek (&s->buf, buf, count);
568 debug ("Read %zd bytes", received);
569
570 UNLOCK (s->buf_mutex);
571
572 return io_ok(s) ? received : -1;
573 }
574
575 /* Wait until there will be s->prebuffer bytes in the buffer or some event
576 * occurs which prevents prebuffering. */
io_prebuffer(struct io_stream * s,const size_t to_fill)577 void io_prebuffer (struct io_stream *s, const size_t to_fill)
578 {
579 logit ("prebuffering to %zu bytes...", to_fill);
580
581 LOCK (s->buf_mutex);
582 while (io_ok_nolock(s) && !s->stop_read_thread && !s->eof
583 && to_fill > fifo_buf_get_fill(&s->buf)) {
584 debug ("waiting (buffer %zu bytes full)", fifo_buf_get_fill (&s->buf));
585 pthread_cond_wait (&s->buf_fill_cond, &s->buf_mutex);
586 }
587 UNLOCK (s->buf_mutex);
588
589 logit ("done");
590 }
591
io_read_buffered(struct io_stream * s,void * buf,size_t count)592 static ssize_t io_read_buffered (struct io_stream *s, void *buf, size_t count)
593 {
594 ssize_t received = 0;
595
596 LOCK (s->buf_mutex);
597
598 while (received < (ssize_t)count && !s->stop_read_thread
599 && ((!s->eof && !s->read_error)
600 || fifo_buf_get_fill(&s->buf))) {
601 if (fifo_buf_get_fill(&s->buf)) {
602 received += fifo_buf_get (&s->buf, (char *)buf + received,
603 count - received);
604 debug ("Read %zd bytes so far", received);
605 pthread_cond_signal (&s->buf_free_cond);
606 }
607 else {
608 debug ("Buffer empty, waiting...");
609 pthread_cond_wait (&s->buf_fill_cond, &s->buf_mutex);
610 }
611 }
612
613 debug ("done");
614 s->pos += received;
615
616 UNLOCK (s->buf_mutex);
617
618 return received ? received : (s->read_error ? -1 : 0);
619 }
620
621 /* Read data from the stream without buffering. If dont_move was set, the
622 * stream position is unchanged. */
io_read_unbuffered(struct io_stream * s,const int dont_move,void * buf,size_t count)623 static ssize_t io_read_unbuffered (struct io_stream *s, const int dont_move,
624 void *buf, size_t count)
625 {
626 ssize_t res;
627
628 assert (!s->eof);
629
630 res = io_internal_read (s, dont_move, buf, count);
631
632 if (!dont_move) {
633 s->pos += res;
634 if (res == 0)
635 s->eof = 1;
636 }
637
638 return res;
639 }
640
641 /* Read data from the stream to the buffer of size count. Return the number
642 * of bytes read, 0 on EOF, < 0 on error. */
io_read(struct io_stream * s,void * buf,size_t count)643 ssize_t io_read (struct io_stream *s, void *buf, size_t count)
644 {
645 ssize_t received;
646
647 assert (s != NULL);
648 assert (buf != NULL);
649 assert (s->opened);
650
651 debug ("Reading...");
652
653 if (s->buffered)
654 received = io_read_buffered (s, buf, count);
655 else if (s->eof)
656 received = 0;
657 else
658 received = io_read_unbuffered (s, 0, buf, count);
659
660 return received;
661 }
662
663 /* Read data from the stream to the buffer of size count. The data are not
664 * removed from the stream. Return the number of bytes read, 0 on EOF, < 0
665 * on error. */
io_peek(struct io_stream * s,void * buf,size_t count)666 ssize_t io_peek (struct io_stream *s, void *buf, size_t count)
667 {
668 ssize_t received;
669
670 assert (s != NULL);
671 assert (buf != NULL);
672
673 debug ("Reading...");
674
675 if (s->buffered)
676 received = io_peek_internal (s, buf, count);
677 else
678 received = io_read_unbuffered (s, 1, buf, count);
679
680 return io_ok(s) ? received : -1;
681 }
682
683 /* Get the string describing the error associated with the stream. */
io_strerror(struct io_stream * s)684 char *io_strerror (struct io_stream *s)
685 {
686 char err[256];
687
688 if (s->strerror)
689 free (s->strerror);
690
691 #ifdef HAVE_CURL
692 if (s->source == IO_SOURCE_CURL)
693 io_curl_strerror (s);
694 else
695 #endif
696 if (s->errno_val) {
697 strerror_r (s->errno_val, err, sizeof(err));
698 s->strerror = xstrdup (err);
699 }
700 else
701 s->strerror = xstrdup ("OK");
702
703 return s->strerror;
704 }
705
706 /* Get the file size if available or -1. */
io_file_size(const struct io_stream * s)707 off_t io_file_size (const struct io_stream *s)
708 {
709 assert (s != NULL);
710
711 return s->size;
712 }
713
714 /* Return the stream position. */
io_tell(struct io_stream * s)715 off_t io_tell (struct io_stream *s)
716 {
717 off_t res = -1;
718
719 assert (s != NULL);
720
721 if (s->buffered) {
722 LOCK (s->buf_mutex);
723 res = s->pos;
724 UNLOCK (s->buf_mutex);
725 }
726 else
727 res = s->pos;
728
729 debug ("We are at byte %"PRId64, res);
730
731 return res;
732 }
733
734 /* Return != 0 if we are at the end of the stream. */
io_eof(struct io_stream * s)735 int io_eof (struct io_stream *s)
736 {
737 int eof;
738
739 assert (s != NULL);
740
741 LOCK (s->buf_mutex);
742 eof = (s->eof && (!s->buffered || !fifo_buf_get_fill(&s->buf))) ||
743 s->stop_read_thread;
744 UNLOCK (s->buf_mutex);
745
746 return eof;
747 }
748
/* One-time initialisation of the IO layer.  Only the CURL backend needs
 * any; without CURL support this is a no-op. */
void io_init (void)
{
#ifdef HAVE_CURL
	io_curl_init ();
#endif
}
755
/* Counterpart of io_init(): releases what the IO layer set up globally.
 * Only the CURL backend needs any; without CURL support this is a no-op. */
void io_cleanup (void)
{
#ifdef HAVE_CURL
	io_curl_cleanup ();
#endif
}
762
763 /* Return the mime type if available or NULL.
764 * The mime type is read by curl only after the first read (or peek), until
765 * then it's NULL. */
io_get_mime_type(struct io_stream * s ATTR_UNUSED)766 char *io_get_mime_type (struct io_stream *s ATTR_UNUSED)
767 {
768 #ifdef HAVE_CURL
769 return s->curl.mime_type;
770 #else
771 return NULL;
772 #endif
773 }
774
775 /* Return the malloc()ed stream title if available or NULL. */
io_get_metadata_title(struct io_stream * s)776 char *io_get_metadata_title (struct io_stream *s)
777 {
778 char *t;
779
780 LOCK (s->metadata.mutex);
781 t = xstrdup (s->metadata.title);
782 UNLOCK (s->metadata.mutex);
783
784 return t;
785 }
786
787 /* Return the malloc()ed stream url (from metadata) if available or NULL. */
io_get_metadata_url(struct io_stream * s)788 char *io_get_metadata_url (struct io_stream *s)
789 {
790 char *t;
791
792 LOCK (s->metadata.mutex);
793 t = xstrdup (s->metadata.url);
794 UNLOCK (s->metadata.mutex);
795
796 return t;
797 }
798
799 /* Set the metadata title of the stream. */
io_set_metadata_title(struct io_stream * s,const char * title)800 void io_set_metadata_title (struct io_stream *s, const char *title)
801 {
802 LOCK (s->metadata.mutex);
803 if (s->metadata.title)
804 free (s->metadata.title);
805 s->metadata.title = xstrdup (title);
806 UNLOCK (s->metadata.mutex);
807 }
808
809 /* Set the metadata url for the stream. */
io_set_metadata_url(struct io_stream * s,const char * url)810 void io_set_metadata_url (struct io_stream *s, const char *url)
811 {
812 LOCK (s->metadata.mutex);
813 if (s->metadata.url)
814 free (s->metadata.url);
815 s->metadata.url = xstrdup (url);
816 UNLOCK (s->metadata.mutex);
817 }
818
819 /* Set the callback function to be invoked when the fill of the buffer
820 * changes. data_ptr is a pointer passed to this function along with
821 * the pointer to the stream. */
io_set_buf_fill_callback(struct io_stream * s,buf_fill_callback_t callback,void * data_ptr)822 void io_set_buf_fill_callback (struct io_stream *s,
823 buf_fill_callback_t callback, void *data_ptr)
824 {
825 assert (s != NULL);
826 assert (callback != NULL);
827
828 LOCK (s->buf_mutex);
829 s->buf_fill_callback = callback;
830 s->buf_fill_callback_data = data_ptr;
831 UNLOCK (s->buf_mutex);
832 }
833
834 /* Return a non-zero value if the stream is seekable. */
io_seekable(const struct io_stream * s)835 int io_seekable (const struct io_stream *s)
836 {
837 return s->source == IO_SOURCE_FD || s->source == IO_SOURCE_MMAP;
838 }
839