1 /* Icecast
2  *
3  * This program is distributed under the GNU General Public License, version 2.
4  * A copy of this license is included with this source.
5  *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
7  *                      Michael Smith <msmith@xiph.org>,
8  *                      oddsock <oddsock@xiph.org>,
9  *                      Karl Heyes <karl@xiph.org>
10  *                      and others (see AUTHORS for details).
11  */
12 
13 /* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 #ifdef HAVE_CONFIG_H
15 #include <config.h>
16 #endif
17 
18 #include <stdio.h>
19 #include <stdlib.h>
20 #include <string.h>
21 #include <sys/types.h>
22 #include <ogg/ogg.h>
23 #include <errno.h>
24 
25 #ifndef _WIN32
26 #include <unistd.h>
27 #include <sys/time.h>
28 #include <sys/socket.h>
29 #include <sys/wait.h>
30 #include <limits.h>
31 #ifndef PATH_MAX
32 #define PATH_MAX 4096
33 #endif
34 #else
35 #include <winsock2.h>
36 #include <windows.h>
37 #define snprintf _snprintf
38 #endif
39 
40 #ifndef _WIN32
41 /* for __setup_empty_script_environment() */
42 #include <sys/stat.h>
43 #include <fcntl.h>
44 #endif
45 
46 #include "thread/thread.h"
47 #include "avl/avl.h"
48 #include "httpp/httpp.h"
49 #include "net/sock.h"
50 
51 #include "connection.h"
52 #include "global.h"
53 #include "refbuf.h"
54 #include "client.h"
55 #include "stats.h"
56 #include "logging.h"
57 #include "cfgfile.h"
58 #include "util.h"
59 #include "source.h"
60 #include "format.h"
61 #include "fserve.h"
62 #include "auth.h"
63 #include "compat.h"
64 
65 #undef CATMODULE
66 #define CATMODULE "source"
67 
68 #define MAX_FALLBACK_DEPTH 10
69 
70 mutex_t move_clients_mutex;
71 
72 /* avl tree helper */
73 static int _compare_clients(void *compare_arg, void *a, void *b);
74 static int _free_client(void *key);
75 static void _parse_audio_info (source_t *source, const char *s);
76 static void source_shutdown (source_t *source);
77 #ifdef _WIN32
78 #define source_run_script(x,y)  ICECAST_LOG_WARN("on [dis]connect scripts disabled");
79 #else
80 static void source_run_script (char *command, char *mountpoint);
81 #endif
82 
83 /* Allocate a new source with the stated mountpoint, if one already
84  * exists with that mountpoint in the global source tree then return
85  * NULL.
86  */
source_reserve(const char * mount)87 source_t *source_reserve (const char *mount)
88 {
89     source_t *src = NULL;
90 
91     if(mount[0] != '/')
92         ICECAST_LOG_WARN("Source at \"%s\" does not start with '/', clients will be "
93                 "unable to connect", mount);
94 
95     do
96     {
97         avl_tree_wlock (global.source_tree);
98         src = source_find_mount_raw (mount);
99         if (src)
100         {
101             src = NULL;
102             break;
103         }
104 
105         src = calloc (1, sizeof(source_t));
106         if (src == NULL)
107             break;
108 
109         src->client_tree = avl_tree_new(_compare_clients, NULL);
110         src->pending_tree = avl_tree_new(_compare_clients, NULL);
111 
112         /* make duplicates for strings or similar */
113         src->mount = strdup (mount);
114         src->max_listeners = -1;
115         thread_mutex_create(&src->lock);
116 
117         avl_insert (global.source_tree, src);
118 
119     } while (0);
120 
121     avl_tree_unlock (global.source_tree);
122     return src;
123 }
124 
125 
126 /* Find a mount with this raw name - ignoring fallbacks. You should have the
127  * global source tree locked to call this.
128  */
source_find_mount_raw(const char * mount)129 source_t *source_find_mount_raw(const char *mount)
130 {
131     source_t *source;
132     avl_node *node;
133     int cmp;
134 
135     if (!mount) {
136         return NULL;
137     }
138     /* get the root node */
139     node = global.source_tree->root->right;
140 
141     while (node) {
142         source = (source_t *)node->key;
143         cmp = strcmp(mount, source->mount);
144         if (cmp < 0)
145             node = node->left;
146         else if (cmp > 0)
147             node = node->right;
148         else
149             return source;
150     }
151 
152     /* didn't find it */
153     return NULL;
154 }
155 
156 
157 /* Search for mount, if the mount is there but not currently running then
158  * check the fallback, and so on.  Must have a global source lock to call
159  * this function.
160  */
source_find_mount(const char * mount)161 source_t *source_find_mount (const char *mount)
162 {
163     source_t *source = NULL;
164     ice_config_t *config;
165     mount_proxy *mountinfo;
166     int depth = 0;
167 
168     config = config_get_config();
169     while (mount && depth < MAX_FALLBACK_DEPTH)
170     {
171         source = source_find_mount_raw(mount);
172 
173         if (source)
174         {
175             if (source->running || source->on_demand)
176                 break;
177         }
178 
179         /* we either have a source which is not active (relay) or no source
180          * at all. Check the mounts list for fallback settings
181          */
182         mountinfo = config_find_mount (config, mount, MOUNT_TYPE_NORMAL);
183         source = NULL;
184 
185         if (mountinfo == NULL)
186             break;
187         mount = mountinfo->fallback_mount;
188         depth++;
189     }
190 
191     config_release_config();
192     return source;
193 }
194 
195 
source_compare_sources(void * arg,void * a,void * b)196 int source_compare_sources(void *arg, void *a, void *b)
197 {
198     source_t *srca = (source_t *)a;
199     source_t *srcb = (source_t *)b;
200 
201     return strcmp(srca->mount, srcb->mount);
202 }
203 
204 
/* Return a source to its idle state without removing it from the global
 * tree: the source client is destroyed, the dump and intro files are closed,
 * every listener in the client and pending trees is released through
 * _free_client, the format plugin is freed, the stream queue is emptied and
 * the per-stream settings are reset.
 *
 * The pending tree write lock is held for the whole call and the client tree
 * write lock is taken while the listeners are removed.
 */
void source_clear_source (source_t *source)
{
    avl_node *node;
    int c;

    ICECAST_LOG_DEBUG("clearing source \"%s\"", source->mount);

    avl_tree_wlock (source->pending_tree);

    /* Record the bytes read for the access log.  This has to be done while
     * source->client is still valid; doing it after the client had been
     * destroyed and set to NULL meant the test below could never be true.
     */
    if (source->client && source->client->con && source->format)
        source->client->con->sent_bytes = source->format->read_bytes;

    client_destroy(source->client);
    source->client = NULL;
    source->parser = NULL;
    source->con = NULL;

    if (source->dumpfile)
    {
        ICECAST_LOG_INFO("Closing dumpfile for %s", source->mount);
        fclose (source->dumpfile);
        source->dumpfile = NULL;
    }

    /* lets kick off any clients that are left on here */
    avl_tree_wlock (source->client_tree);
    c = 0;
    while ((node = avl_get_first (source->client_tree)) != NULL)
    {
        client_t *client = node->key;

        if (client->respcode == 200)
            c++; /* only count clients that have had some processing */
        avl_delete (source->client_tree, client, _free_client);
    }
    if (c)
    {
        stats_event_sub (NULL, "listeners", source->listeners);
        ICECAST_LOG_INFO("%d active listeners on %s released", c, source->mount);
    }
    avl_tree_unlock (source->client_tree);

    /* listeners that were still waiting to be added go the same way */
    while ((node = avl_get_first (source->pending_tree)) != NULL)
        avl_delete (source->pending_tree, node->key, _free_client);

    if (source->format && source->format->free_plugin)
        source->format->free_plugin (source->format);
    source->format = NULL;

    /* Lets clear out the source queue too */
    while (source->stream_data)
    {
        refbuf_t *p = source->stream_data;

        source->stream_data = p->next;
        p->next = NULL;
        /* can be referenced by burst handler as well, so drop every
         * outstanding reference and then the last one */
        while (p->_count > 1)
            refbuf_release (p);
        refbuf_release (p);
    }
    source->stream_data_tail = NULL;

    source->burst_point = NULL;
    source->burst_size = 0;
    source->burst_offset = 0;
    source->queue_size = 0;
    source->queue_size_limit = 0;
    source->listeners = 0;
    source->max_listeners = -1;
    source->prev_listeners = 0;
    source->hidden = 0;
    source->shoutcast_compat = 0;
    source->client_stats_update = 0;
    util_dict_free (source->audio_info);
    source->audio_info = NULL;

    free(source->fallback_mount);
    source->fallback_mount = NULL;

    free(source->dumpfilename);
    source->dumpfilename = NULL;

    if (source->intro_file)
    {
        fclose (source->intro_file);
        source->intro_file = NULL;
    }

    source->on_demand_req = 0;
    avl_tree_unlock (source->pending_tree);
}
303 
304 
305 /* Remove the provided source from the global tree and free it */
source_free_source(source_t * source)306 void source_free_source (source_t *source)
307 {
308     ICECAST_LOG_DEBUG("freeing source \"%s\"", source->mount);
309     avl_tree_wlock (global.source_tree);
310     avl_delete (global.source_tree, source, NULL);
311     avl_tree_unlock (global.source_tree);
312 
313     avl_tree_free(source->pending_tree, _free_client);
314     avl_tree_free(source->client_tree, _free_client);
315 
316     /* make sure all YP entries have gone */
317     yp_remove (source->mount);
318 
319     free (source->mount);
320     free (source);
321 
322     return;
323 }
324 
325 
source_find_client(source_t * source,int id)326 client_t *source_find_client(source_t *source, int id)
327 {
328     client_t fakeclient;
329     void *result;
330     connection_t fakecon;
331 
332     fakeclient.con = &fakecon;
333     fakeclient.con->id = id;
334 
335     avl_tree_rlock(source->client_tree);
336     if(avl_get_by_key(source->client_tree, &fakeclient, &result) == 0)
337     {
338         avl_tree_unlock(source->client_tree);
339         return result;
340     }
341 
342     avl_tree_unlock(source->client_tree);
343     return NULL;
344 }
345 
346 
347 /* Move clients from source to dest provided dest is running
348  * and that the stream format is the same.
349  * The only lock that should be held when this is called is the
350  * source tree lock
351  */
source_move_clients(source_t * source,source_t * dest)352 void source_move_clients (source_t *source, source_t *dest)
353 {
354     unsigned long count = 0;
355     if (strcmp (source->mount, dest->mount) == 0)
356     {
357         ICECAST_LOG_WARN("src and dst are the same \"%s\", skipping", source->mount);
358         return;
359     }
360     /* we don't want the two write locks to deadlock in here */
361     thread_mutex_lock (&move_clients_mutex);
362 
363     /* if the destination is not running then we can't move clients */
364 
365     avl_tree_wlock (dest->pending_tree);
366     if (dest->running == 0 && dest->on_demand == 0)
367     {
368         ICECAST_LOG_WARN("destination mount %s not running, unable to move clients ", dest->mount);
369         avl_tree_unlock (dest->pending_tree);
370         thread_mutex_unlock (&move_clients_mutex);
371         return;
372     }
373 
374     do
375     {
376         client_t *client;
377 
378         /* we need to move the client and pending trees - we must take the
379          * locks in this order to avoid deadlocks */
380         avl_tree_wlock (source->pending_tree);
381         avl_tree_wlock (source->client_tree);
382 
383         if (source->on_demand == 0 && source->format == NULL)
384         {
385             ICECAST_LOG_INFO("source mount %s is not available", source->mount);
386             break;
387         }
388         if (source->format && dest->format)
389         {
390             if (source->format->type != dest->format->type)
391             {
392                 ICECAST_LOG_WARN("stream %s and %s are of different types, ignored", source->mount, dest->mount);
393                 break;
394             }
395         }
396 
397         while (1)
398         {
399             avl_node *node = avl_get_first (source->pending_tree);
400             if (node == NULL)
401                 break;
402             client = (client_t *)(node->key);
403             avl_delete (source->pending_tree, client, NULL);
404 
405             /* when switching a client to a different queue, be wary of the
406              * refbuf it's referring to, if it's http headers then we need
407              * to write them so don't release it.
408              */
409             if (client->check_buffer != format_check_http_buffer)
410             {
411                 client_set_queue (client, NULL);
412                 client->check_buffer = format_check_file_buffer;
413                 if (source->con == NULL)
414                     client->intro_offset = -1;
415             }
416 
417             avl_insert (dest->pending_tree, (void *)client);
418             count++;
419         }
420 
421         while (1)
422         {
423             avl_node *node = avl_get_first (source->client_tree);
424             if (node == NULL)
425                 break;
426 
427             client = (client_t *)(node->key);
428             avl_delete (source->client_tree, client, NULL);
429 
430             /* when switching a client to a different queue, be wary of the
431              * refbuf it's referring to, if it's http headers then we need
432              * to write them so don't release it.
433              */
434             if (client->check_buffer != format_check_http_buffer)
435             {
436                 client_set_queue (client, NULL);
437                 client->check_buffer = format_check_file_buffer;
438                 if (source->con == NULL)
439                     client->intro_offset = -1;
440             }
441             avl_insert (dest->pending_tree, (void *)client);
442             count++;
443         }
444         ICECAST_LOG_INFO("passing %lu listeners to \"%s\"", count, dest->mount);
445 
446         source->listeners = 0;
447         stats_event (source->mount, "listeners", "0");
448 
449     } while (0);
450 
451     avl_tree_unlock (source->pending_tree);
452     avl_tree_unlock (source->client_tree);
453 
454     /* see if we need to wake up an on-demand relay */
455     if (dest->running == 0 && dest->on_demand && count)
456         dest->on_demand_req = 1;
457 
458     avl_tree_unlock (dest->pending_tree);
459     thread_mutex_unlock (&move_clients_mutex);
460 }
461 
462 
463 /* get some data from the source. The stream data is placed in a refbuf
464  * and sent back, however NULL is also valid as in the case of a short
465  * timeout and there's no data pending.
466  */
get_next_buffer(source_t * source)467 static refbuf_t *get_next_buffer (source_t *source)
468 {
469     refbuf_t *refbuf = NULL;
470     int delay = 250;
471 
472     if (source->short_delay)
473         delay = 0;
474     while (global.running == ICECAST_RUNNING && source->running)
475     {
476         int fds = 0;
477         time_t current = time (NULL);
478 
479         if (source->client)
480             fds = util_timed_wait_for_fd (source->con->sock, delay);
481         else
482         {
483             thread_sleep (delay*1000);
484             source->last_read = current;
485         }
486 
487         if (current >= source->client_stats_update)
488         {
489             stats_event_args (source->mount, "total_bytes_read",
490                     "%"PRIu64, source->format->read_bytes);
491             stats_event_args (source->mount, "total_bytes_sent",
492                     "%"PRIu64, source->format->sent_bytes);
493             source->client_stats_update = current + 5;
494         }
495         if (fds < 0)
496         {
497             if (! sock_recoverable (sock_error()))
498             {
499                 ICECAST_LOG_WARN("Error while waiting on socket, Disconnecting source");
500                 source->running = 0;
501             }
502             break;
503         }
504         if (fds == 0)
505         {
506             thread_mutex_lock(&source->lock);
507             if ((source->last_read + (time_t)source->timeout) < current)
508             {
509                 ICECAST_LOG_DEBUG("last %ld, timeout %d, now %ld", (long)source->last_read,
510                         source->timeout, (long)current);
511                 ICECAST_LOG_WARN("Disconnecting source due to socket timeout");
512                 source->running = 0;
513             }
514             thread_mutex_unlock(&source->lock);
515             break;
516         }
517         source->last_read = current;
518         refbuf = source->format->get_buffer (source);
519         if (source->client->con && source->client->con->error)
520         {
521             ICECAST_LOG_INFO("End of Stream %s", source->mount);
522             source->running = 0;
523             continue;
524         }
525         if (refbuf)
526             break;
527     }
528 
529     return refbuf;
530 }
531 
532 
533 /* general send routine per listener.  The deletion_expected tells us whether
534  * the last in the queue is about to disappear, so if this client is still
535  * referring to it after writing then drop the client as it's fallen too far
536  * behind
537  */
send_to_listener(source_t * source,client_t * client,int deletion_expected)538 static void send_to_listener (source_t *source, client_t *client, int deletion_expected)
539 {
540     int bytes;
541     int loop = 10;   /* max number of iterations in one go */
542     int total_written = 0;
543 
544     while (1)
545     {
546         /* check for limited listener time */
547         if (client->con->discon_time)
548             if (time(NULL) >= client->con->discon_time)
549             {
550                 ICECAST_LOG_INFO("time limit reached for client #%lu", client->con->id);
551                 client->con->error = 1;
552             }
553 
554         /* jump out if client connection has died */
555         if (client->con->error)
556             break;
557 
558         /* lets not send too much to one client in one go, but don't
559            sleep for too long if more data can be sent */
560         if (total_written > 20000 || loop == 0)
561         {
562             if (client->check_buffer != format_check_file_buffer)
563                 source->short_delay = 1;
564             break;
565         }
566 
567         loop--;
568 
569         if (client->check_buffer (source, client) < 0)
570             break;
571 
572         bytes = client->write_to_client (client);
573         if (bytes <= 0)
574             break;  /* can't write any more */
575 
576         total_written += bytes;
577     }
578     source->format->sent_bytes += total_written;
579 
580     /* the refbuf referenced at head (last in queue) may be marked for deletion
581      * if so, check to see if this client is still referring to it */
582     if (deletion_expected && client->refbuf && client->refbuf == source->stream_data)
583     {
584         ICECAST_LOG_INFO("Client %lu (%s) has fallen too far behind, removing",
585                 client->con->id, client->con->ip);
586         stats_event_inc (source->mount, "slow_listeners");
587         client->con->error = 1;
588     }
589 }
590 
591 
/* Open the file for stream dumping.
 * This function should do all processing of the filename.
 *
 * Except on Windows, `filename` is used as a strftime() pattern that is
 * expanded with the current local time.  The resulting file is opened in
 * binary append mode.
 *
 * Returns the open stream, or NULL with errno set when the local time is
 * not available, when the expanded name is empty or does not fit into
 * PATH_MAX bytes, or when fopen() fails.
 */
static FILE * source_open_dumpfile(const char * filename) {
#ifndef _WIN32
    /* some of the below functions seems not to be standard winapi functions */
    char buffer[PATH_MAX];
    time_t curtime;
    struct tm *loctime;

    /* Get the current time. */
    curtime = time (NULL);

    /* Convert it to local time representation. */
    loctime = localtime (&curtime);
    if (loctime == NULL)
        return NULL;

    /* strftime() returns 0 when the result does not fit and leaves the
     * buffer contents unspecified, so never pass that on to fopen() */
    if (strftime (buffer, sizeof(buffer), filename, loctime) == 0)
    {
        errno = (filename[0] == '\0') ? ENOENT : ENAMETOOLONG;
        return NULL;
    }
    filename = buffer;
#endif

    return fopen (filename, "ab");
}
614 
615 /* Perform any initialisation just before the stream data is processed, the header
616  * info is processed by now and the format details are setup
617  */
source_init(source_t * source)618 static void source_init (source_t *source)
619 {
620     ice_config_t *config = config_get_config();
621     char *listenurl;
622     const char *str;
623     int listen_url_size;
624     mount_proxy *mountinfo;
625 
626     /* 6 for max size of port */
627     listen_url_size = strlen("http://") + strlen(config->hostname) +
628         strlen(":") + 6 + strlen(source->mount) + 1;
629 
630     listenurl = malloc (listen_url_size);
631     memset (listenurl, '\000', listen_url_size);
632     snprintf (listenurl, listen_url_size, "http://%s:%d%s",
633             config->hostname, config->port, source->mount);
634     config_release_config();
635 
636     str = httpp_getvar(source->parser, "ice-audio-info");
637     source->audio_info = util_dict_new();
638     if (str)
639     {
640         _parse_audio_info (source, str);
641         stats_event (source->mount, "audio_info", str);
642     }
643 
644     stats_event (source->mount, "listenurl", listenurl);
645 
646     free(listenurl);
647 
648     if (source->dumpfilename != NULL)
649     {
650         source->dumpfile = source_open_dumpfile (source->dumpfilename);
651         if (source->dumpfile == NULL)
652         {
653             ICECAST_LOG_WARN("Cannot open dump file \"%s\" for appending: %s, disabling.",
654                     source->dumpfilename, strerror(errno));
655         }
656     }
657 
658     /* grab a read lock, to make sure we get a chance to cleanup */
659     thread_rwlock_rlock (source->shutdown_rwlock);
660 
661     /* start off the statistics */
662     source->listeners = 0;
663     stats_event_inc (NULL, "source_total_connections");
664     stats_event (source->mount, "slow_listeners", "0");
665     stats_event_args (source->mount, "listeners", "%lu", source->listeners);
666     stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
667     stats_event_time (source->mount, "stream_start");
668     stats_event_time_iso8601 (source->mount, "stream_start_iso8601");
669 
670     ICECAST_LOG_DEBUG("Source creation complete");
671     source->last_read = time (NULL);
672     source->prev_listeners = -1;
673     source->running = 1;
674 
675     mountinfo = config_find_mount (config_get_config(), source->mount, MOUNT_TYPE_NORMAL);
676     if (mountinfo)
677     {
678         if (mountinfo->on_connect)
679             source_run_script (mountinfo->on_connect, source->mount);
680         auth_stream_start (mountinfo, source->mount);
681     }
682     config_release_config();
683 
684     /*
685     ** Now, if we have a fallback source and override is on, we want
686     ** to steal its clients, because it means we've come back online
687     ** after a failure and they should be gotten back from the waiting
688     ** loop or jingle track or whatever the fallback is used for
689     */
690 
691     if (source->fallback_override && source->fallback_mount)
692     {
693         source_t *fallback_source;
694 
695         avl_tree_rlock(global.source_tree);
696         fallback_source = source_find_mount(source->fallback_mount);
697 
698         if (fallback_source)
699             source_move_clients (fallback_source, source);
700 
701         avl_tree_unlock(global.source_tree);
702     }
703 }
704 
705 
/* Main loop of a source.  After source_init() it repeats, for as long as
 * both the server and the source are running: read the next buffer and
 * queue it, feed every listener, drop failed listeners, admit or reject
 * pending listeners, publish changed listener counts and trim the queue.
 * source_shutdown() is called once the loop ends.
 */
void source_main (source_t *source)
{
    source_init (source);

    while (global.running == ICECAST_RUNNING && source->running)
    {
        avl_node *node;
        int queue_over_limit = 0;
        refbuf_t *incoming = get_next_buffer (source);

        source->short_delay = 0;

        if (incoming != NULL)
        {
            /* link the new buffer onto the end of the in-flight queue */
            if (source->stream_data == NULL)
            {
                source->stream_data = incoming;
                source->burst_point = incoming;
            }
            if (source->stream_data_tail != NULL)
                source->stream_data_tail->next = incoming;
            source->stream_data_tail = incoming;
            source->queue_size += incoming->len;

            /* the burst handling holds a reference of its own */
            refbuf_addref (incoming);

            /* advance the burst point until no more than burst_size bytes
             * follow it, but never past the last buffer */
            source->burst_offset += incoming->len;
            while (source->burst_offset > source->burst_size
                    && source->burst_point->next != NULL)
            {
                refbuf_t *stale = source->burst_point;

                source->burst_point = stale->next;
                source->burst_offset -= stale->len;
                refbuf_release (stale);
            }

            /* keep a copy of the stream on disk if asked to */
            if (source->dumpfile && source->format->write_buf_to_file)
                source->format->write_buf_to_file (source, incoming);
        }

        /* note whether the queue has grown too large, the actual removal
         * is done after the listeners have been served */
        thread_mutex_lock(&source->lock);
        if (source->queue_size > source->queue_size_limit)
            queue_over_limit = 1;
        thread_mutex_unlock(&source->lock);

        /* both trees are write locked, pending first */
        avl_tree_wlock(source->pending_tree);
        avl_tree_wlock(source->client_tree);

        /* serve each listener and drop the ones that have failed */
        node = avl_get_first(source->client_tree);
        while (node != NULL)
        {
            client_t *listener = (client_t *)node->key;
            avl_node *following;

            send_to_listener (source, listener, queue_over_limit);

            /* step on before the current node can be deleted */
            following = avl_get_next(node);
            if (listener->con->error)
            {
                if (listener->respcode == 200)
                    stats_event_dec (NULL, "listeners");
                avl_delete(source->client_tree, (void *)listener, _free_client);
                source->listeners--;
                ICECAST_LOG_DEBUG("Client removed");
            }
            node = following;
        }

        /* admit the pending listeners for as long as there is room */
        node = avl_get_first(source->pending_tree);
        while (node != NULL)
        {
            int mount_full = source->max_listeners != -1 &&
                source->listeners >= (unsigned long)source->max_listeners;

            if (!mount_full)
            {
                avl_insert(source->client_tree, node->key);

                source->listeners++;
                ICECAST_LOG_DEBUG("Client added for mountpoint (%s)", source->mount);
                stats_event_inc(source->mount, "connections");

                node = avl_get_next(node);
            }
            else
            {
                /* The usual case is dealt with by the main connection
                 * handler, this catches the rarer ones (mostly to do with
                 * fallbacks); the listener is not told why it was dropped
                 */
                client_t *rejected = (client_t *)node->key;

                node = avl_get_next(node);
                avl_delete(source->pending_tree, (void *)rejected, _free_client);

                ICECAST_LOG_INFO("Client deleted, exceeding maximum listeners for this "
                        "mountpoint (%s).", source->mount);
            }
        }

        /* what is left in the pending tree now lives in the client tree,
         * so empty it without touching the clients themselves */
        while ((node = avl_get_first(source->pending_tree)) != NULL)
            avl_delete(source->pending_tree, node->key, source_remove_client);

        avl_tree_unlock(source->pending_tree);

        /* publish the listener figures when they have changed */
        if (source->listeners != source->prev_listeners)
        {
            source->prev_listeners = source->listeners;
            ICECAST_LOG_INFO("listener count on %s now %lu", source->mount, source->listeners);
            if (source->listeners > source->peak_listeners)
            {
                source->peak_listeners = source->listeners;
                stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
            }
            stats_event_args (source->mount, "listeners", "%lu", source->listeners);
            /* an on-demand source stops once nobody is listening */
            if (source->listeners == 0 && source->on_demand)
                source->running = 0;
        }

        /* Trim the front of the queue, any lagging listeners should have
         * been terminated by now.  A buffer with a reference count of 1 is
         * held by the queue alone; burst data counts at least 2 and each
         * active listener adds to the count as well.
         */
        while (source->stream_data && source->stream_data->_count == 1)
        {
            refbuf_t *oldest = source->stream_data;

            if (oldest->next == NULL || source->burst_point == oldest)
            {
                /* this should not happen */
                ICECAST_LOG_ERROR("queue state is unexpected");
                source->running = 0;
                break;
            }
            source->stream_data = oldest->next;
            source->queue_size -= oldest->len;
            oldest->next = NULL;
            refbuf_release (oldest);
        }

        avl_tree_unlock(source->client_tree);
    }
    source_shutdown (source);
}
874 
875 
/* Take a source out of service: mark it as stopped, run the on-disconnect
 * script and stream-end auth for the mount, hand the listeners to the
 * fallback mount if one is usable, delete the stats of the source, clear
 * the source, lower the global source count and release the shutdown lock
 * that source_init() took.
 */
static void source_shutdown (source_t *source)
{
    ice_config_t *config;
    mount_proxy *settings;

    source->running = 0;
    if (source->con && source->con->ip) {
        ICECAST_LOG_INFO("Source from %s at \"%s\" exiting", source->con->ip, source->mount);
    } else {
        ICECAST_LOG_INFO("Source at \"%s\" exiting", source->mount);
    }

    config = config_get_config();
    settings = config_find_mount (config, source->mount, MOUNT_TYPE_NORMAL);
    if (settings)
    {
        if (settings->on_disconnect)
            source_run_script (settings->on_disconnect, source->mount);
        auth_stream_end (settings, source->mount);
    }
    config_release_config();

    /* the source is inactive from here on so no listener gets added any
     * more, which makes it safe to pass the current ones to the fallback
     */
    if (source->fallback_mount)
    {
        source_t *target;

        avl_tree_rlock(global.source_tree);
        target = source_find_mount (source->fallback_mount);
        if (target != NULL)
            source_move_clients (source, target);
        avl_tree_unlock (global.source_tree);
    }

    /* a NULL name and value removes all stats of this mount */
    stats_event(source->mount, NULL, NULL);

    /* the source stays in the tree, it may be a relay and therefore
     * reserved */
    source_clear_source (source);

    global_lock();
    global.sources--;
    stats_event_args (NULL, "sources", "%d", global.sources);
    global_unlock();

    /* let go of the lock so the main thread can continue cleaning up */
    thread_rwlock_unlock(source->shutdown_rwlock);
}
927 
928 
_compare_clients(void * compare_arg,void * a,void * b)929 static int _compare_clients(void *compare_arg, void *a, void *b)
930 {
931     client_t *clienta = (client_t *)a;
932     client_t *clientb = (client_t *)b;
933 
934     connection_t *cona = clienta->con;
935     connection_t *conb = clientb->con;
936 
937     if (cona->id < conb->id) return -1;
938     if (cona->id > conb->id) return 1;
939 
940     return 0;
941 }
942 
/* Tree removal callback that leaves the entry itself untouched.
 *
 * key is ignored; the function always returns 1.
 */
int source_remove_client(void *key)
{
    (void)key;

    return 1;
}
947 
_free_client(void * key)948 static int _free_client(void *key)
949 {
950     client_t *client = (client_t *)key;
951 
952     /* if no response has been sent then send a 404 */
953     if (client->respcode == 0)
954         client_send_404 (client, "Mount unavailable");
955     else
956         client_destroy(client);
957 
958     return 1;
959 }
960 
/* Walk s as a list of ';'-separated entries and record each "name=value".
 *
 * source - source whose audio_info dictionary and mount stats are updated.
 * s      - text to parse; NULL or an empty string results in no work.
 *
 * For every non-empty entry a name (at most 99 characters) and a value (at
 * most 199 characters) are scanned.  The value is URL-unescaped, stored in
 * source->audio_info under the name and published as a stat on the mount.
 * When sscanf() cannot convert both fields nothing is recorded for that
 * entry.
 */
static void _parse_audio_info (source_t *source, const char *s)
{
    const char *start = s;

    while (start != NULL && *start != '\0')
    {
        const char *next = strchr (start, ';');
        size_t len;

        if (next == NULL)
            len = strlen (start);
        else
        {
            len = (size_t)(next - start);
            next++; /* skip past the ';' */
        }
        if (len)
        {
            char name[100], value[200];

            /* act only when both fields were filled in; after a failed
             * conversion name and/or value hold indeterminate bytes */
            if (sscanf (start, "%99[^=]=%199[^;\r\n]", name, value) == 2)
            {
                char *esc = util_url_unescape (value);
                if (esc)
                {
                    util_dict_set (source->audio_info, name, esc);
                    stats_event (source->mount, name, esc);
                    free (esc);
                }
            }
        }
        /* NULL once the last entry has been handled, which ends the loop */
        start = next;
    }
}
992 
993 
994 /* Apply the mountinfo details to the source */
source_apply_mount(source_t * source,mount_proxy * mountinfo)995 static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
996 {
997     const char *str;
998     int val;
999     http_parser_t *parser = NULL;
1000 
1001     ICECAST_LOG_DEBUG("Applying mount information for \"%s\"", source->mount);
1002     avl_tree_rlock (source->client_tree);
1003     stats_event_args (source->mount, "listener_peak", "%lu", source->peak_listeners);
1004 
1005     if (mountinfo)
1006     {
1007         source->max_listeners = mountinfo->max_listeners;
1008         source->fallback_override = mountinfo->fallback_override;
1009         source->hidden = mountinfo->hidden;
1010     }
1011 
1012     /* if a setting is available in the mount details then use it, else
1013      * check the parser details. */
1014 
1015     if (source->client)
1016         parser = source->client->parser;
1017 
1018     /* to be done before possible non-utf8 stats */
1019     if (source->format && source->format->apply_settings)
1020         source->format->apply_settings (source->client, source->format, mountinfo);
1021 
1022     /* public */
1023     if (mountinfo && mountinfo->yp_public >= 0)
1024         val = mountinfo->yp_public;
1025     else
1026     {
1027         do {
1028             str = httpp_getvar (parser, "ice-public");
1029             if (str) break;
1030             str = httpp_getvar (parser, "icy-pub");
1031             if (str) break;
1032             str = httpp_getvar (parser, "x-audiocast-public");
1033             if (str) break;
1034             /* handle header from icecast v2 release */
1035             str = httpp_getvar (parser, "icy-public");
1036             if (str) break;
1037             str = "0";
1038         } while (0);
1039         val = atoi (str);
1040     }
1041     stats_event_args (source->mount, "public", "%d", val);
1042     if (source->yp_public != val)
1043     {
1044         ICECAST_LOG_DEBUG("YP changed to %d", val);
1045         if (val)
1046             yp_add (source->mount);
1047         else
1048             yp_remove (source->mount);
1049         source->yp_public = val;
1050     }
1051 
1052     /* stream name */
1053     if (mountinfo && mountinfo->stream_name)
1054         stats_event (source->mount, "server_name", mountinfo->stream_name);
1055     else
1056     {
1057         do {
1058             str = httpp_getvar (parser, "ice-name");
1059             if (str) break;
1060             str = httpp_getvar (parser, "icy-name");
1061             if (str) break;
1062             str = httpp_getvar (parser, "x-audiocast-name");
1063             if (str) break;
1064             str = "Unspecified name";
1065         } while (0);
1066         if (source->format)
1067             stats_event_conv (source->mount, "server_name", str, source->format->charset);
1068     }
1069 
1070     /* stream description */
1071     if (mountinfo && mountinfo->stream_description)
1072         stats_event (source->mount, "server_description", mountinfo->stream_description);
1073     else
1074     {
1075         do {
1076             str = httpp_getvar (parser, "ice-description");
1077             if (str) break;
1078             str = httpp_getvar (parser, "icy-description");
1079             if (str) break;
1080             str = httpp_getvar (parser, "x-audiocast-description");
1081             if (str) break;
1082             str = "Unspecified description";
1083         } while (0);
1084         if (source->format)
1085             stats_event_conv (source->mount, "server_description", str, source->format->charset);
1086     }
1087 
1088     /* stream URL */
1089     if (mountinfo && mountinfo->stream_url)
1090         stats_event (source->mount, "server_url", mountinfo->stream_url);
1091     else
1092     {
1093         do {
1094             str = httpp_getvar (parser, "ice-url");
1095             if (str) break;
1096             str = httpp_getvar (parser, "icy-url");
1097             if (str) break;
1098             str = httpp_getvar (parser, "x-audiocast-url");
1099             if (str) break;
1100         } while (0);
1101         if (str && source->format)
1102             stats_event_conv (source->mount, "server_url", str, source->format->charset);
1103     }
1104 
1105     /* stream genre */
1106     if (mountinfo && mountinfo->stream_genre)
1107         stats_event (source->mount, "genre", mountinfo->stream_genre);
1108     else
1109     {
1110         do {
1111             str = httpp_getvar (parser, "ice-genre");
1112             if (str) break;
1113             str = httpp_getvar (parser, "icy-genre");
1114             if (str) break;
1115             str = httpp_getvar (parser, "x-audiocast-genre");
1116             if (str) break;
1117             str = "various";
1118         } while (0);
1119         if (source->format)
1120             stats_event_conv (source->mount, "genre", str, source->format->charset);
1121     }
1122 
1123     /* stream bitrate */
1124     if (mountinfo && mountinfo->bitrate)
1125         str = mountinfo->bitrate;
1126     else
1127     {
1128         do {
1129             str = httpp_getvar (parser, "ice-bitrate");
1130             if (str) break;
1131             str = httpp_getvar (parser, "icy-br");
1132             if (str) break;
1133             str = httpp_getvar (parser, "x-audiocast-bitrate");
1134         } while (0);
1135     }
1136     stats_event (source->mount, "bitrate", str);
1137 
1138     /* handle MIME-type */
1139     if (mountinfo && mountinfo->type)
1140         stats_event (source->mount, "server_type", mountinfo->type);
1141     else
1142         if (source->format)
1143             stats_event (source->mount, "server_type", source->format->contenttype);
1144 
1145     if (mountinfo && mountinfo->subtype)
1146         stats_event (source->mount, "subtype", mountinfo->subtype);
1147 
1148     if (mountinfo && mountinfo->auth)
1149         stats_event (source->mount, "authenticator", mountinfo->auth->type);
1150     else
1151         stats_event (source->mount, "authenticator", NULL);
1152 
1153     if (mountinfo && mountinfo->fallback_mount)
1154     {
1155         char *mount = source->fallback_mount;
1156         source->fallback_mount = strdup (mountinfo->fallback_mount);
1157         free (mount);
1158     }
1159     else
1160         source->fallback_mount = NULL;
1161 
1162     if (mountinfo && mountinfo->dumpfile)
1163     {
1164         char *filename = source->dumpfilename;
1165         source->dumpfilename = strdup (mountinfo->dumpfile);
1166         free (filename);
1167     }
1168     else
1169         source->dumpfilename = NULL;
1170 
1171     if (source->intro_file)
1172     {
1173         fclose (source->intro_file);
1174         source->intro_file = NULL;
1175     }
1176     if (mountinfo && mountinfo->intro_filename)
1177     {
1178         ice_config_t *config = config_get_config_unlocked ();
1179         unsigned int len  = strlen (config->webroot_dir) +
1180             strlen (mountinfo->intro_filename) + 2;
1181         char *path = malloc (len);
1182         if (path)
1183         {
1184             FILE *f;
1185             snprintf (path, len, "%s" PATH_SEPARATOR "%s", config->webroot_dir,
1186                     mountinfo->intro_filename);
1187 
1188             f = fopen (path, "rb");
1189             if (f)
1190                 source->intro_file = f;
1191             else
1192                 ICECAST_LOG_WARN("Cannot open intro file \"%s\": %s", path, strerror(errno));
1193             free (path);
1194         }
1195     }
1196 
1197     if (mountinfo && mountinfo->queue_size_limit)
1198         source->queue_size_limit = mountinfo->queue_size_limit;
1199 
1200     if (mountinfo && mountinfo->source_timeout)
1201         source->timeout = mountinfo->source_timeout;
1202 
1203     if (mountinfo && mountinfo->burst_size >= 0)
1204         source->burst_size = (unsigned int)mountinfo->burst_size;
1205 
1206     if (mountinfo && mountinfo->fallback_when_full)
1207         source->fallback_when_full = mountinfo->fallback_when_full;
1208 
1209     avl_tree_unlock (source->client_tree);
1210 }
1211 
1212 
1213 /* update the specified source with details from the config or mount.
1214  * mountinfo can be NULL in which case default settings should be taken
1215  * This function is called by the Slave thread
1216  */
source_update_settings(ice_config_t * config,source_t * source,mount_proxy * mountinfo)1217 void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo)
1218 {
1219     thread_mutex_lock(&source->lock);
1220     /*  skip if source is a fallback to file */
1221     if (source->running && source->client == NULL)
1222     {
1223         stats_event_hidden (source->mount, NULL, 1);
1224         thread_mutex_unlock(&source->lock);
1225         return;
1226     }
1227     /* set global settings first */
1228     source->queue_size_limit = config->queue_size_limit;
1229     source->timeout = config->source_timeout;
1230     source->burst_size = config->burst_size;
1231 
1232     stats_event_args (source->mount, "listenurl", "http://%s:%d%s",
1233             config->hostname, config->port, source->mount);
1234 
1235     source_apply_mount (source, mountinfo);
1236 
1237     if (source->fallback_mount)
1238         ICECAST_LOG_DEBUG("fallback %s", source->fallback_mount);
1239     if (mountinfo && mountinfo->intro_filename)
1240         ICECAST_LOG_DEBUG("intro file is %s", mountinfo->intro_filename);
1241     if (source->dumpfilename)
1242         ICECAST_LOG_DEBUG("Dumping stream to %s", source->dumpfilename);
1243     if (mountinfo && mountinfo->on_connect)
1244         ICECAST_LOG_DEBUG("connect script \"%s\"", mountinfo->on_connect);
1245     if (mountinfo && mountinfo->on_disconnect)
1246         ICECAST_LOG_DEBUG("disconnect script \"%s\"", mountinfo->on_disconnect);
1247     if (source->on_demand)
1248     {
1249         ICECAST_LOG_DEBUG("on_demand set");
1250         stats_event (source->mount, "on_demand", "1");
1251         stats_event_args (source->mount, "listeners", "%ld", source->listeners);
1252     }
1253     else
1254         stats_event (source->mount, "on_demand", NULL);
1255 
1256     if (source->hidden)
1257     {
1258         stats_event_hidden (source->mount, NULL, 1);
1259         ICECAST_LOG_DEBUG("hidden from public");
1260     }
1261     else
1262         stats_event_hidden (source->mount, NULL, 0);
1263 
1264     if (source->max_listeners == -1)
1265         stats_event (source->mount, "max_listeners", "unlimited");
1266     else
1267     {
1268         char buf [10];
1269         snprintf (buf, sizeof (buf), "%ld", source->max_listeners);
1270         stats_event (source->mount, "max_listeners", buf);
1271     }
1272     ICECAST_LOG_DEBUG("public set to %d", source->yp_public);
1273     ICECAST_LOG_DEBUG("max listeners to %ld", source->max_listeners);
1274     ICECAST_LOG_DEBUG("queue size to %u", source->queue_size_limit);
1275     ICECAST_LOG_DEBUG("burst size to %u", source->burst_size);
1276     ICECAST_LOG_DEBUG("source timeout to %u", source->timeout);
1277     ICECAST_LOG_DEBUG("fallback_when_full to %u", source->fallback_when_full);
1278     thread_mutex_unlock(&source->lock);
1279 }
1280 
1281 
source_client_thread(void * arg)1282 void *source_client_thread (void *arg)
1283 {
1284     source_t *source = arg;
1285 
1286     stats_event_inc(NULL, "source_client_connections");
1287     stats_event (source->mount, "listeners", "0");
1288 
1289     source_main (source);
1290 
1291     source_free_source (source);
1292     slave_update_all_mounts();
1293 
1294     return NULL;
1295 }
1296 
1297 
/* Callback run for a source client before its stream is started.
 *
 * client - the source client.
 * arg    - the source_t the client feeds.
 *
 * If the client's connection is in error the global source count is reduced
 * and the source is cleared and freed.  Otherwise the client's first refbuf
 * is replaced by the buffer associated with it, the source IP and user agent
 * are published as stats and a detached thread running
 * source_client_thread() is created for the source.
 */
void source_client_callback (client_t *client, void *arg)
{
    source_t *source = arg;
    refbuf_t *head = client->refbuf;
    const char *user_agent;

    if (client->con->error)
    {
        global_lock();
        global.sources--;
        global_unlock();
        source_clear_source (source);
        source_free_source (source);
        return;
    }

    /* detach the associated buffer first so that releasing head does not
     * take it along */
    client->refbuf = head->associated;
    head->associated = NULL;
    refbuf_release (head);

    stats_event (source->mount, "source_ip", source->client->con->ip);

    user_agent = httpp_getvar (source->client->parser, "user-agent");
    if (user_agent != NULL)
        stats_event (source->mount, "user_agent", user_agent);

    thread_create ("Source Thread", source_client_thread,
            source, THREAD_DETACHED);
}
1324 
1325 
1326 #ifndef _WIN32
/* Prepare a clean descriptor table for running a script.
 * Descriptors 0..1023 are closed and stdin, stdout and stderr are then
 * pointed at /dev/null.  Most failures are ignored as there is no way to
 * handle them here.
 */
static inline void __setup_empty_script_environment(void) {
    int fd;
    int devnull;

    /* close at least the first 1024 handles */
    for (fd = 0; fd < 1024; fd++)
        close(fd);

    /* without the null device there is nothing more to do */
    devnull = open("/dev/null", O_RDWR);
    if (devnull == -1)
        return;

    /* attach the null device to stdin, stdout and stderr, skipping the
     * descriptor open() itself returned */
    for (fd = 0; fd <= 2; fd++) {
        if (devnull != fd)
            dup2(devnull, fd);
    }

    /* the extra handle is only needed if it is not one of the three */
    if (devnull > 2)
        close(devnull);
}
1353 
/* Run an external command for a mountpoint without leaving a zombie behind.
 *
 * command    - path of the program to execute; it must be readable and
 *              executable.
 * mountpoint - passed to the program as its only argument.
 *
 * The process forks twice: the intermediate child exits straight away, so
 * the caller only waits for that short-lived process and the command itself
 * is not a child of this process.  The command runs with the descriptors
 * prepared by __setup_empty_script_environment().  Failures are logged and
 * otherwise ignored.
 */
static void source_run_script (char *command, char *mountpoint)
{
    pid_t external_pid;

    /* do a fork twice so that the command has init as parent */
    external_pid = fork();
    switch (external_pid)
    {
        case 0:
            switch (fork ())
            {
                case -1:
                    ICECAST_LOG_ERROR("Unable to fork %s (%s)", command, strerror (errno));
                    break;
                case 0:  /* child */
                    if (access(command, R_OK|X_OK) != 0) {
                        ICECAST_LOG_ERROR("Unable to run command %s (%s)", command, strerror(errno));
                        /* _exit: do not run the parent's atexit handlers or
                         * flush stdio buffers inherited across fork() */
                        _exit(1);
                    }
                    ICECAST_LOG_DEBUG("Starting command %s", command);
                    __setup_empty_script_environment();
                    /* consider to add action here as well */
                    execl(command, command, mountpoint, (char *)NULL);
                    /* only reached when execl() failed */
                    _exit(1);
                default: /* parent */
                    break;
            }
            /* the intermediate process is done, see the note on _exit above */
            _exit (0);
        case -1:
            ICECAST_LOG_ERROR("Unable to fork %s", strerror (errno));
            break;
        default: /* parent */
            /* reap the intermediate child; a signal must not make us give
             * up early and leave it as a zombie */
            while (waitpid (external_pid, NULL, 0) == -1 && errno == EINTR)
                ;
            break;
    }
}
1390 #endif
1391 
1392 
source_fallback_file(void * arg)1393 static void *source_fallback_file (void *arg)
1394 {
1395     char *mount = arg;
1396     char *type;
1397     char *path;
1398     unsigned int len;
1399     FILE *file = NULL;
1400     source_t *source = NULL;
1401     ice_config_t *config;
1402     http_parser_t *parser;
1403 
1404     do
1405     {
1406         if (mount == NULL || mount[0] != '/')
1407             break;
1408         config = config_get_config ();
1409         len  = strlen (config->webroot_dir) + strlen (mount) + 1;
1410         path = malloc (len);
1411         if (path)
1412             snprintf (path, len, "%s%s", config->webroot_dir, mount);
1413         config_release_config ();
1414 
1415         if (path == NULL)
1416             break;
1417 
1418         file = fopen (path, "rb");
1419         if (file == NULL)
1420         {
1421             ICECAST_LOG_WARN("unable to open file \"%s\"", path);
1422             free (path);
1423             break;
1424         }
1425         free (path);
1426         source = source_reserve (mount);
1427         if (source == NULL)
1428         {
1429             ICECAST_LOG_WARN("mountpoint \"%s\" already reserved", mount);
1430             break;
1431         }
1432         ICECAST_LOG_INFO("mountpoint %s is reserved", mount);
1433         type = fserve_content_type (mount);
1434         parser = httpp_create_parser();
1435         httpp_initialize (parser, NULL);
1436         httpp_setvar (parser, "content-type", type);
1437         free (type);
1438 
1439         source->hidden = 1;
1440         source->yp_public = 0;
1441         source->intro_file = file;
1442         source->parser = parser;
1443         file = NULL;
1444 
1445         if (connection_complete_source (source, 0) < 0)
1446             break;
1447         source_client_thread (source);
1448         httpp_destroy (parser);
1449     } while (0);
1450     if (file)
1451         fclose (file);
1452     free (mount);
1453     return NULL;
1454 }
1455 
1456 
1457 /* rescan the mount list, so that xsl files are updated to show
1458  * unconnected but active fallback mountpoints
1459  */
source_recheck_mounts(int update_all)1460 void source_recheck_mounts (int update_all)
1461 {
1462     ice_config_t *config;
1463     mount_proxy *mount;
1464 
1465     avl_tree_rlock (global.source_tree);
1466     config = config_get_config();
1467     mount = config->mounts;
1468 
1469     if (update_all)
1470         stats_clear_virtual_mounts ();
1471 
1472     for (; mount; mount = mount->next)
1473     {
1474         if (mount->mounttype != MOUNT_TYPE_NORMAL)
1475 	    continue;
1476 
1477         source_t *source = source_find_mount (mount->mountname);
1478 
1479         if (source)
1480         {
1481             source = source_find_mount_raw (mount->mountname);
1482             if (source)
1483             {
1484                 mount_proxy *mountinfo = config_find_mount (config, source->mount, MOUNT_TYPE_NORMAL);
1485                 source_update_settings (config, source, mountinfo);
1486             }
1487             else if (update_all)
1488             {
1489                 stats_event_hidden (mount->mountname, NULL, mount->hidden);
1490                 stats_event_args (mount->mountname, "listenurl", "http://%s:%d%s",
1491                         config->hostname, config->port, mount->mountname);
1492                 stats_event (mount->mountname, "listeners", "0");
1493                 if (mount->max_listeners < 0)
1494                     stats_event (mount->mountname, "max_listeners", "unlimited");
1495                 else
1496                     stats_event_args (mount->mountname, "max_listeners", "%d", mount->max_listeners);
1497             }
1498         }
1499         else
1500             stats_event (mount->mountname, NULL, NULL);
1501 
1502         /* check for fallback to file */
1503         if (global.running == ICECAST_RUNNING && mount->fallback_mount)
1504         {
1505             source_t *fallback = source_find_mount (mount->fallback_mount);
1506             if (fallback == NULL)
1507             {
1508                 thread_create ("Fallback file thread", source_fallback_file,
1509                         strdup (mount->fallback_mount), THREAD_DETACHED);
1510             }
1511         }
1512     }
1513     avl_tree_unlock (global.source_tree);
1514     config_release_config();
1515 }
1516 
1517