1 /* Icecast
2  *
3  * This program is distributed under the GNU General Public License, version 2.
4  * A copy of this license is included with this source.
5  *
6  * Copyright 2000-2004, Jack Moffitt <jack@xiph.org,
7  *                      Michael Smith <msmith@xiph.org>,
8  *                      oddsock <oddsock@xiph.org>,
9  *                      Karl Heyes <karl@xiph.org>
10  *                      and others (see AUTHORS for details).
11  */
12 
13 /* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 /* slave.c
15  * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
16  *
17  * Periodically requests a list of streams from a master server
18  * and creates source threads for any it doesn't already have.
19  * */
20 
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24 
25 #ifdef _WIN32
26 #include <winsock2.h>
27 #endif
28 
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <sys/types.h>
33 
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
36 #endif
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #endif
40 #ifdef HAVE_CURL
41 #include <curl/curl.h>
42 #endif
43 #ifdef HAVE_GETRLIMIT
44 #include <sys/resource.h>
45 #endif
46 
47 #include "compat.h"
48 
49 #include "timing/timing.h"
50 #include "thread/thread.h"
51 #include "avl/avl.h"
52 #include "net/sock.h"
53 #include "httpp/httpp.h"
54 
55 #include "cfgfile.h"
56 #include "global.h"
57 #include "util.h"
58 #include "connection.h"
59 #include "refbuf.h"
60 #include "client.h"
61 #include "stats.h"
62 #include "logging.h"
63 #include "source.h"
64 #include "format.h"
65 #include "event.h"
66 #include "yp.h"
67 #include "slave.h"
68 
69 #define CATMODULE "slave"
70 
71 #ifdef HAVE_CURL
72 struct master_conn_details
73 {
74     char *server;
75     int port;
76     int ssl_port;
77     int send_auth;
78     int on_demand;
79     int previous;
80     int ok;
81     int max_interval;
82     int run_on;
83     time_t synctime;
84     char *buffer;
85     char *username;
86     char *password;
87     char *bind;
88     char *server_id;
89     char *args;
90 };
91 #endif
92 
93 
94 static void _slave_thread(void);
95 static void redirector_add (const char *server, int port, int interval);
96 static redirect_host *find_slave_host (const char *server, int port);
97 static int  relay_startup (client_t *client);
98 static int  relay_initialise (client_t *client);
99 static int  relay_read (client_t *client);
100 static void relay_release (client_t *client);
101 
102 int slave_running = 0;
103 extern int worker_count;
104 int relays_connecting;
105 int streamlister;
106 time_t relay_barrier_master;
107 time_t relay_barrier_xml;
108 
109 static volatile int update_settings = 0;
110 static volatile int update_all_sources = 0;
111 static volatile int restart_connection_thread = 0;
112 static time_t streamlist_check = 0;
113 static rwlock_t slaves_lock;
114 static spin_t relay_start_lock;
115 static time_t inactivity_timer;
116 static int inactivity_timeout;
117 
118 redirect_host *redirectors;
119 worker_t *workers;
120 rwlock_t workers_lock;
121 
122 
123 struct _client_functions relay_client_ops =
124 {
125     relay_read,
126     relay_release
127 };
128 
129 struct _client_functions relay_startup_ops =
130 {
131     relay_startup,
132     relay_release
133 };
134 
135 struct _client_functions relay_init_ops =
136 {
137     relay_initialise,
138     relay_release
139 };
140 
141 
relay_copy(relay_server * r)142 relay_server *relay_copy (relay_server *r)
143 {
144     relay_server *copy = calloc (1, sizeof (relay_server));
145 
146     if (copy)
147     {
148         relay_server_host *from = r->hosts, **insert = &copy->hosts;
149 
150         while (from)
151         {
152             relay_server_host *to = calloc (1, sizeof (relay_server_host));
153             to->ip = (char *)xmlCharStrdup (from->ip);
154             to->mount = (char *)xmlCharStrdup (from->mount);
155             if (from->bind)
156                 to->bind = (char *)xmlCharStrdup (from->bind);
157             to->port = from->port;
158             to->timeout = from->timeout;
159             *insert = to;
160             from = from->next;
161             insert = &to->next;
162         }
163 
164         copy->localmount = (char *)xmlStrdup (XMLSTR(r->localmount));
165         if (r->username)
166             copy->username = (char *)xmlStrdup (XMLSTR(r->username));
167         if (r->password)
168             copy->password = (char *)xmlStrdup (XMLSTR(r->password));
169         copy->flags = r->flags;
170         copy->flags |= RELAY_RUNNING;
171         copy->interval = r->interval;
172         copy->run_on = r->run_on;
173         r->source = NULL;
174         DEBUG2 ("copy relay %s at %p", copy->localmount, copy);
175     }
176     return copy;
177 }
178 
179 
180 /* force a recheck of the mounts.
181  */
slave_update_mounts(void)182 void slave_update_mounts (void)
183 {
184     thread_spin_lock (&relay_start_lock);
185     update_settings = 1;
186     thread_spin_unlock (&relay_start_lock);
187 }
188 
189 /* force a recheck of the mounts.
190  */
slave_update_all_mounts(void)191 void slave_update_all_mounts (void)
192 {
193     thread_spin_lock (&relay_start_lock);
194     update_settings = 1;
195     update_all_sources = 1;
196     thread_spin_unlock (&relay_start_lock);
197 }
198 
199 
200 /* called on reload, so drop all redirection and trigger source checkup and
201  * rebuild all stat mountpoints
202  */
slave_restart(void)203 void slave_restart (void)
204 {
205     thread_spin_lock (&relay_start_lock);
206     restart_connection_thread = 1;
207     update_settings = 1;
208     update_all_sources = 1;
209     streamlist_check = 0;
210     thread_spin_unlock (&relay_start_lock);
211 }
212 
213 
_compare_relay(void * arg,void * a,void * b)214 static int _compare_relay(void *arg, void *a, void *b)
215 {
216     relay_server *nodea = (relay_server *)a;
217     relay_server *nodeb = (relay_server *)b;
218 
219     return strcmp(nodea->localmount, nodeb->localmount);
220 }
221 
222 
slave_initialize(void)223 void slave_initialize(void)
224 {
225     if (slave_running)
226         return;
227 
228     thread_rwlock_create (&slaves_lock);
229     slave_running = 1;
230     streamlister = 0;
231     streamlist_check = 0;
232     update_settings = 0;
233     update_all_sources = 0;
234     restart_connection_thread = 0;
235     redirectors = NULL;
236     workers = NULL;
237     worker_count = 0;
238     relays_connecting = 0;
239     thread_spin_create (&relay_start_lock);
240     thread_rwlock_create (&workers_lock);
241     global.relays = avl_tree_new (_compare_relay, NULL);
242     inactivity_timeout = 0;
243     inactivity_timer = 0;
244 #ifndef HAVE_CURL
245     ERROR0 ("streamlist request disabled, rebuild with libcurl if required");
246 #endif
247     _slave_thread ();
248     yp_stop ();
249     workers_adjust(0);
250 }
251 
252 
slave_shutdown(void)253 void slave_shutdown(void)
254 {
255     if (slave_running == 0)
256         return;
257     DEBUG0 ("shutting down slave");
258     yp_shutdown();
259     stats_shutdown();
260     fserve_shutdown();
261     config_shutdown();
262     stop_logging();
263     // stall until workers have shut down
264     thread_rwlock_wlock (&global.workers_rw);
265     thread_rwlock_unlock (&global.workers_rw);
266 
267     //INFO0 ("all workers shut down");
268     avl_tree_free (global.relays, NULL);
269     thread_rwlock_destroy (&slaves_lock);
270     thread_rwlock_destroy (&workers_lock);
271     thread_spin_destroy (&relay_start_lock);
272     slave_running = 0;
273 }
274 
275 
redirect_client(const char * mountpoint,client_t * client)276 int redirect_client (const char *mountpoint, client_t *client)
277 {
278     int ret = 0, which;
279     redirect_host *checking, **trail;
280 
281     thread_rwlock_rlock (&slaves_lock);
282     /* select slave entry */
283     if (global.redirect_count == 0)
284     {
285         thread_rwlock_unlock (&slaves_lock);
286         return 0;
287     }
288     which=(int) (((float)global.redirect_count)*rand()/(RAND_MAX+1.0)) + 1;
289     checking = redirectors;
290     trail = &redirectors;
291 
292     DEBUG2 ("random selection %d (out of %d)", which, global.redirect_count);
293     while (checking)
294     {
295         DEBUG2 ("...%s:%d", checking->server, checking->port);
296         if (checking->next_update && checking->next_update+10 < time(NULL))
297         {
298             /* no streamist request, expire slave for now */
299             *trail = checking->next;
300             global.redirect_count--;
301             /* free slave details */
302             INFO2 ("dropping redirector for %s:%d", checking->server, checking->port);
303             free (checking->server);
304             free (checking);
305             checking = *trail;
306             if (which > 0)
307                 which--; /* we are 1 less now */
308             continue;
309         }
310         if (--which == 0)
311         {
312             char *location;
313             /* add enough for "http://" the port ':' and nul */
314             int len = strlen (mountpoint) + strlen (checking->server) + 20;
315             const char *user = client->username;
316             const char *pass = client->password;
317             const char *args = httpp_getvar (client->parser, HTTPP_VAR_QUERYARGS);
318             const char *colon = ":", *at_sign = "@";
319 
320             if (args)
321                 len += strlen (args);
322             else
323                 args = "";
324             if (user && pass)
325                 len += strlen (user) + strlen (pass);
326             else
327                 colon = at_sign = user = pass = "";
328             INFO2 ("redirecting listener to slave server at %s:%d", checking->server, checking->port);
329             location = alloca (len);
330             snprintf (location, len, "%s://%s%s%s%s%s:%d%s%s", httpp_getvar (client->parser, HTTPP_VAR_PROTOCOL),
331                     user, colon, pass, at_sign,
332                     checking->server, checking->port, mountpoint, args);
333             client_send_302 (client, location);
334             ret = 1;
335         }
336         trail = &checking->next;
337         checking = checking->next;
338     }
339     thread_rwlock_unlock (&slaves_lock);
340     return ret;
341 }
342 
343 
344 
get_relay_response(connection_t * con,const char * mount,const char * server,const char * headers)345 static http_parser_t *get_relay_response (connection_t *con, const char *mount,
346         const char *server, const char *headers)
347 {
348     ice_config_t *config = config_get_config ();
349     char *server_id = strdup (config->server_id);
350     http_parser_t *parser = NULL;
351     char response [4096];
352 
353     config_release_config ();
354 
355     /* At this point we may not know if we are relaying an mp3 or vorbis
356      * stream, but only send the icy-metadata header if the relay details
357      * state so (the typical case).  It's harmless in the vorbis case. If
358      * we don't send in this header then relay will not have mp3 metadata.
359      */
360     sock_write (con->sock, "GET %s HTTP/1.0\r\n"
361             "User-Agent: %s\r\n"
362             "Host: %s\r\n"
363             "%s"
364             "\r\n",
365             mount,
366             server_id,
367             server,
368             headers ? headers : "");
369 
370     free (server_id);
371     memset (response, 0, sizeof(response));
372     if (util_read_header (con->sock, response, 4096, READ_ENTIRE_HEADER) == 0)
373     {
374         WARN2 ("Header read failure from %s %s", server, mount);
375         return NULL;
376     }
377     parser = httpp_create_parser();
378     httpp_initialize (parser, NULL);
379     if (! httpp_parse_response (parser, response, strlen(response), mount))
380     {
381         INFO0 ("problem parsing response from relay");
382         httpp_destroy (parser);
383         return NULL;
384     }
385     return parser;
386 }
387 
388 
389 
encode_auth_header(char * userpass,unsigned int remain)390 static void encode_auth_header (char *userpass, unsigned int remain)
391 {
392     if (userpass && userpass[0])
393     {
394         char *esc_authorisation = util_base64_encode (userpass);
395 
396         if (snprintf (userpass, remain, "Authorization: Basic %s\r\n", esc_authorisation) < 0)
397             userpass[0] = '\0';
398         free (esc_authorisation);
399     }
400 }
401 
402 
403 /* Actually open the connection and do some http parsing, handle any 302
404  * responses within here.
405  */
open_relay_connection(client_t * client,relay_server * relay,relay_server_host * host)406 static int open_relay_connection (client_t *client, relay_server *relay, relay_server_host *host)
407 {
408     int redirects = 0;
409     http_parser_t *parser = NULL;
410     connection_t *con = &client->connection;
411     char *server = strdup (host->ip);
412     char *mount = strdup (host->mount);
413     int port = host->port, timeout = host->timeout, remain;
414     char *p, headers[4096] = "";
415 
416     remain = sizeof (headers);
417     if (relay->flags & RELAY_ICY_META)
418         remain -= snprintf (headers, remain, "Icy-MetaData: 1\r\n");
419     p = headers + strlen (headers);
420     if (relay->username && relay->password)
421     {
422         INFO2 ("using username %s for %s", relay->username, relay->localmount);
423         snprintf (p, remain, "%s:%s", relay->username, relay->password);
424         encode_auth_header (p, remain);
425     }
426     while (1)
427     {
428         sock_t streamsock;
429         char *bind = NULL;
430 
431         if (redirects > 10)
432         {
433             WARN1 ("detected too many redirects on %s", relay->localmount);
434             break;
435         }
436         /* policy decision, we assume a source bind even after redirect, possible option */
437         if (host->bind)
438             bind = strdup (host->bind);
439 
440         if (bind)
441             INFO4 ("connecting to %s:%d for %s, bound to %s", server, port, relay->localmount, bind);
442         else
443             INFO3 ("connecting to %s:%d for %s", server, port, relay->localmount);
444 
445         con->con_time = time (NULL);
446         relay->in_use = host;
447         streamsock = sock_connect_wto_bind (server, port, bind, timeout);
448         free (bind);
449         if (connection_init (con, streamsock, server) < 0)
450         {
451             WARN2 ("Failed to connect to %s:%d", server, port);
452             break;
453         }
454 
455         parser = get_relay_response (con, mount, server, headers);
456 
457         if (parser == NULL)
458         {
459             ERROR4 ("Problem trying to start relay on %s (%s:%d%s)", relay->localmount,
460                     server, port, mount);
461             break;
462         }
463         if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
464         {
465             /* better retry the connection again but with different details */
466             const char *uri, *mountpoint;
467             int len;
468 
469             uri = httpp_getvar (parser, "location");
470             INFO2 ("redirect received on %s : %s", relay->localmount, uri);
471             if (strncmp (uri, "http://", 7) != 0)
472                 break;
473             uri += 7;
474             mountpoint = strchr (uri, '/');
475             free (mount);
476             if (mountpoint)
477                 mount = strdup (mountpoint);
478             else
479                 mount = strdup ("/");
480 
481             len = strcspn (uri, "@/");
482             if (uri [len] == '@')
483             {
484                 snprintf (p, remain, "%.*s", len, uri);
485                 encode_auth_header (p, remain);
486                 uri += len + 1;
487             }
488             len = strcspn (uri, ":/");
489             port = 80;
490             if (uri [len] == ':')
491                 port = atoi (uri+len+1);
492             free (server);
493             server = calloc (1, len+1);
494             strncpy (server, uri, len);
495             connection_close (con);
496             httpp_destroy (parser);
497             parser = NULL;
498         }
499         else
500         {
501             if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
502             {
503                 ERROR3 ("Error from relay request on %s (%s %s)", relay->localmount,
504                         host->mount, httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
505                 client->parser = NULL;
506                 break;
507             }
508             sock_set_blocking (streamsock, 0);
509             thread_rwlock_wlock (&relay->source->lock);
510             client->parser = parser; // old parser will be free in the format clear
511             thread_rwlock_unlock (&relay->source->lock);
512             client->connection.discon.time = 0;
513             client->connection.con_time = time (NULL);
514             client_set_queue (client, NULL);
515             free (server);
516             free (mount);
517 
518             return 0;
519         }
520         redirects++;
521     }
522     /* failed, better clean up */
523     free (server);
524     free (mount);
525     if (parser)
526         httpp_destroy (parser);
527     connection_close (con);
528     con->con_time = time (NULL); // sources count needs to drop in such cases
529     if (relay->in_use) relay->in_use->skip = 1;
530     return -1;
531 }
532 
533 
534 /* This does the actual connection for a relay. A thread is
535  * started off if a connection can be acquired
536  */
open_relay(relay_server * relay)537 int open_relay (relay_server *relay)
538 {
539     source_t *src = relay->source;
540     relay_server_host *host = relay->hosts;
541     client_t *client = src->client;
542     do
543     {
544         int ret;
545 
546         if (host->skip)
547         {
548             INFO3 ("skipping %s:%d for %s", host->ip, host->port, relay->localmount);
549             continue;
550         }
551         thread_rwlock_unlock (&src->lock);
552         ret = open_relay_connection (client, relay, host);
553         thread_rwlock_wlock (&src->lock);
554 
555         if (ret < 0)
556             continue;
557 
558         if (source_format_init (src) < 0)
559         {
560             WARN1 ("Failed to complete initialisation on %s", relay->localmount);
561             continue;
562         }
563         return 1;
564     } while ((host = host->next) && global.running == ICE_RUNNING);
565     return -1;
566 }
567 
start_relay_stream(void * arg)568 static void *start_relay_stream (void *arg)
569 {
570     client_t *client = arg;
571     relay_server *relay;
572     source_t *src;
573     int failed = 1, sources;
574 
575     global_lock();
576     sources = ++global.sources;
577     stats_event_args (NULL, "sources", "%d", global.sources);
578     global_unlock();
579     /* set the start time, because we want to decrease the sources on all failures */
580     client->connection.con_time = time (NULL);
581     do
582     {
583         ice_config_t *config = config_get_config();
584         mount_proxy *mountinfo;
585 
586         relay = client->shared_data;
587         src = relay->source;
588 
589         thread_rwlock_wlock (&src->lock);
590         src->flags |= SOURCE_PAUSE_LISTENERS;
591         if (sources > config->source_limit)
592         {
593             config_release_config();
594             WARN1 ("starting relayed mountpoint \"%s\" requires a higher sources limit", relay->localmount);
595             break;
596         }
597         config_release_config();
598         INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
599 
600         if (open_relay (relay) < 0)
601             break;
602         stats_event_inc (NULL, "source_relay_connections");
603         source_init (src);
604         config = config_get_config();
605         mountinfo = config_find_mount (config, src->mount);
606         source_update_settings (config, src, mountinfo);
607         INFO1 ("source %s is ready to start", src->mount);
608         config_release_config();
609         failed = 0;
610     } while (0);
611 
612     client->ops = &relay_client_ops;
613     client->schedule_ms = timing_get_time();
614 
615     if (failed)
616     {
617         /* failed to start any connection, better clean up and reset */
618         if ((relay->flags & RELAY_ON_DEMAND) == 0)
619         {
620             yp_remove (relay->localmount);
621             src->yp_public = -1;
622         }
623         relay->in_use = NULL;
624         INFO2 ("listener count remaining on %s is %ld", src->mount, src->listeners);
625         src->flags &= ~(SOURCE_PAUSE_LISTENERS|SOURCE_RUNNING);
626     }
627     thread_rwlock_unlock (&src->lock);
628 
629     thread_spin_lock (&relay_start_lock);
630     relays_connecting--;
631     thread_spin_unlock (&relay_start_lock);
632 
633     client->flags |= CLIENT_ACTIVE;
634     worker_wakeup (client->worker);
635     return NULL;
636 }
637 
638 
639 
_drop_relay(void * a)640 static int _drop_relay (void *a)
641 {
642     relay_server *r = (relay_server*)a;
643 
644     if (r->source)
645     {
646         client_t *client = r->source->client;
647         client->schedule_ms = 0;
648     }
649     r->flags &= ~RELAY_IN_LIST;
650     DEBUG2 ("dropped relay %s (%p)", r->localmount, r);
651     return 0;
652 }
653 
654 
_drop_relay_cleanup(void * a)655 static int _drop_relay_cleanup (void *a)
656 {
657     relay_server *r = (relay_server*)a;
658 
659     _drop_relay (a);
660     r->flags |= RELAY_CLEANUP;
661     return 0;
662 }
663 
664 
665 
detach_master_relay(const char * localmount,int cleanup)666 static void detach_master_relay (const char *localmount, int cleanup)
667 {
668     relay_server find;
669 
670     find.localmount = (char*)localmount;
671     avl_delete (global.relays, &find, cleanup ? _drop_relay_cleanup : _drop_relay);
672 }
673 
674 
675 
relay_has_source(relay_server * relay,client_t * client)676 int relay_has_source (relay_server *relay, client_t *client)
677 {
678     source_t *source = relay->source;
679     if (source)
680         thread_rwlock_wlock (&source->lock);
681     else
682     {
683         source = source_reserve (relay->localmount, 0);
684         if (source == NULL)
685             return 0;
686         relay->source = source;
687         source->client = client;
688         source->format->type = relay->type;
689     }
690     if (source_format_init (source) < 0)
691     {
692         detach_master_relay (relay->localmount, 1);
693         thread_rwlock_unlock (&source->lock);
694         return -1;
695     }
696     return 1;
697 }
698 
699 
relay_installed(relay_server * relay)700 static int relay_installed (relay_server *relay)
701 {
702     client_t *client = calloc (1, sizeof (client_t));
703 
704     connection_init (&client->connection, SOCK_ERROR, NULL);
705     switch (relay_has_source (relay, client))
706     {
707         case -1:
708             free (client);
709             return 0;
710         case 1:
711             thread_rwlock_unlock (&relay->source->lock);
712     }
713     global_lock();
714     client_register (client);
715     global_unlock();
716 
717     client->shared_data = relay;
718     client->ops = &relay_init_ops;
719     relay->flags |= RELAY_IN_LIST;
720     avl_insert (global.relays, relay);
721     client->flags |= CLIENT_ACTIVE;
722     client_add_worker (client);
723     DEBUG2 ("adding relay client for %s (%p)", relay->localmount, relay);
724     return 1;
725 }
726 
727 
728 #ifdef HAVE_CURL
create_master_relay(const char * local,const char * remote,format_type_t t,struct master_conn_details * master)729 static relay_server *create_master_relay (const char *local, const char *remote, format_type_t t, struct master_conn_details *master)
730 {
731     relay_server *relay;
732     relay_server_host *m;
733 
734     if (local[0] != '/')
735     {
736         WARN1 ("relay mountpoint \"%s\" does not start with /, skipping", local);
737         return NULL;
738     }
739     relay = calloc (1, sizeof (relay_server));
740 
741     m = calloc (1, sizeof (relay_server_host));
742     m->ip = (char *)xmlStrdup (XMLSTR(master->server));
743     m->port = master->port;
744     if (master->bind)
745         m->bind = (char *)xmlStrdup (XMLSTR(master->bind));
746     // may need to add the admin link later instead of assuming mount is as-is
747     m->mount = (char *)xmlStrdup (XMLSTR(remote));
748     m->timeout = 4;
749     relay->hosts = m;
750 
751     relay->localmount = (char *)xmlStrdup (XMLSTR(local));
752     relay->flags |= (RELAY_RUNNING | RELAY_ICY_META);
753     if (master->on_demand)
754         relay->flags |= RELAY_ON_DEMAND;
755     if (master->on_demand) relay->flags |= RELAY_ON_DEMAND;
756     relay->interval = master->max_interval;
757     relay->run_on = master->run_on;
758     if (master->send_auth)
759     {
760         relay->username = (char *)xmlStrdup (XMLSTR(master->username));
761         relay->password = (char *)xmlStrdup (XMLSTR(master->password));
762     }
763     relay->updated = master->synctime;
764     relay->flags |= RELAY_FROM_MASTER;
765     return relay;
766 }
767 
768 
add_master_relay(const char * mount,const char * type,struct master_conn_details * master)769 static int add_master_relay (const char *mount, const char *type, struct master_conn_details *master)
770 {
771     int ret = -1, notfound;
772     relay_server *result = NULL, find;
773 
774     if (strncmp (mount, "/admin/streams?mount=/", 22) == 0)
775         find.localmount = (char *)(mount+21);
776     else
777         find.localmount = (char *)mount;
778 
779     notfound = avl_get_by_key (global.relays, &find, (void*)&result);
780     if (notfound || (result->flags & RELAY_CLEANUP))
781     {
782         relay_server *new_relay = create_master_relay (find.localmount, mount, format_get_type (type), master);
783 
784         if (new_relay)
785         {
786             if (result && result->flags & RELAY_CLEANUP)
787             {
788                 // drop this now, to avoid a duplicate relay that may match later
789                 detach_master_relay (find.localmount, 0);
790             }
791             if (relay_installed (new_relay))
792                 ret = new_relay->source ? 2 : 1;
793             else
794             {
795                 config_clear_relay (new_relay);
796                 ret = 0;
797             }
798         }
799     }
800     else
801     {
802         if (notfound == 0)
803             result->updated = master->synctime; // avoid relay expiry
804         if (streamlist_check == 0)
805             INFO1 ("relay \"%s\" already in use, ignoring", mount);
806     }
807     return ret;
808 }
809 
810 
811 /* process a single HTTP header from streamlist response */
streamlist_header(void * ptr,size_t size,size_t nmemb,void * stream)812 static size_t streamlist_header (void *ptr, size_t size, size_t nmemb, void *stream)
813 {
814     size_t passed_len = size*nmemb;
815     char *eol = memchr (ptr, '\r', passed_len);
816     struct master_conn_details *master = stream;
817 
818     /* drop EOL chars if any */
819     if (eol)
820         *eol = '\0';
821     else
822     {
823         eol = memchr (ptr, '\n', passed_len);
824         if (eol)
825             *eol = '\0';
826         else
827             return -1;
828     }
829     if (strncmp (ptr, "HTTP", 4) == 0)
830     {
831         int respcode = 0;
832         if (sscanf (ptr, "HTTP%*s %d OK", &respcode) == 1 && respcode == 200)
833             master->ok = 1;  // needed if resetting master relays ???
834         else
835             WARN1 ("Failed response from master \"%s\"", (char*)ptr);
836     }
837     //DEBUG1 ("header is %s", ptr);
838     return passed_len;
839 }
840 
841 
842 /* process mountpoint list from master server. This may be called multiple
843  * times so watch for the last line in this block as it may be incomplete
844  */
streamlist_data(void * ptr,size_t size,size_t nmemb,void * stream)845 static size_t streamlist_data (void *ptr, size_t size, size_t nmemb, void *stream)
846 {
847     struct master_conn_details *master = stream;
848     size_t passed_len = size*nmemb;
849     size_t len = passed_len;
850     char *buffer = ptr, *buf = ptr;
851     int prev = 0;
852 
853     if (master->ok == 0)
854         return passed_len;
855     if (master->previous)
856     {
857         char *eol = memchr (ptr, '\n', passed_len < 150 ? passed_len : 150);
858         if (eol == NULL)
859         {
860             if (passed_len > 150 || master->previous > 200)
861             {
862                 WARN1 ("long line received for append, ignoring %ld", (long)passed_len);
863                 return (master->ok = 0);
864             }
865             buffer = realloc (master->buffer, len + 1);
866             if (buffer == NULL) return 0;
867             master->buffer = buffer;
868             memcpy (master->buffer + master->previous, ptr, passed_len);
869             master->buffer [len] = '\0';
870             master->previous = len;
871             return passed_len;
872         }
873         // just fill out enough for 1 entry
874         len = (eol - buffer) + 1 + master->previous;
875         buffer = realloc (master->buffer, len + 1);
876         if (buffer == NULL) return 0;
877         master->buffer = buffer;
878         prev = len - master->previous;
879         memcpy (buffer+master->previous, ptr, prev);
880         buffer [len] = '\0';
881         buf = buffer;
882     }
883 
884     avl_tree_wlock (global.relays);
885     while (len)
886     {
887         int offset;
888         char *eol = strchr (buf, '\n');
889         if (eol)
890         {
891             offset = (eol - buf) + 1;
892             *eol = '\0';
893             eol = strchr (buf, '\r');
894             if (eol) *eol = '\0';
895         }
896         else
897         {
898             /* incomplete line, the rest may be in the next read */
899             master->buffer = calloc (1, len + 1);
900             memcpy (master->buffer, buf, len);
901             master->previous = len;
902             break;
903         }
904 
905         if (*buf == '/')
906         {
907             DEBUG1 ("read from master \"%s\"", buf);
908             add_master_relay (buf, NULL, master);
909         }
910         else
911             DEBUG1 ("skipping \"%s\"", buf);
912         buf += offset;
913         len -= offset;
914         if (len == 0 && prev)
915         {
916             buf = ptr + prev;
917             len =  passed_len - prev;
918             free (master->buffer);
919             master->buffer = NULL;
920             master->previous = 0;
921             prev = 0;
922         }
923     }
924     avl_tree_unlock (global.relays);
925     return passed_len;
926 }
927 
928 
929 /* retrieve streamlist from master server. The streamlist can be retrieved
930  * from an SSL port if curl is capable and the config is aware of the port
931  * to use
932  */
streamlist_thread(void * arg)933 static void *streamlist_thread (void *arg)
934 {
935     struct master_conn_details *master = arg;
936     CURL *handle;
937     const char *protocol = "http";
938     int port = master->port;
939     char error [CURL_ERROR_SIZE];
940     char url [1024], auth [100];
941 
942     DEBUG0 ("checking master stream list");
943     if (master->ssl_port)
944     {
945         protocol = "https";
946         port = master->ssl_port;
947     }
948     snprintf (auth, sizeof (auth), "%s:%s", master->username, master->password);
949     snprintf (url, sizeof (url), "%s://%s:%d/admin/streams%s",
950             protocol, master->server, port, master->args);
951     handle = curl_easy_init ();
952     curl_easy_setopt (handle, CURLOPT_USERAGENT, master->server_id);
953     curl_easy_setopt (handle, CURLOPT_URL, url);
954     curl_easy_setopt (handle, CURLOPT_HEADERFUNCTION, streamlist_header);
955     curl_easy_setopt (handle, CURLOPT_HEADERDATA, master);
956     curl_easy_setopt (handle, CURLOPT_WRITEFUNCTION, streamlist_data);
957     curl_easy_setopt (handle, CURLOPT_WRITEDATA, master);
958     curl_easy_setopt (handle, CURLOPT_USERPWD, auth);
959     curl_easy_setopt (handle, CURLOPT_ERRORBUFFER, error);
960     curl_easy_setopt (handle, CURLOPT_SSL_VERIFYPEER, 0L);
961     curl_easy_setopt (handle, CURLOPT_NOSIGNAL, 1L);
962     curl_easy_setopt (handle, CURLOPT_CONNECTTIMEOUT, 5L);
963     curl_easy_setopt (handle, CURLOPT_TIMEOUT, 120L);
964     if (master->bind)
965         curl_easy_setopt (handle, CURLOPT_INTERFACE, master->bind);
966 
967     master->ok = 0;
968     master->synctime = time(NULL);
969     if (curl_easy_perform (handle) != 0 || master->ok == 0)
970     {
971         /* fall back to traditional request */
972         INFO0 ("/admin/streams failed trying streamlist");
973         snprintf (url, sizeof (url), "%s://%s:%d/admin/streamlist.txt%s",
974                 protocol, master->server, port, master->args);
975         curl_easy_setopt (handle, CURLOPT_URL, url);
976         if (curl_easy_perform (handle) != 0)
977             WARN2 ("Failed URL access \"%s\" (%s)", url, error);
978     }
979     if (master->ok)
980         relay_barrier_master = master->synctime;
981 
982     curl_easy_cleanup (handle);
983     free (master->server);
984     free (master->username);
985     free (master->password);
986     free (master->buffer);
987     free (master->server_id);
988     free (master->args);
989     free (master);
990     streamlister = 0;
991     return NULL;
992 }
993 #endif
994 
995 
update_relays(ice_config_t * config)996 void update_relays (ice_config_t *config)
997 {
998     int notfound, trap = 10;
999     relay_server *relay, *result, *copy, find;
1000     time_t sync_time = time (NULL);
1001 
1002     avl_tree_wlock (global.relays);
1003     relay = config->relays;
1004     while (relay)
1005     {
1006         find.localmount = relay->localmount;
1007         notfound = avl_get_by_key (global.relays, &find, (void*)&result);
1008         if (notfound)
1009         {
1010             relay_server *new_relay = relay_copy (relay);
1011             if (new_relay)
1012             {
1013                 new_relay->updated = sync_time;
1014                 if (! relay_installed (new_relay))
1015                     config_clear_relay (new_relay);
1016             }
1017         }
1018         else
1019         {
1020             detach_master_relay (find.localmount, 0); // drop current one from tree
1021             if (result->flags & RELAY_CLEANUP)
1022             {
1023                 // should be rare but a relay could be leaving
1024                 DEBUG1 ("old relay with cleanup flagged detected %s", result->localmount);
1025                 if (--trap)
1026                     continue;
1027                 WARN1 ("Detected loop with lookup of %s", find.localmount);
1028                 break;
1029             }
1030             if (result->source == NULL)
1031             {
1032                 INFO1 ("current relay %s not initialised, removed", result->localmount);
1033                 continue;
1034             }
1035             copy = relay_copy (relay);
1036             DEBUG2 ("adding new relay %s (%p) into tree", relay->localmount, copy);
1037             // let client trigger the switchover for new details
1038             result->new_details = copy;
1039             copy->updated = sync_time;
1040             copy->flags |= RELAY_IN_LIST;
1041             avl_insert (global.relays, copy);
1042         }
1043         trap = 10;
1044         relay = relay->new_details;
1045     }
1046     relay_barrier_xml = sync_time;
1047     avl_tree_unlock (global.relays);
1048 }
1049 
1050 
update_from_master(ice_config_t * config)1051 static void update_from_master (ice_config_t *config)
1052 {
1053 #ifdef HAVE_CURL
1054     struct master_conn_details *details;
1055 
1056     if (config->master_password == NULL || config->master_server == NULL ||
1057             config->master_server_port == 0)
1058         return;
1059     if (streamlister) return;
1060     streamlister = 1;
1061     details = calloc (1, sizeof (*details));
1062     details->server = strdup (config->master_server);
1063     details->port = config->master_server_port;
1064     details->ssl_port = config->master_ssl_port;
1065     details->username = strdup (config->master_username);
1066     details->password = strdup (config->master_password);
1067     details->send_auth = config->master_relay_auth;
1068     details->bind = (config->master_bind) ? strdup (config->master_bind) : NULL;
1069     details->on_demand = config->on_demand;
1070     details->server_id = strdup (config->server_id);
1071     details->max_interval = config->master_relay_retry;
1072     details->run_on = config->master_run_on;
1073     if (config->master_redirect)
1074     {
1075         details->args = malloc (4096);
1076         snprintf (details->args, 4096, "?rserver=%s&rport=%d&interval=%d",
1077                 config->hostname, config->port, config->master_update_interval);
1078     }
1079     else
1080         details->args = strdup ("");
1081 
1082     thread_create ("streamlist", streamlist_thread, details, THREAD_DETACHED);
1083 #endif
1084 }
1085 
1086 
update_master_as_slave(ice_config_t * config)1087 static void update_master_as_slave (ice_config_t *config)
1088 {
1089     redirect_host *redirect;
1090 
1091     if (config->master_server == NULL || config->master_redirect == 0 || config->max_redirects == 0)
1092          return;
1093 
1094     thread_rwlock_wlock (&slaves_lock);
1095     redirect = find_slave_host (config->master_server, config->master_server_port);
1096     if (redirect == NULL)
1097     {
1098         INFO2 ("adding master %s:%d", config->master_server, config->master_server_port);
1099         redirector_add (config->master_server, config->master_server_port, 0);
1100     }
1101     else
1102         redirect->next_update += config->master_update_interval;
1103     thread_rwlock_unlock (&slaves_lock);
1104 }
1105 
1106 
slave_startup(void)1107 static void slave_startup (void)
1108 {
1109     ice_config_t *config = config_get_config();
1110 
1111 #ifdef HAVE_GETRLIMIT
1112     struct rlimit rlimit;
1113     if (getrlimit (RLIMIT_NOFILE, &rlimit) == 0)
1114     {
1115         if (rlimit.rlim_cur < rlimit.rlim_max)
1116         {
1117             long old = rlimit.rlim_cur;
1118             rlimit.rlim_cur = rlimit.rlim_max;
1119             if (setrlimit (RLIMIT_NOFILE, &rlimit) < 0)
1120                 rlimit.rlim_cur = old;
1121         }
1122         WARN1 ("process has %ld max file descriptor limit", (long)rlimit.rlim_cur);
1123     }
1124     if (getrlimit (RLIMIT_CORE, &rlimit) == 0)
1125     {
1126         if (rlimit.rlim_cur < rlimit.rlim_max)
1127         {
1128             rlimit.rlim_cur = rlimit.rlim_max;
1129             setrlimit (RLIMIT_CORE, &rlimit);
1130         }
1131     }
1132 #endif
1133 
1134     update_settings = 0;
1135     update_all_sources = 0;
1136 
1137     redirector_setup (config);
1138     stats_global (config);
1139     workers_adjust (config->workers_count);
1140     yp_initialize (config);
1141     update_relays (config);
1142     config_release_config();
1143 
1144     source_recheck_mounts (1);
1145     connection_thread_startup();
1146 }
1147 
_slave_thread(void)1148 static void _slave_thread(void)
1149 {
1150     slave_startup();
1151 
1152     while (1)
1153     {
1154         struct timespec current;
1155         int do_reread = 0;
1156 
1157         thread_get_timespec (&current);
1158 
1159         global_lock();
1160         if (global.running != ICE_RUNNING)
1161             break;
1162         /* re-read xml file if requested */
1163         if (global . schedule_config_reread)
1164         {
1165             global . schedule_config_reread = 0;
1166             do_reread = 1;
1167         }
1168 
1169         if (global.new_connections_slowdown)
1170             global.new_connections_slowdown--;
1171         if (global.new_connections_slowdown > 30)
1172             global.new_connections_slowdown = 30;
1173 
1174         global_unlock();
1175 
1176         global_add_bitrates (global.out_bitrate, 0L, THREAD_TIME_MS(&current));
1177         if (do_reread)
1178             event_config_read ();
1179 
1180         if (streamlist_check <= current.tv_sec)
1181         {
1182             ice_config_t *config = config_get_config();
1183 
1184             streamlist_check = current.tv_sec + config->master_update_interval;
1185             update_master_as_slave (config);
1186 
1187             update_from_master (config);
1188 
1189             config_release_config();
1190         }
1191 
1192         int update = 0, update_all = 0, restart = 0;
1193         thread_spin_lock (&relay_start_lock);
1194         if (update_settings)
1195         {
1196             update = update_settings;
1197             update_all = update_all_sources;
1198             if (update_all_sources || current.tv_sec%5 == 0)
1199             {
1200                 update_settings = 0;
1201                 update_all_sources = 0;
1202             }
1203             if (restart_connection_thread)
1204             {
1205                 restart = restart_connection_thread;
1206                 restart_connection_thread = 0;
1207             }
1208         }
1209         thread_spin_unlock (&relay_start_lock);
1210 
1211         if (update)
1212             source_recheck_mounts (update_all);
1213         if (restart)
1214         {
1215             connection_thread_shutdown();
1216             connection_thread_startup();
1217         }
1218 
1219         stats_global_calc (current.tv_sec);
1220         fserve_scan (current.tv_sec);
1221 
1222         /* allow for terminating icecast if no streams running */
1223         if (inactivity_timer)
1224         {
1225             if (global.sources)
1226             {
1227                 inactivity_timer = 0;
1228                 INFO0 ("inactivity timeout cancelled");
1229             }
1230             else if (inactivity_timer <= current.tv_sec)
1231             {
1232                 INFO0 ("inactivity timeout reached, terminating server");
1233                 global.running = ICE_HALTING;
1234             }
1235         }
1236         else
1237         {
1238             if (inactivity_timeout && global.sources == 0)
1239             {
1240                 inactivity_timer = current.tv_sec + inactivity_timeout;
1241                 INFO1 ("inactivity timeout started, terminate in %d seconds", inactivity_timeout);
1242             }
1243         }
1244         worker_balance_trigger (current.tv_sec);
1245         thread_sleep (1000000);
1246     }
1247     global_unlock();
1248     connection_thread_shutdown();
1249     fserve_running = 0;
1250     stats_clients_wakeup ();
1251     INFO0 ("shutting down current relays");
1252     time_t next = time(NULL) + 1000;
1253     thread_spin_lock (&relay_start_lock);
1254     relay_barrier_xml = next;
1255     relay_barrier_master = relay_barrier_xml;
1256     thread_spin_unlock (&relay_start_lock);
1257     redirector_clearall();
1258 
1259     INFO0 ("Slave thread shutdown complete");
1260 }
1261 
1262 
slave_find_relay(const char * mount)1263 relay_server *slave_find_relay (const char *mount)
1264 {
1265     relay_server *result, find;
1266 
1267     find.localmount = (char*)mount;
1268     if (avl_get_by_key (global.relays, &find, (void*)&result))
1269         result = NULL;
1270     return result;
1271 }
1272 
1273 
1274 
1275 /* drop all redirection details.
1276  */
redirector_clearall(void)1277 void redirector_clearall (void)
1278 {
1279     thread_rwlock_wlock (&slaves_lock);
1280     while (redirectors)
1281     {
1282         redirect_host *current = redirectors;
1283         redirectors = current->next;
1284         INFO2 ("removing %s:%d", current->server, current->port);
1285         free (current->server);
1286         free (current);
1287     }
1288     global.redirect_count = 0;
1289     thread_rwlock_unlock (&slaves_lock);
1290 }
1291 
1292 
redirector_setup(ice_config_t * config)1293 void redirector_setup (ice_config_t *config)
1294 {
1295     redirect_host *redir = config->redirect_hosts;
1296 
1297     thread_rwlock_wlock (&slaves_lock);
1298     while (redir)
1299     {
1300         redirector_add (redir->server, redir->port, 0);
1301         redir = redir->next;
1302     }
1303     thread_rwlock_unlock (&slaves_lock);
1304 
1305     inactivity_timeout = config->inactivity_timeout;
1306     inactivity_timer = 0;
1307 }
1308 
1309 
1310 /* Add new redirectors or update any existing ones
1311  */
redirector_update(client_t * client)1312 void redirector_update (client_t *client)
1313 {
1314     redirect_host *redirect;
1315     const char *rserver = httpp_get_query_param (client->parser, "rserver");
1316     const char *value;
1317     int rport = 0, interval = 0;
1318 
1319     if (rserver==NULL) return;
1320     value = httpp_get_query_param (client->parser, "rport");
1321     if (value == NULL) return;
1322     rport = atoi (value);
1323     if (rport <= 0) return;
1324     value = httpp_get_query_param (client->parser, "interval");
1325     if (value == NULL) return;
1326     interval = atoi (value);
1327     if (interval < 5) return;
1328 
1329     thread_rwlock_wlock (&slaves_lock);
1330     redirect = find_slave_host (rserver, rport);
1331     if (redirect == NULL)
1332     {
1333         ice_config_t *config = config_get_config();
1334         unsigned int allowed = config->max_redirects;
1335 
1336         config_release_config();
1337 
1338         if (global.redirect_count < allowed)
1339             redirector_add (rserver, rport, interval);
1340         else
1341             INFO2 ("redirect to slave limit reached (%d, %d)", global.redirect_count, allowed);
1342     }
1343     else
1344     {
1345         DEBUG2 ("touch update on %s:%d", redirect->server, redirect->port);
1346         redirect->next_update = time(NULL) + interval;
1347     }
1348     thread_rwlock_unlock (&slaves_lock);
1349 }
1350 
1351 
1352 
1353 /* search list of redirectors for a matching entry, lock must be held before
1354  * invoking this function
1355  */
find_slave_host(const char * server,int port)1356 static redirect_host *find_slave_host (const char *server, int port)
1357 {
1358     redirect_host *redirect = redirectors;
1359     while (redirect)
1360     {
1361         if (strcmp (redirect->server, server) == 0 && redirect->port == port)
1362             break;
1363         redirect = redirect->next;
1364     }
1365     return redirect;
1366 }
1367 
1368 
redirector_add(const char * server,int port,int interval)1369 static void redirector_add (const char *server, int port, int interval)
1370 {
1371     redirect_host *redirect = calloc (1, sizeof (redirect_host));
1372     if (redirect == NULL)
1373         abort();
1374     redirect->server = strdup (server);
1375     redirect->port = port;
1376     if (interval == 0)
1377         redirect->next_update = (time_t)0;
1378     else
1379         redirect->next_update = time(NULL) + interval;
1380     redirect->next = redirectors;
1381     redirectors = redirect;
1382     global.redirect_count++;
1383     INFO3 ("slave (%d) at %s:%d added", global.redirect_count,
1384             redirect->server, redirect->port);
1385 }
1386 
1387 
1388 
relay_expired(relay_server * relay)1389 static int relay_expired (relay_server *relay)
1390 {
1391     thread_spin_lock (&relay_start_lock);
1392     time_t t = (relay->flags & RELAY_FROM_MASTER) ? relay_barrier_master : relay_barrier_xml;
1393     thread_spin_unlock (&relay_start_lock);
1394 
1395     return (relay->updated < t) ? 1 : 0;
1396 }
1397 
1398 
get_relay_details(client_t * client)1399 static relay_server *get_relay_details (client_t *client)
1400 {
1401     relay_server *relay = client->shared_data;
1402 
1403     avl_tree_rlock (global.relays);
1404     if (relay->new_details)
1405     {
1406         relay_server *old_details = relay;
1407 
1408         INFO1 ("Detected change in relay details for %s", relay->localmount);
1409         client->shared_data = relay->new_details;
1410         relay = client->shared_data;
1411         relay->source = old_details->source;
1412         old_details->source = NULL;
1413         config_clear_relay (old_details);
1414     }
1415     if (relay_expired (relay))
1416     {
1417         DEBUG1 ("relay expired %s", relay->localmount);
1418         relay->flags |= RELAY_CLEANUP;
1419     }
1420     avl_tree_unlock (global.relays);
1421     if (relay->flags & RELAY_CLEANUP)
1422         relay->flags &= ~RELAY_RUNNING;
1423     return relay;
1424 }
1425 
1426 
relay_reset(relay_server * relay)1427 static void relay_reset (relay_server *relay)
1428 {
1429     relay_server_host *server = relay->hosts;
1430 
1431     for (; server; server = server->next)
1432        server->skip = 0;
1433     INFO1 ("servers to be retried on %s", relay->localmount);
1434 }
1435 
1436 
relay_read(client_t * client)1437 static int relay_read (client_t *client)
1438 {
1439     relay_server *relay = get_relay_details (client);
1440     source_t *source = relay->source;
1441 
1442     thread_rwlock_wlock (&source->lock);
1443     if (source_running (source))
1444     {
1445         if ((relay->flags & RELAY_RUNNING) == 0)
1446             source->flags &= ~SOURCE_RUNNING;
1447         if (source->listeners == 0 && (relay->flags & RELAY_ON_DEMAND))
1448         {
1449             if (client->connection.discon.time == 0)
1450                 client->connection.discon.time = client->worker->current_time.tv_sec + relay->run_on;
1451 
1452             if (client->worker->current_time.tv_sec > client->connection.discon.time)
1453                 source->flags &= ~SOURCE_RUNNING;
1454         }
1455         if (source_read (source) > 0)
1456             return 1;
1457         if (source_running (source))
1458         {
1459             thread_rwlock_unlock (&source->lock);
1460             return 0;
1461         }
1462     }
1463     if ((source->flags & SOURCE_TERMINATING) == 0)
1464     {
1465         /* this section is for once through code */
1466         int fallback = global.running == ICE_RUNNING ? 1 : 0;
1467         if (client->connection.con_time && global.running == ICE_RUNNING)
1468         {
1469             if ((relay->flags & RELAY_RUNNING) && relay->in_use)
1470                 fallback = 0;
1471             if ((relay->flags & RELAY_ON_DEMAND) == 0 &&
1472                     client->worker->current_time.tv_sec - client->connection.con_time < 60)
1473             {
1474                 /* force a server skip if a stream cannot be maintained for 1 min */
1475                 WARN1 ("stream for %s died too quickly, skipping server for now", relay->localmount);
1476                 if (relay->in_use) relay->in_use->skip = 1;
1477             }
1478             else
1479             {
1480                 if (client->connection.sent_bytes < 500000 && source->flags & SOURCE_TIMEOUT)
1481                 {
1482                     WARN1 ("stream for %s timed out, skipping server for now", relay->localmount);
1483                     if (relay->in_use) relay->in_use->skip = 1;
1484                 }
1485                 else
1486                     relay_reset (relay); // spent some time on this so give other servers a chance
1487             }
1488         }
1489         /* don't pause listeners if relay shutting down */
1490         if ((relay->flags & RELAY_RUNNING) == 0)
1491             source->flags &= ~SOURCE_PAUSE_LISTENERS;
1492         // fallback listeners unless relay is to be retried
1493         INFO2 ("fallback on %s %sattempted", source->mount, fallback ? "" : "not ");
1494         source_shutdown (source, fallback);
1495     }
1496     if (source->termination_count && source->termination_count <= source->listeners)
1497     {
1498         client->schedule_ms = client->worker->time_ms + 150;
1499         if (client->timer_start + 1500 < client->worker->time_ms)
1500         {
1501             WARN2 ("%ld listeners still to process in terminating %s", source->termination_count, source->mount);
1502             source->flags &= ~SOURCE_TERMINATING;
1503         }
1504         else
1505             DEBUG3 ("%s waiting (%lu, %lu)", source->mount, source->termination_count, source->listeners);
1506         thread_rwlock_unlock (&source->lock);
1507         return 0;
1508     }
1509     DEBUG1 ("all listeners have now been checked on %s", relay->localmount);
1510     if (client->connection.con_time)
1511     {
1512         global_lock();
1513         global.sources--;
1514         stats_event_args (NULL, "sources", "%d", global.sources);
1515         global_unlock();
1516         global_reduce_bitrate_sampling (global.out_bitrate);
1517     }
1518     client->timer_start = 0;
1519     client->parser = NULL;
1520     free (source->fallback.mount);
1521     source->fallback.mount = NULL;
1522     source->flags &= ~(SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC|SOURCE_ON_DEMAND);
1523     if (relay->flags & RELAY_CLEANUP)
1524     {
1525         connection_close (&client->connection);
1526         if (source->listeners)
1527         {
1528             INFO1 ("listeners on terminating relay %s, rechecking", relay->localmount);
1529             client->timer_start = client->worker->time_ms;
1530             source->termination_count = source->listeners;
1531             source->flags &= ~SOURCE_PAUSE_LISTENERS;
1532             source->flags |= SOURCE_LISTENERS_SYNC;
1533             source_listeners_wakeup (source);
1534             thread_rwlock_unlock (&source->lock);
1535             return 0; /* listeners may be paused, recheck and let them leave this stream */
1536         }
1537         INFO1 ("shutting down relay %s", relay->localmount);
1538         if (relay->flags & RELAY_IN_LIST)
1539         {
1540             avl_tree_wlock (global.relays);
1541             detach_master_relay (relay->localmount, 1);
1542             avl_tree_unlock (global.relays);
1543         }
1544         stats_lock (source->stats, NULL);
1545         stats_set_args (source->stats, "listeners", "%lu", source->listeners);
1546         stats_set (source->stats, NULL, NULL);
1547         source->stats = 0;
1548         thread_rwlock_unlock (&source->lock);
1549         slave_update_mounts();
1550         return -1;
1551     }
1552     client->ops = &relay_init_ops;
1553     do {
1554         if (relay->flags & RELAY_RUNNING)
1555         {
1556             if (client->connection.con_time && relay->in_use)
1557             {
1558                 INFO1 ("standing by to restart relay on %s", relay->localmount);
1559                 stats_flush (source->stats);
1560                 if (relay->flags & RELAY_ON_DEMAND && source->listeners == 0 && relay->in_use->next)
1561                 {
1562                     source_clear_source (relay->source);
1563                     relay_reset (relay);
1564                 }
1565                 break;
1566             }
1567             if (relay->interval < 3)
1568                 relay->interval = 60; // if set too low then give a decent retry delay
1569             client->schedule_ms = client->worker->time_ms + (relay->interval * 1000);
1570             INFO2 ("standing by to restart relay on %s in %d seconds", relay->localmount, relay->interval);
1571         }
1572         else
1573         {
1574             INFO1 ("Relay %s is disabled", relay->localmount);
1575             client->schedule_ms = client->worker->time_ms + 3600000;
1576         }
1577         stats_lock (source->stats, NULL);
1578         stats_set_args (source->stats, "listeners", "%lu", source->listeners);
1579         source_clear_source (relay->source);
1580         relay_reset (relay);
1581         stats_set (source->stats, NULL, NULL);
1582         source->stats = 0;
1583         slave_update_mounts();
1584     } while (0);
1585 
1586     thread_rwlock_unlock (&source->lock);
1587     connection_close (&client->connection);
1588     return 0;
1589 }
1590 
1591 
relay_release(client_t * client)1592 static void relay_release (client_t *client)
1593 {
1594     relay_server *relay = client->shared_data;
1595     DEBUG2("freeing relay %s (%p)", relay->localmount, relay);
1596     if (relay->source)
1597         source_free_source (relay->source);
1598     relay->source = NULL;
1599     config_clear_relay (relay);
1600     client_destroy (client);
1601 }
1602 
1603 
1604 
1605 // This is a special case one, to act as a once through, to get the source reserved and stat initialised
relay_initialise(client_t * client)1606 static int relay_initialise (client_t *client)
1607 {
1608     relay_server *relay = get_relay_details (client);
1609     int rc = relay_has_source (relay, client);
1610     source_t *source = relay->source;
1611 
1612     if (rc < 0)  return -1;
1613     if (rc == 0)  // in cases where relay was added ok but source in use, should be rare
1614     {
1615         WARN1 ("relay for \"%s\" cannot get started, mountpoint in use, waiting", relay->localmount);
1616         client->schedule_ms = client->worker->time_ms + 120000;
1617         return 0;
1618     }
1619     do
1620     {
1621         if (relay->flags & RELAY_RUNNING)
1622         {
1623             if (relay->flags & RELAY_ON_DEMAND)
1624             {
1625                 ice_config_t *config;
1626                 mount_proxy *mountinfo;
1627 
1628                 source_clear_source (source);
1629                 config = config_get_config();
1630                 mountinfo = config_find_mount (config, source->mount);
1631                 source->flags |= SOURCE_ON_DEMAND;
1632                 if (source->stats == 0)
1633                 {
1634                     source->stats = stats_lock (source->stats, source->mount);
1635                     stats_release (source->stats);
1636                 }
1637                 source_update_settings (config, source, mountinfo);
1638                 config_release_config();
1639                 slave_update_mounts();
1640                 stats_set_flags (source->stats, "listener_connections", "0", STATS_COUNTERS);
1641             }
1642             break;
1643         }
1644         thread_rwlock_unlock (&source->lock);
1645         if (relay->flags & RELAY_CLEANUP)
1646             return relay_read (client);
1647         client->schedule_ms = client->worker->time_ms + 1000000;
1648         return 0;
1649     } while(0);
1650     thread_rwlock_unlock (&source->lock);
1651     client->ops = &relay_startup_ops;
1652     return client->ops->process (client);
1653 }
1654 
1655 
relay_startup(client_t * client)1656 static int relay_startup (client_t *client)
1657 {
1658     relay_server *relay = get_relay_details (client);
1659     worker_t *worker = client->worker;
1660 
1661     if ((relay->flags & RELAY_RUNNING) == 0)
1662     {
1663         if (relay->source == NULL) { WARN1 ("odd case for %s", relay->localmount); return -1; }
1664         client->ops = &relay_client_ops;
1665         client->schedule_ms = worker->time_ms + 10;
1666         DEBUG1 ("relay %s disabled", relay->localmount);
1667         return client->ops->process (client);
1668     }
1669     global_lock();
1670     if (global.running != ICE_RUNNING)  /* wait for cleanup */
1671     {
1672         global_unlock();
1673         client->schedule_ms = client->worker->time_ms + 50;
1674         return 0;
1675     }
1676     global_unlock();
1677     if (worker->move_allocations)
1678     {
1679         int ret = 0;
1680         worker_t *dest_worker;
1681 
1682         thread_rwlock_rlock (&workers_lock);
1683         dest_worker = worker_selected ();
1684         if (dest_worker != worker)
1685         {
1686             long diff = worker->count - dest_worker->count;
1687             if (diff > 5)
1688             {
1689                 worker->move_allocations--;
1690                 ret = client_change_worker (client, dest_worker);
1691             }
1692         }
1693         thread_rwlock_unlock (&workers_lock);
1694         if (ret)
1695             return ret;
1696     }
1697 
1698     if (relay->flags & RELAY_ON_DEMAND)
1699     {
1700         source_t *source = relay->source;
1701         int start_relay;
1702         mount_proxy *mountinfo;
1703 
1704         thread_rwlock_wlock (&source->lock);
1705         start_relay = source->listeners; // 0 or non-zero
1706         source->flags |= SOURCE_ON_DEMAND;
1707         thread_rwlock_unlock (&source->lock);
1708         mountinfo = config_find_mount (config_get_config(), source->mount);
1709 
1710         if (mountinfo && mountinfo->fallback_mount)
1711         {
1712             avl_tree_rlock (global.source_tree);
1713             if (fallback_count (config_get_config_unlocked(), mountinfo->fallback_mount) > 0)
1714                 start_relay = 1;
1715             avl_tree_unlock (global.source_tree);
1716         }
1717         config_release_config();
1718         if (start_relay == 0)
1719         {
1720             if (source->stats == 0)
1721             {
1722                 source->stats = stats_lock (source->stats, source->mount);
1723                 stats_release (source->stats);
1724                 slave_update_mounts();
1725             }
1726             client->schedule_ms = (worker->time_ms + 1000) | 0xff;
1727             return 0;
1728         }
1729         INFO1 ("starting on-demand relay %s", relay->localmount);
1730     }
1731 
1732     /* limit the number of relays starting up at the same time */
1733     thread_spin_lock (&relay_start_lock);
1734     if (relays_connecting > 3)
1735     {
1736         thread_spin_unlock (&relay_start_lock);
1737         client->schedule_ms = worker->time_ms + 200;
1738         if (global.new_connections_slowdown < 5)
1739             global.new_connections_slowdown++;
1740         return 0;
1741     }
1742     relays_connecting++;
1743     thread_spin_unlock (&relay_start_lock);
1744 
1745     client->flags &= ~CLIENT_ACTIVE;
1746     thread_create ("Relay Thread", start_relay_stream, client, THREAD_DETACHED);
1747     return 0;
1748 }
1749 
1750 
fallback_count(ice_config_t * config,const char * mount)1751 int fallback_count (ice_config_t *config, const char *mount)
1752 {
1753     int count = -1, loop = 10;
1754     const char *m = mount;
1755     char buffer[4096];
1756 
1757     if (mount == NULL) return -1;
1758     if (strstr (mount, "${")) return -1;
1759     while (m && loop--)
1760     {
1761         source_t *fallback = source_find_mount_raw (m);
1762         if (fallback == NULL || source_running (fallback) == 0)
1763         {
1764             unsigned int len;
1765             mount_proxy *mountinfo = config_find_mount (config, m);
1766             if (fallback == NULL)
1767             {
1768                 fbinfo finfo;
1769 
1770                 memset (&finfo, 0, sizeof (finfo));
1771                 finfo.flags = FS_FALLBACK;
1772                 finfo.mount = (char *)m;
1773                 finfo.fallback = NULL;
1774                 finfo.limit = mountinfo ? mountinfo->limit_rate/8 : 0;
1775                 if (finfo.limit == 0)
1776                 {
1777                     unsigned int rate;
1778                     if (sscanf (m, "%*[^[][%u]", &rate) == 1)
1779                        finfo.limit = rate * 1000 / 8;
1780                 }
1781                 count = fserve_query_count (&finfo);
1782             }
1783             if (mountinfo == NULL)
1784                 break;
1785             len = sizeof buffer;
1786             if (util_expand_pattern (m, mountinfo->fallback_mount, buffer, &len) < 0)
1787                 break;
1788             m = buffer;
1789             continue;
1790         }
1791         count = fallback->listeners;
1792         break;
1793     }
1794     return count;
1795 }
1796