1 /* Icecast
2 *
3 * This program is distributed under the GNU General Public License, version 2.
4 * A copy of this license is included with this source.
5 *
6 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org,
7 * Michael Smith <msmith@xiph.org>,
8 * oddsock <oddsock@xiph.org>,
9 * Karl Heyes <karl@xiph.org>
10 * and others (see AUTHORS for details).
11 */
12
13 /* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 #ifdef HAVE_CONFIG_H
15 #include <config.h>
16 #endif
17
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <sys/types.h>
22 #include <ogg/ogg.h>
23 #include <errno.h>
24
25 #ifndef _WIN32
26 #include <unistd.h>
27 #include <sys/time.h>
28 #include <sys/socket.h>
29 #include <sys/wait.h>
30 #include <limits.h>
31 #ifndef PATH_MAX
32 #define PATH_MAX 4096
33 #endif
34 #else
35 #include <winsock2.h>
36 #include <windows.h>
37 #define snprintf _snprintf
38 #endif
39
40 #ifndef _WIN32
41 /* for __setup_empty_script_environment() */
42 #include <sys/stat.h>
43 #include <fcntl.h>
44 #endif
45
46 #include "thread/thread.h"
47 #include "avl/avl.h"
48 #include "httpp/httpp.h"
49 #include "net/sock.h"
50
51 #include "connection.h"
52 #include "global.h"
53 #include "refbuf.h"
54 #include "client.h"
55 #include "stats.h"
56 #include "logging.h"
57 #include "cfgfile.h"
58 #include "util.h"
59 #include "source.h"
60 #include "format.h"
61 #include "fserve.h"
62 #include "auth.h"
63 #include "compat.h"
64
65 #undef CATMODULE
66 #define CATMODULE "source"
67
68 #define MAX_FALLBACK_DEPTH 10
69
70 mutex_t move_clients_mutex;
71
72 /* avl tree helper */
73 static int _compare_clients(void *compare_arg, void *a, void *b);
74 static int _free_client(void *key);
75 static void _parse_audio_info (source_t *source, const char *s);
76 static void source_shutdown (source_t *source);
77 #ifdef _WIN32
78 #define source_run_script(x,y) ICECAST_LOG_WARN("on [dis]connect scripts disabled");
79 #else
80 static void source_run_script (char *command, char *mountpoint);
81 #endif
82
83 /* Allocate a new source with the stated mountpoint, if one already
84 * exists with that mountpoint in the global source tree then return
85 * NULL.
86 */
source_reserve(const char * mount)87 source_t *source_reserve (const char *mount)
88 {
89 source_t *src = NULL;
90
91 if(mount[0] != '/')
92 ICECAST_LOG_WARN("Source at \"%s\" does not start with '/', clients will be "
93 "unable to connect", mount);
94
95 do
96 {
97 avl_tree_wlock (global.source_tree);
98 src = source_find_mount_raw (mount);
99 if (src)
100 {
101 src = NULL;
102 break;
103 }
104
105 src = calloc (1, sizeof(source_t));
106 if (src == NULL)
107 break;
108
109 src->client_tree = avl_tree_new(_compare_clients, NULL);
110 src->pending_tree = avl_tree_new(_compare_clients, NULL);
111
112 /* make duplicates for strings or similar */
113 src->mount = strdup (mount);
114 src->max_listeners = -1;
115 thread_mutex_create(&src->lock);
116
117 avl_insert (global.source_tree, src);
118
119 } while (0);
120
121 avl_tree_unlock (global.source_tree);
122 return src;
123 }
124
125
126 /* Find a mount with this raw name - ignoring fallbacks. You should have the
127 * global source tree locked to call this.
128 */
source_find_mount_raw(const char * mount)129 source_t *source_find_mount_raw(const char *mount)
130 {
131 source_t *source;
132 avl_node *node;
133 int cmp;
134
135 if (!mount) {
136 return NULL;
137 }
138 /* get the root node */
139 node = global.source_tree->root->right;
140
141 while (node) {
142 source = (source_t *)node->key;
143 cmp = strcmp(mount, source->mount);
144 if (cmp < 0)
145 node = node->left;
146 else if (cmp > 0)
147 node = node->right;
148 else
149 return source;
150 }
151
152 /* didn't find it */
153 return NULL;
154 }
155
156
157 /* Search for mount, if the mount is there but not currently running then
158 * check the fallback, and so on. Must have a global source lock to call
159 * this function.
160 */
source_find_mount(const char * mount)161 source_t *source_find_mount (const char *mount)
162 {
163 source_t *source = NULL;
164 ice_config_t *config;
165 mount_proxy *mountinfo;
166 int depth = 0;
167
168 config = config_get_config();
169 while (mount && depth < MAX_FALLBACK_DEPTH)
170 {
171 source = source_find_mount_raw(mount);
172
173 if (source)
174 {
175 if (source->running || source->on_demand)
176 break;
177 }
178
179 /* we either have a source which is not active (relay) or no source
180 * at all. Check the mounts list for fallback settings
181 */
182 mountinfo = config_find_mount (config, mount, MOUNT_TYPE_NORMAL);
183 source = NULL;
184
185 if (mountinfo == NULL)
186 break;
187 mount = mountinfo->fallback_mount;
188 depth++;
189 }
190
191 config_release_config();
192 return source;
193 }
194
195
source_compare_sources(void * arg,void * a,void * b)196 int source_compare_sources(void *arg, void *a, void *b)
197 {
198 source_t *srca = (source_t *)a;
199 source_t *srcb = (source_t *)b;
200
201 return strcmp(srca->mount, srcb->mount);
202 }
203
204
source_clear_source(source_t * source)205 void source_clear_source (source_t *source)
206 {
207 int c;
208
209 ICECAST_LOG_DEBUG("clearing source \"%s\"", source->mount);
210
211 avl_tree_wlock (source->pending_tree);
212 client_destroy(source->client);
213 source->client = NULL;
214 source->parser = NULL;
215 source->con = NULL;
216
217 /* log bytes read in access log */
218 if (source->client && source->format)
219 source->client->con->sent_bytes = source->format->read_bytes;
220
221 if (source->dumpfile)
222 {
223 ICECAST_LOG_INFO("Closing dumpfile for %s", source->mount);
224 fclose (source->dumpfile);
225 source->dumpfile = NULL;
226 }
227
228 /* lets kick off any clients that are left on here */
229 avl_tree_wlock (source->client_tree);
230 c=0;
231 while (1)
232 {
233 avl_node *node = avl_get_first (source->client_tree);
234 if (node)
235 {
236 client_t *client = node->key;
237 if (client->respcode == 200)
238 c++; /* only count clients that have had some processing */
239 avl_delete (source->client_tree, client, _free_client);
240 continue;
241 }
242 break;
243 }
244 if (c)
245 {
246 stats_event_sub (NULL, "listeners", source->listeners);
247 ICECAST_LOG_INFO("%d active listeners on %s released", c, source->mount);
248 }
249 avl_tree_unlock (source->client_tree);
250
251 while (avl_get_first (source->pending_tree))
252 {
253 avl_delete (source->pending_tree,
254 avl_get_first(source->pending_tree)->key, _free_client);
255 }
256
257 if (source->format && source->format->free_plugin)
258 source->format->free_plugin (source->format);
259 source->format = NULL;
260
261 /* Lets clear out the source queue too */
262 while (source->stream_data)
263 {
264 refbuf_t *p = source->stream_data;
265 source->stream_data = p->next;
266 p->next = NULL;
267 /* can be referenced by burst handler as well */
268 while (p->_count > 1)
269 refbuf_release (p);
270 refbuf_release (p);
271 }
272 source->stream_data_tail = NULL;
273
274 source->burst_point = NULL;
275 source->burst_size = 0;
276 source->burst_offset = 0;
277 source->queue_size = 0;
278 source->queue_size_limit = 0;
279 source->listeners = 0;
280 source->max_listeners = -1;
281 source->prev_listeners = 0;
282 source->hidden = 0;
283 source->shoutcast_compat = 0;
284 source->client_stats_update = 0;
285 util_dict_free (source->audio_info);
286 source->audio_info = NULL;
287
288 free(source->fallback_mount);
289 source->fallback_mount = NULL;
290
291 free(source->dumpfilename);
292 source->dumpfilename = NULL;
293
294 if (source->intro_file)
295 {
296 fclose (source->intro_file);
297 source->intro_file = NULL;
298 }
299
300 source->on_demand_req = 0;
301 avl_tree_unlock (source->pending_tree);
302 }
303
304
305 /* Remove the provided source from the global tree and free it */
source_free_source(source_t * source)306 void source_free_source (source_t *source)
307 {
308 ICECAST_LOG_DEBUG("freeing source \"%s\"", source->mount);
309 avl_tree_wlock (global.source_tree);
310 avl_delete (global.source_tree, source, NULL);
311 avl_tree_unlock (global.source_tree);
312
313 avl_tree_free(source->pending_tree, _free_client);
314 avl_tree_free(source->client_tree, _free_client);
315
316 /* make sure all YP entries have gone */
317 yp_remove (source->mount);
318
319 free (source->mount);
320 free (source);
321
322 return;
323 }
324
325
source_find_client(source_t * source,int id)326 client_t *source_find_client(source_t *source, int id)
327 {
328 client_t fakeclient;
329 void *result;
330 connection_t fakecon;
331
332 fakeclient.con = &fakecon;
333 fakeclient.con->id = id;
334
335 avl_tree_rlock(source->client_tree);
336 if(avl_get_by_key(source->client_tree, &fakeclient, &result) == 0)
337 {
338 avl_tree_unlock(source->client_tree);
339 return result;
340 }
341
342 avl_tree_unlock(source->client_tree);
343 return NULL;
344 }
345
346
347 /* Move clients from source to dest provided dest is running
348 * and that the stream format is the same.
349 * The only lock that should be held when this is called is the
350 * source tree lock
351 */
source_move_clients(source_t * source,source_t * dest)352 void source_move_clients (source_t *source, source_t *dest)
353 {
354 unsigned long count = 0;
355 if (strcmp (source->mount, dest->mount) == 0)
356 {
357 ICECAST_LOG_WARN("src and dst are the same \"%s\", skipping", source->mount);
358 return;
359 }
360 /* we don't want the two write locks to deadlock in here */
361 thread_mutex_lock (&move_clients_mutex);
362
363 /* if the destination is not running then we can't move clients */
364
365 avl_tree_wlock (dest->pending_tree);
366 if (dest->running == 0 && dest->on_demand == 0)
367 {
368 ICECAST_LOG_WARN("destination mount %s not running, unable to move clients ", dest->mount);
369 avl_tree_unlock (dest->pending_tree);
370 thread_mutex_unlock (&move_clients_mutex);
371 return;
372 }
373
374 do
375 {
376 client_t *client;
377
378 /* we need to move the client and pending trees - we must take the
379 * locks in this order to avoid deadlocks */
380 avl_tree_wlock (source->pending_tree);
381 avl_tree_wlock (source->client_tree);
382
383 if (source->on_demand == 0 && source->format == NULL)
384 {
385 ICECAST_LOG_INFO("source mount %s is not available", source->mount);
386 break;
387 }
388 if (source->format && dest->format)
389 {
390 if (source->format->type != dest->format->type)
391 {
392 ICECAST_LOG_WARN("stream %s and %s are of different types, ignored", source->mount, dest->mount);
393 break;
394 }
395 }
396
397 while (1)
398 {
399 avl_node *node = avl_get_first (source->pending_tree);
400 if (node == NULL)
401 break;
402 client = (client_t *)(node->key);
403 avl_delete (source->pending_tree, client, NULL);
404
405 /* when switching a client to a different queue, be wary of the
406 * refbuf it's referring to, if it's http headers then we need
407 * to write them so don't release it.
408 */
409 if (client->check_buffer != format_check_http_buffer)
410 {
411 client_set_queue (client, NULL);
412 client->check_buffer = format_check_file_buffer;
413 if (source->con == NULL)
414 client->intro_offset = -1;
415 }
416
417 avl_insert (dest->pending_tree, (void *)client);
418 count++;
419 }
420
421 while (1)
422 {
423 avl_node *node = avl_get_first (source->client_tree);
424 if (node == NULL)
425 break;
426
427 client = (client_t *)(node->key);
428 avl_delete (source->client_tree, client, NULL);
429
430 /* when switching a client to a different queue, be wary of the
431 * refbuf it's referring to, if it's http headers then we need
432 * to write them so don't release it.
433 */
434 if (client->check_buffer != format_check_http_buffer)
435 {
436 client_set_queue (client, NULL);
437 client->check_buffer = format_check_file_buffer;
438 if (source->con == NULL)
439 client->intro_offset = -1;
440 }
441 avl_insert (dest->pending_tree, (void *)client);
442 count++;
443 }
444 ICECAST_LOG_INFO("passing %lu listeners to \"%s\"", count, dest->mount);
445
446 source->listeners = 0;
447 stats_event (source->mount, "listeners", "0");
448
449 } while (0);
450
451 avl_tree_unlock (source->pending_tree);
452 avl_tree_unlock (source->client_tree);
453
454 /* see if we need to wake up an on-demand relay */
455 if (dest->running == 0 && dest->on_demand && count)
456 dest->on_demand_req = 1;
457
458 avl_tree_unlock (dest->pending_tree);
459 thread_mutex_unlock (&move_clients_mutex);
460 }
461
462
463 /* get some data from the source. The stream data is placed in a refbuf
464 * and sent back, however NULL is also valid as in the case of a short
465 * timeout and there's no data pending.
466 */
get_next_buffer(source_t * source)467 static refbuf_t *get_next_buffer (source_t *source)
468 {
469 refbuf_t *refbuf = NULL;
470 int delay = 250;
471
472 if (source->short_delay)
473 delay = 0;
474 while (global.running == ICECAST_RUNNING && source->running)
475 {
476 int fds = 0;
477 time_t current = time (NULL);
478
479 if (source->client)
480 fds = util_timed_wait_for_fd (source->con->sock, delay);
481 else
482 {
483 thread_sleep (delay*1000);
484 source->last_read = current;
485 }
486
487 if (current >= source->client_stats_update)
488 {
489 stats_event_args (source->mount, "total_bytes_read",
490 "%"PRIu64, source->format->read_bytes);
491 stats_event_args (source->mount, "total_bytes_sent",
492 "%"PRIu64, source->format->sent_bytes);
493 source->client_stats_update = current + 5;
494 }
495 if (fds < 0)
496 {
497 if (! sock_recoverable (sock_error()))
498 {
499 ICECAST_LOG_WARN("Error while waiting on socket, Disconnecting source");
500 source->running = 0;
501 }
502 break;
503 }
504 if (fds == 0)
505 {
506 thread_mutex_lock(&source->lock);
507 if ((source->last_read + (time_t)source->timeout) < current)
508 {
509 ICECAST_LOG_DEBUG("last %ld, timeout %d, now %ld", (long)source->last_read,
510 source->timeout, (long)current);
511 ICECAST_LOG_WARN("Disconnecting source due to socket timeout");
512 source->running = 0;
513 }
514 thread_mutex_unlock(&source->lock);
515 break;
516 }
517 source->last_read = current;
518 refbuf = source->format->get_buffer (source);
519 if (source->client->con && source->client->con->error)
520 {
521 ICECAST_LOG_INFO("End of Stream %s", source->mount);
522 source->running = 0;
523 continue;
524 }
525 if (refbuf)
526 break;
527 }
528
529 return refbuf;
530 }
531
532
533 /* general send routine per listener. The deletion_expected tells us whether
534 * the last in the queue is about to disappear, so if this client is still
535 * referring to it after writing then drop the client as it's fallen too far
536 * behind
537 */
send_to_listener(source_t * source,client_t * client,int deletion_expected)538 static void send_to_listener (source_t *source, client_t *client, int deletion_expected)
539 {
540 int bytes;
541 int loop = 10; /* max number of iterations in one go */
542 int total_written = 0;
543
544 while (1)
545 {
546 /* check for limited listener time */
547 if (client->con->discon_time)
548 if (time(NULL) >= client->con->discon_time)
549 {
550 ICECAST_LOG_INFO("time limit reached for client #%lu", client->con->id);
551 client->con->error = 1;
552 }
553
554 /* jump out if client connection has died */
555 if (client->con->error)
556 break;
557
558 /* lets not send too much to one client in one go, but don't
559 sleep for too long if more data can be sent */
560 if (total_written > 20000 || loop == 0)
561 {
562 if (client->check_buffer != format_check_file_buffer)
563 source->short_delay = 1;
564 break;
565 }
566
567 loop--;
568
569 if (client->check_buffer (source, client) < 0)
570 break;
571
572 bytes = client->write_to_client (client);
573 if (bytes <= 0)
574 break; /* can't write any more */
575
576 total_written += bytes;
577 }
578 source->format->sent_bytes += total_written;
579
580 /* the refbuf referenced at head (last in queue) may be marked for deletion
581 * if so, check to see if this client is still referring to it */
582 if (deletion_expected && client->refbuf && client->refbuf == source->stream_data)
583 {
584 ICECAST_LOG_INFO("Client %lu (%s) has fallen too far behind, removing",
585 client->con->id, client->con->ip);
586 stats_event_inc (source->mount, "slow_listeners");
587 client->con->error = 1;
588 }
589 }
590
591
592 /* Open the file for stream dumping.
593 * This function should do all processing of the filename.
594 */
source_open_dumpfile(const char * filename)595 static FILE * source_open_dumpfile(const char * filename) {
596 #ifndef _WIN32
597 /* some of the below functions seems not to be standard winapi functions */
598 char buffer[PATH_MAX];
599 time_t curtime;
600 struct tm *loctime;
601
602 /* Get the current time. */
603 curtime = time (NULL);
604
605 /* Convert it to local time representation. */
606 loctime = localtime (&curtime);
607
608 strftime (buffer, sizeof(buffer), filename, loctime);
609 filename = buffer;
610 #endif
611
612 return fopen (filename, "ab");
613 }
614
615 /* Perform any initialisation just before the stream data is processed, the header
616 * info is processed by now and the format details are setup
617 */
source_init(source_t * source)618 static void source_init (source_t *source)
619 {
620 ice_config_t *config = config_get_config();
621 char *listenurl;
622 const char *str;
623 int listen_url_size;
624 mount_proxy *mountinfo;
625
626 /* 6 for max size of port */
627 listen_url_size = strlen("http://") + strlen(config->hostname) +
628 strlen(":") + 6 + strlen(source->mount) + 1;
629
630 listenurl = malloc (listen_url_size);
631 memset (listenurl, '\000', listen_url_size);
632 snprintf (listenurl, listen_url_size, "http://%s:%d%s",
633 config->hostname, config->port, source->mount);
634 config_release_config();
635
636 str = httpp_getvar(source->parser, "ice-audio-info");
637 source->audio_info = util_dict_new();
638 if (str)
639 {
640 _parse_audio_info (source, str);
641 stats_event (source->mount, "audio_info", str);
642 }
643
644 stats_event (source->mount, "listenurl", listenurl);
645
646 free(listenurl);
647
648 if (source->dumpfilename != NULL)
649 {
650 source->dumpfile = source_open_dumpfile (source->dumpfilename);
651 if (source->dumpfile == NULL)
652 {
653 ICECAST_LOG_WARN("Cannot open dump file \"%s\" for appending: %s, disabling.",
654 source->dumpfilename, strerror(errno));
655 }
656 }
657
658 /* grab a read lock, to make sure we get a chance to cleanup */
659 thread_rwlock_rlock (source->shutdown_rwlock);
660
661 /* start off the statistics */
662 source->listeners = 0;
663 stats_event_inc (NULL, "source_total_connections");
664 stats_event (source->mount, "slow_listeners", "0");
665 stats_event_args (source->mount, "listeners", "%lu", source->listeners);
666 stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
667 stats_event_time (source->mount, "stream_start");
668 stats_event_time_iso8601 (source->mount, "stream_start_iso8601");
669
670 ICECAST_LOG_DEBUG("Source creation complete");
671 source->last_read = time (NULL);
672 source->prev_listeners = -1;
673 source->running = 1;
674
675 mountinfo = config_find_mount (config_get_config(), source->mount, MOUNT_TYPE_NORMAL);
676 if (mountinfo)
677 {
678 if (mountinfo->on_connect)
679 source_run_script (mountinfo->on_connect, source->mount);
680 auth_stream_start (mountinfo, source->mount);
681 }
682 config_release_config();
683
684 /*
685 ** Now, if we have a fallback source and override is on, we want
686 ** to steal its clients, because it means we've come back online
687 ** after a failure and they should be gotten back from the waiting
688 ** loop or jingle track or whatever the fallback is used for
689 */
690
691 if (source->fallback_override && source->fallback_mount)
692 {
693 source_t *fallback_source;
694
695 avl_tree_rlock(global.source_tree);
696 fallback_source = source_find_mount(source->fallback_mount);
697
698 if (fallback_source)
699 source_move_clients (fallback_source, source);
700
701 avl_tree_unlock(global.source_tree);
702 }
703 }
704
705
source_main(source_t * source)706 void source_main (source_t *source)
707 {
708 refbuf_t *refbuf;
709 client_t *client;
710 avl_node *client_node;
711
712 source_init (source);
713
714 while (global.running == ICECAST_RUNNING && source->running) {
715 int remove_from_q;
716
717 refbuf = get_next_buffer (source);
718
719 remove_from_q = 0;
720 source->short_delay = 0;
721
722 if (refbuf)
723 {
724 /* append buffer to the in-flight data queue, */
725 if (source->stream_data == NULL)
726 {
727 source->stream_data = refbuf;
728 source->burst_point = refbuf;
729 }
730 if (source->stream_data_tail)
731 source->stream_data_tail->next = refbuf;
732 source->stream_data_tail = refbuf;
733 source->queue_size += refbuf->len;
734 /* new buffer is referenced for burst */
735 refbuf_addref (refbuf);
736
737 /* new data on queue, so check the burst point */
738 source->burst_offset += refbuf->len;
739 while (source->burst_offset > source->burst_size)
740 {
741 refbuf_t *to_release = source->burst_point;
742
743 if (to_release->next)
744 {
745 source->burst_point = to_release->next;
746 source->burst_offset -= to_release->len;
747 refbuf_release (to_release);
748 continue;
749 }
750 break;
751 }
752
753 /* save stream to file */
754 if (source->dumpfile && source->format->write_buf_to_file)
755 source->format->write_buf_to_file (source, refbuf);
756 }
757 /* lets see if we have too much data in the queue, but don't remove it until later */
758 thread_mutex_lock(&source->lock);
759 if (source->queue_size > source->queue_size_limit)
760 remove_from_q = 1;
761 thread_mutex_unlock(&source->lock);
762
763 /* acquire write lock on pending_tree */
764 avl_tree_wlock(source->pending_tree);
765
766 /* acquire write lock on client_tree */
767 avl_tree_wlock(source->client_tree);
768
769 client_node = avl_get_first(source->client_tree);
770 while (client_node) {
771 client = (client_t *)client_node->key;
772
773 send_to_listener (source, client, remove_from_q);
774
775 if (client->con->error) {
776 client_node = avl_get_next(client_node);
777 if (client->respcode == 200)
778 stats_event_dec (NULL, "listeners");
779 avl_delete(source->client_tree, (void *)client, _free_client);
780 source->listeners--;
781 ICECAST_LOG_DEBUG("Client removed");
782 continue;
783 }
784 client_node = avl_get_next(client_node);
785 }
786
787 /** add pending clients **/
788 client_node = avl_get_first(source->pending_tree);
789 while (client_node) {
790
791 if(source->max_listeners != -1 &&
792 source->listeners >= (unsigned long)source->max_listeners)
793 {
794 /* The common case is caught in the main connection handler,
795 * this deals with rarer cases (mostly concerning fallbacks)
796 * and doesn't give the listening client any information about
797 * why they were disconnected
798 */
799 client = (client_t *)client_node->key;
800 client_node = avl_get_next(client_node);
801 avl_delete(source->pending_tree, (void *)client, _free_client);
802
803 ICECAST_LOG_INFO("Client deleted, exceeding maximum listeners for this "
804 "mountpoint (%s).", source->mount);
805 continue;
806 }
807
808 /* Otherwise, the client is accepted, add it */
809 avl_insert(source->client_tree, client_node->key);
810
811 source->listeners++;
812 ICECAST_LOG_DEBUG("Client added for mountpoint (%s)", source->mount);
813 stats_event_inc(source->mount, "connections");
814
815 client_node = avl_get_next(client_node);
816 }
817
818 /** clear pending tree **/
819 while (avl_get_first(source->pending_tree)) {
820 avl_delete(source->pending_tree,
821 avl_get_first(source->pending_tree)->key,
822 source_remove_client);
823 }
824
825 /* release write lock on pending_tree */
826 avl_tree_unlock(source->pending_tree);
827
828 /* update the stats if need be */
829 if (source->listeners != source->prev_listeners)
830 {
831 source->prev_listeners = source->listeners;
832 ICECAST_LOG_INFO("listener count on %s now %lu", source->mount, source->listeners);
833 if (source->listeners > source->peak_listeners)
834 {
835 source->peak_listeners = source->listeners;
836 stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
837 }
838 stats_event_args (source->mount, "listeners", "%lu", source->listeners);
839 if (source->listeners == 0 && source->on_demand)
840 source->running = 0;
841 }
842
843 /* lets reduce the queue, any lagging clients should of been
844 * terminated by now
845 */
846 if (source->stream_data)
847 {
848 /* normal unreferenced queue data will have a refcount 1, but
849 * burst queue data will be at least 2, active clients will also
850 * increase refcount */
851 while (source->stream_data->_count == 1)
852 {
853 refbuf_t *to_go = source->stream_data;
854
855 if (to_go->next == NULL || source->burst_point == to_go)
856 {
857 /* this should not happen */
858 ICECAST_LOG_ERROR("queue state is unexpected");
859 source->running = 0;
860 break;
861 }
862 source->stream_data = to_go->next;
863 source->queue_size -= to_go->len;
864 to_go->next = NULL;
865 refbuf_release (to_go);
866 }
867 }
868
869 /* release write lock on client_tree */
870 avl_tree_unlock(source->client_tree);
871 }
872 source_shutdown (source);
873 }
874
875
source_shutdown(source_t * source)876 static void source_shutdown (source_t *source)
877 {
878 mount_proxy *mountinfo;
879
880 source->running = 0;
881 if (source->con && source->con->ip) {
882 ICECAST_LOG_INFO("Source from %s at \"%s\" exiting", source->con->ip, source->mount);
883 } else {
884 ICECAST_LOG_INFO("Source at \"%s\" exiting", source->mount);
885 }
886
887 mountinfo = config_find_mount (config_get_config(), source->mount, MOUNT_TYPE_NORMAL);
888 if (mountinfo)
889 {
890 if (mountinfo->on_disconnect)
891 source_run_script (mountinfo->on_disconnect, source->mount);
892 auth_stream_end (mountinfo, source->mount);
893 }
894 config_release_config();
895
896 /* we have de-activated the source now, so no more clients will be
897 * added, now move the listeners we have to the fallback (if any)
898 */
899 if (source->fallback_mount)
900 {
901 source_t *fallback_source;
902
903 avl_tree_rlock(global.source_tree);
904 fallback_source = source_find_mount (source->fallback_mount);
905
906 if (fallback_source != NULL)
907 source_move_clients (source, fallback_source);
908
909 avl_tree_unlock (global.source_tree);
910 }
911
912 /* delete this sources stats */
913 stats_event(source->mount, NULL, NULL);
914
915 /* we don't remove the source from the tree here, it may be a relay and
916 therefore reserved */
917 source_clear_source (source);
918
919 global_lock();
920 global.sources--;
921 stats_event_args (NULL, "sources", "%d", global.sources);
922 global_unlock();
923
924 /* release our hold on the lock so the main thread can continue cleaning up */
925 thread_rwlock_unlock(source->shutdown_rwlock);
926 }
927
928
_compare_clients(void * compare_arg,void * a,void * b)929 static int _compare_clients(void *compare_arg, void *a, void *b)
930 {
931 client_t *clienta = (client_t *)a;
932 client_t *clientb = (client_t *)b;
933
934 connection_t *cona = clienta->con;
935 connection_t *conb = clientb->con;
936
937 if (cona->id < conb->id) return -1;
938 if (cona->id > conb->id) return 1;
939
940 return 0;
941 }
942
source_remove_client(void * key)943 int source_remove_client(void *key)
944 {
945 return 1;
946 }
947
_free_client(void * key)948 static int _free_client(void *key)
949 {
950 client_t *client = (client_t *)key;
951
952 /* if no response has been sent then send a 404 */
953 if (client->respcode == 0)
954 client_send_404 (client, "Mount unavailable");
955 else
956 client_destroy(client);
957
958 return 1;
959 }
960
_parse_audio_info(source_t * source,const char * s)961 static void _parse_audio_info (source_t *source, const char *s)
962 {
963 const char *start = s;
964 unsigned int len;
965
966 while (start != NULL && *start != '\0')
967 {
968 if ((s = strchr (start, ';')) == NULL)
969 len = strlen (start);
970 else
971 {
972 len = (int)(s - start);
973 s++; /* skip passed the ';' */
974 }
975 if (len)
976 {
977 char name[100], value[200];
978 char *esc;
979
980 sscanf (start, "%99[^=]=%199[^;\r\n]", name, value);
981 esc = util_url_unescape (value);
982 if (esc)
983 {
984 util_dict_set (source->audio_info, name, esc);
985 stats_event (source->mount, name, esc);
986 free (esc);
987 }
988 }
989 start = s;
990 }
991 }
992
993
994 /* Apply the mountinfo details to the source */
source_apply_mount(source_t * source,mount_proxy * mountinfo)995 static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
996 {
997 const char *str;
998 int val;
999 http_parser_t *parser = NULL;
1000
1001 ICECAST_LOG_DEBUG("Applying mount information for \"%s\"", source->mount);
1002 avl_tree_rlock (source->client_tree);
1003 stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
1004
1005 if (mountinfo)
1006 {
1007 source->max_listeners = mountinfo->max_listeners;
1008 source->fallback_override = mountinfo->fallback_override;
1009 source->hidden = mountinfo->hidden;
1010 }
1011
1012 /* if a setting is available in the mount details then use it, else
1013 * check the parser details. */
1014
1015 if (source->client)
1016 parser = source->client->parser;
1017
1018 /* to be done before possible non-utf8 stats */
1019 if (source->format && source->format->apply_settings)
1020 source->format->apply_settings (source->client, source->format, mountinfo);
1021
1022 /* public */
1023 if (mountinfo && mountinfo->yp_public >= 0)
1024 val = mountinfo->yp_public;
1025 else
1026 {
1027 do {
1028 str = httpp_getvar (parser, "ice-public");
1029 if (str) break;
1030 str = httpp_getvar (parser, "icy-pub");
1031 if (str) break;
1032 str = httpp_getvar (parser, "x-audiocast-public");
1033 if (str) break;
1034 /* handle header from icecast v2 release */
1035 str = httpp_getvar (parser, "icy-public");
1036 if (str) break;
1037 str = "0";
1038 } while (0);
1039 val = atoi (str);
1040 }
1041 stats_event_args (source->mount, "public", "%d", val);
1042 if (source->yp_public != val)
1043 {
1044 ICECAST_LOG_DEBUG("YP changed to %d", val);
1045 if (val)
1046 yp_add (source->mount);
1047 else
1048 yp_remove (source->mount);
1049 source->yp_public = val;
1050 }
1051
1052 /* stream name */
1053 if (mountinfo && mountinfo->stream_name)
1054 stats_event (source->mount, "server_name", mountinfo->stream_name);
1055 else
1056 {
1057 do {
1058 str = httpp_getvar (parser, "ice-name");
1059 if (str) break;
1060 str = httpp_getvar (parser, "icy-name");
1061 if (str) break;
1062 str = httpp_getvar (parser, "x-audiocast-name");
1063 if (str) break;
1064 str = "Unspecified name";
1065 } while (0);
1066 if (source->format)
1067 stats_event_conv (source->mount, "server_name", str, source->format->charset);
1068 }
1069
1070 /* stream description */
1071 if (mountinfo && mountinfo->stream_description)
1072 stats_event (source->mount, "server_description", mountinfo->stream_description);
1073 else
1074 {
1075 do {
1076 str = httpp_getvar (parser, "ice-description");
1077 if (str) break;
1078 str = httpp_getvar (parser, "icy-description");
1079 if (str) break;
1080 str = httpp_getvar (parser, "x-audiocast-description");
1081 if (str) break;
1082 str = "Unspecified description";
1083 } while (0);
1084 if (source->format)
1085 stats_event_conv (source->mount, "server_description", str, source->format->charset);
1086 }
1087
1088 /* stream URL */
1089 if (mountinfo && mountinfo->stream_url)
1090 stats_event (source->mount, "server_url", mountinfo->stream_url);
1091 else
1092 {
1093 do {
1094 str = httpp_getvar (parser, "ice-url");
1095 if (str) break;
1096 str = httpp_getvar (parser, "icy-url");
1097 if (str) break;
1098 str = httpp_getvar (parser, "x-audiocast-url");
1099 if (str) break;
1100 } while (0);
1101 if (str && source->format)
1102 stats_event_conv (source->mount, "server_url", str, source->format->charset);
1103 }
1104
1105 /* stream genre */
1106 if (mountinfo && mountinfo->stream_genre)
1107 stats_event (source->mount, "genre", mountinfo->stream_genre);
1108 else
1109 {
1110 do {
1111 str = httpp_getvar (parser, "ice-genre");
1112 if (str) break;
1113 str = httpp_getvar (parser, "icy-genre");
1114 if (str) break;
1115 str = httpp_getvar (parser, "x-audiocast-genre");
1116 if (str) break;
1117 str = "various";
1118 } while (0);
1119 if (source->format)
1120 stats_event_conv (source->mount, "genre", str, source->format->charset);
1121 }
1122
1123 /* stream bitrate */
1124 if (mountinfo && mountinfo->bitrate)
1125 str = mountinfo->bitrate;
1126 else
1127 {
1128 do {
1129 str = httpp_getvar (parser, "ice-bitrate");
1130 if (str) break;
1131 str = httpp_getvar (parser, "icy-br");
1132 if (str) break;
1133 str = httpp_getvar (parser, "x-audiocast-bitrate");
1134 } while (0);
1135 }
1136 stats_event (source->mount, "bitrate", str);
1137
1138 /* handle MIME-type */
1139 if (mountinfo && mountinfo->type)
1140 stats_event (source->mount, "server_type", mountinfo->type);
1141 else
1142 if (source->format)
1143 stats_event (source->mount, "server_type", source->format->contenttype);
1144
1145 if (mountinfo && mountinfo->subtype)
1146 stats_event (source->mount, "subtype", mountinfo->subtype);
1147
1148 if (mountinfo && mountinfo->auth)
1149 stats_event (source->mount, "authenticator", mountinfo->auth->type);
1150 else
1151 stats_event (source->mount, "authenticator", NULL);
1152
1153 if (mountinfo && mountinfo->fallback_mount)
1154 {
1155 char *mount = source->fallback_mount;
1156 source->fallback_mount = strdup (mountinfo->fallback_mount);
1157 free (mount);
1158 }
1159 else
1160 source->fallback_mount = NULL;
1161
1162 if (mountinfo && mountinfo->dumpfile)
1163 {
1164 char *filename = source->dumpfilename;
1165 source->dumpfilename = strdup (mountinfo->dumpfile);
1166 free (filename);
1167 }
1168 else
1169 source->dumpfilename = NULL;
1170
1171 if (source->intro_file)
1172 {
1173 fclose (source->intro_file);
1174 source->intro_file = NULL;
1175 }
1176 if (mountinfo && mountinfo->intro_filename)
1177 {
1178 ice_config_t *config = config_get_config_unlocked ();
1179 unsigned int len = strlen (config->webroot_dir) +
1180 strlen (mountinfo->intro_filename) + 2;
1181 char *path = malloc (len);
1182 if (path)
1183 {
1184 FILE *f;
1185 snprintf (path, len, "%s" PATH_SEPARATOR "%s", config->webroot_dir,
1186 mountinfo->intro_filename);
1187
1188 f = fopen (path, "rb");
1189 if (f)
1190 source->intro_file = f;
1191 else
1192 ICECAST_LOG_WARN("Cannot open intro file \"%s\": %s", path, strerror(errno));
1193 free (path);
1194 }
1195 }
1196
1197 if (mountinfo && mountinfo->queue_size_limit)
1198 source->queue_size_limit = mountinfo->queue_size_limit;
1199
1200 if (mountinfo && mountinfo->source_timeout)
1201 source->timeout = mountinfo->source_timeout;
1202
1203 if (mountinfo && mountinfo->burst_size >= 0)
1204 source->burst_size = (unsigned int)mountinfo->burst_size;
1205
1206 if (mountinfo && mountinfo->fallback_when_full)
1207 source->fallback_when_full = mountinfo->fallback_when_full;
1208
1209 avl_tree_unlock (source->client_tree);
1210 }
1211
1212
1213 /* update the specified source with details from the config or mount.
1214 * mountinfo can be NULL in which case default settings should be taken
1215 * This function is called by the Slave thread
1216 */
source_update_settings(ice_config_t * config,source_t * source,mount_proxy * mountinfo)1217 void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo)
1218 {
1219 thread_mutex_lock(&source->lock);
1220 /* skip if source is a fallback to file */
1221 if (source->running && source->client == NULL)
1222 {
1223 stats_event_hidden (source->mount, NULL, 1);
1224 thread_mutex_unlock(&source->lock);
1225 return;
1226 }
1227 /* set global settings first */
1228 source->queue_size_limit = config->queue_size_limit;
1229 source->timeout = config->source_timeout;
1230 source->burst_size = config->burst_size;
1231
1232 stats_event_args (source->mount, "listenurl", "http://%s:%d%s",
1233 config->hostname, config->port, source->mount);
1234
1235 source_apply_mount (source, mountinfo);
1236
1237 if (source->fallback_mount)
1238 ICECAST_LOG_DEBUG("fallback %s", source->fallback_mount);
1239 if (mountinfo && mountinfo->intro_filename)
1240 ICECAST_LOG_DEBUG("intro file is %s", mountinfo->intro_filename);
1241 if (source->dumpfilename)
1242 ICECAST_LOG_DEBUG("Dumping stream to %s", source->dumpfilename);
1243 if (mountinfo && mountinfo->on_connect)
1244 ICECAST_LOG_DEBUG("connect script \"%s\"", mountinfo->on_connect);
1245 if (mountinfo && mountinfo->on_disconnect)
1246 ICECAST_LOG_DEBUG("disconnect script \"%s\"", mountinfo->on_disconnect);
1247 if (source->on_demand)
1248 {
1249 ICECAST_LOG_DEBUG("on_demand set");
1250 stats_event (source->mount, "on_demand", "1");
1251 stats_event_args (source->mount, "listeners", "%ld", source->listeners);
1252 }
1253 else
1254 stats_event (source->mount, "on_demand", NULL);
1255
1256 if (source->hidden)
1257 {
1258 stats_event_hidden (source->mount, NULL, 1);
1259 ICECAST_LOG_DEBUG("hidden from public");
1260 }
1261 else
1262 stats_event_hidden (source->mount, NULL, 0);
1263
1264 if (source->max_listeners == -1)
1265 stats_event (source->mount, "max_listeners", "unlimited");
1266 else
1267 {
1268 char buf [10];
1269 snprintf (buf, sizeof (buf), "%ld", source->max_listeners);
1270 stats_event (source->mount, "max_listeners", buf);
1271 }
1272 ICECAST_LOG_DEBUG("public set to %d", source->yp_public);
1273 ICECAST_LOG_DEBUG("max listeners to %ld", source->max_listeners);
1274 ICECAST_LOG_DEBUG("queue size to %u", source->queue_size_limit);
1275 ICECAST_LOG_DEBUG("burst size to %u", source->burst_size);
1276 ICECAST_LOG_DEBUG("source timeout to %u", source->timeout);
1277 ICECAST_LOG_DEBUG("fallback_when_full to %u", source->fallback_when_full);
1278 thread_mutex_unlock(&source->lock);
1279 }
1280
1281
source_client_thread(void * arg)1282 void *source_client_thread (void *arg)
1283 {
1284 source_t *source = arg;
1285
1286 stats_event_inc(NULL, "source_client_connections");
1287 stats_event (source->mount, "listeners", "0");
1288
1289 source_main (source);
1290
1291 source_free_source (source);
1292 slave_update_all_mounts();
1293
1294 return NULL;
1295 }
1296
1297
source_client_callback(client_t * client,void * arg)1298 void source_client_callback (client_t *client, void *arg)
1299 {
1300 const char *agent;
1301 source_t *source = arg;
1302 refbuf_t *old_data = client->refbuf;
1303
1304 if (client->con->error)
1305 {
1306 global_lock();
1307 global.sources--;
1308 global_unlock();
1309 source_clear_source (source);
1310 source_free_source (source);
1311 return;
1312 }
1313 client->refbuf = old_data->associated;
1314 old_data->associated = NULL;
1315 refbuf_release (old_data);
1316 stats_event (source->mount, "source_ip", source->client->con->ip);
1317 agent = httpp_getvar (source->client->parser, "user-agent");
1318 if (agent)
1319 stats_event (source->mount, "user_agent", agent);
1320
1321 thread_create ("Source Thread", source_client_thread,
1322 source, THREAD_DETACHED);
1323 }
1324
1325
1326 #ifndef _WIN32
1327 /* this sets up the new environment for script execution.
1328 * We ignore most failtures as we can not handle them anyway.
1329 */
__setup_empty_script_environment(void)1330 static inline void __setup_empty_script_environment(void) {
1331 int i;
1332
1333 /* close at least the first 1024 handles */
1334 for (i = 0; i < 1024; i++)
1335 close(i);
1336
1337 /* open null device */
1338 i = open("/dev/null", O_RDWR);
1339 if (i != -1) {
1340 /* attach null device to stdin, stdout and stderr */
1341 if (i != 0)
1342 dup2(i, 0);
1343 if (i != 1)
1344 dup2(i, 1);
1345 if (i != 2)
1346 dup2(i, 2);
1347
1348 /* close null device */
1349 if (i > 2)
1350 close(i);
1351 }
1352 }
1353
source_run_script(char * command,char * mountpoint)1354 static void source_run_script (char *command, char *mountpoint)
1355 {
1356 pid_t pid, external_pid;
1357
1358 /* do a fork twice so that the command has init as parent */
1359 external_pid = fork();
1360 switch (external_pid)
1361 {
1362 case 0:
1363 switch (pid = fork ())
1364 {
1365 case -1:
1366 ICECAST_LOG_ERROR("Unable to fork %s (%s)", command, strerror (errno));
1367 break;
1368 case 0: /* child */
1369 if (access(command, R_OK|X_OK) != 0) {
1370 ICECAST_LOG_ERROR("Unable to run command %s (%s)", command, strerror(errno));
1371 exit(1);
1372 }
1373 ICECAST_LOG_DEBUG("Starting command %s", command);
1374 __setup_empty_script_environment();
1375 /* consider to add action here as well */
1376 execl(command, command, mountpoint, (char *)NULL);
1377 exit(1);
1378 default: /* parent */
1379 break;
1380 }
1381 exit (0);
1382 case -1:
1383 ICECAST_LOG_ERROR("Unable to fork %s", strerror (errno));
1384 break;
1385 default: /* parent */
1386 waitpid (external_pid, NULL, 0);
1387 break;
1388 }
1389 }
1390 #endif
1391
1392
source_fallback_file(void * arg)1393 static void *source_fallback_file (void *arg)
1394 {
1395 char *mount = arg;
1396 char *type;
1397 char *path;
1398 unsigned int len;
1399 FILE *file = NULL;
1400 source_t *source = NULL;
1401 ice_config_t *config;
1402 http_parser_t *parser;
1403
1404 do
1405 {
1406 if (mount == NULL || mount[0] != '/')
1407 break;
1408 config = config_get_config ();
1409 len = strlen (config->webroot_dir) + strlen (mount) + 1;
1410 path = malloc (len);
1411 if (path)
1412 snprintf (path, len, "%s%s", config->webroot_dir, mount);
1413 config_release_config ();
1414
1415 if (path == NULL)
1416 break;
1417
1418 file = fopen (path, "rb");
1419 if (file == NULL)
1420 {
1421 ICECAST_LOG_WARN("unable to open file \"%s\"", path);
1422 free (path);
1423 break;
1424 }
1425 free (path);
1426 source = source_reserve (mount);
1427 if (source == NULL)
1428 {
1429 ICECAST_LOG_WARN("mountpoint \"%s\" already reserved", mount);
1430 break;
1431 }
1432 ICECAST_LOG_INFO("mountpoint %s is reserved", mount);
1433 type = fserve_content_type (mount);
1434 parser = httpp_create_parser();
1435 httpp_initialize (parser, NULL);
1436 httpp_setvar (parser, "content-type", type);
1437 free (type);
1438
1439 source->hidden = 1;
1440 source->yp_public = 0;
1441 source->intro_file = file;
1442 source->parser = parser;
1443 file = NULL;
1444
1445 if (connection_complete_source (source, 0) < 0)
1446 break;
1447 source_client_thread (source);
1448 httpp_destroy (parser);
1449 } while (0);
1450 if (file)
1451 fclose (file);
1452 free (mount);
1453 return NULL;
1454 }
1455
1456
1457 /* rescan the mount list, so that xsl files are updated to show
1458 * unconnected but active fallback mountpoints
1459 */
source_recheck_mounts(int update_all)1460 void source_recheck_mounts (int update_all)
1461 {
1462 ice_config_t *config;
1463 mount_proxy *mount;
1464
1465 avl_tree_rlock (global.source_tree);
1466 config = config_get_config();
1467 mount = config->mounts;
1468
1469 if (update_all)
1470 stats_clear_virtual_mounts ();
1471
1472 for (; mount; mount = mount->next)
1473 {
1474 if (mount->mounttype != MOUNT_TYPE_NORMAL)
1475 continue;
1476
1477 source_t *source = source_find_mount (mount->mountname);
1478
1479 if (source)
1480 {
1481 source = source_find_mount_raw (mount->mountname);
1482 if (source)
1483 {
1484 mount_proxy *mountinfo = config_find_mount (config, source->mount, MOUNT_TYPE_NORMAL);
1485 source_update_settings (config, source, mountinfo);
1486 }
1487 else if (update_all)
1488 {
1489 stats_event_hidden (mount->mountname, NULL, mount->hidden);
1490 stats_event_args (mount->mountname, "listenurl", "http://%s:%d%s",
1491 config->hostname, config->port, mount->mountname);
1492 stats_event (mount->mountname, "listeners", "0");
1493 if (mount->max_listeners < 0)
1494 stats_event (mount->mountname, "max_listeners", "unlimited");
1495 else
1496 stats_event_args (mount->mountname, "max_listeners", "%d", mount->max_listeners);
1497 }
1498 }
1499 else
1500 stats_event (mount->mountname, NULL, NULL);
1501
1502 /* check for fallback to file */
1503 if (global.running == ICECAST_RUNNING && mount->fallback_mount)
1504 {
1505 source_t *fallback = source_find_mount (mount->fallback_mount);
1506 if (fallback == NULL)
1507 {
1508 thread_create ("Fallback file thread", source_fallback_file,
1509 strdup (mount->fallback_mount), THREAD_DETACHED);
1510 }
1511 }
1512 }
1513 avl_tree_unlock (global.source_tree);
1514 config_release_config();
1515 }
1516
1517