1 /* Icecast
2 *
3 * This program is distributed under the GNU General Public License, version 2.
4 * A copy of this license is included with this source.
5 *
6 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org,
7 * Michael Smith <msmith@xiph.org>,
8 * oddsock <oddsock@xiph.org>,
9 * Karl Heyes <karl@xiph.org>
10 * and others (see AUTHORS for details).
11 */
12
13 /* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
14 /* slave.c
15 * by Ciaran Anscomb <ciaran.anscomb@6809.org.uk>
16 *
17 * Periodically requests a list of streams from a master server
18 * and creates source threads for any it doesn't already have.
19 * */
20
21 #ifdef HAVE_CONFIG_H
22 #include <config.h>
23 #endif
24
25 #ifdef _WIN32
26 #include <winsock2.h>
27 #endif
28
29 #include <stdio.h>
30 #include <stdlib.h>
31 #include <string.h>
32 #include <sys/types.h>
33
34 #ifdef HAVE_SYS_SOCKET_H
35 #include <sys/socket.h>
36 #endif
37 #ifdef HAVE_NETINET_IN_H
38 #include <netinet/in.h>
39 #endif
40 #ifdef HAVE_CURL
41 #include <curl/curl.h>
42 #endif
43 #ifdef HAVE_GETRLIMIT
44 #include <sys/resource.h>
45 #endif
46
47 #include "compat.h"
48
49 #include "timing/timing.h"
50 #include "thread/thread.h"
51 #include "avl/avl.h"
52 #include "net/sock.h"
53 #include "httpp/httpp.h"
54
55 #include "cfgfile.h"
56 #include "global.h"
57 #include "util.h"
58 #include "connection.h"
59 #include "refbuf.h"
60 #include "client.h"
61 #include "stats.h"
62 #include "logging.h"
63 #include "source.h"
64 #include "format.h"
65 #include "event.h"
66 #include "yp.h"
67 #include "slave.h"
68
69 #define CATMODULE "slave"
70
71 #ifdef HAVE_CURL
72 struct master_conn_details
73 {
74 char *server;
75 int port;
76 int ssl_port;
77 int send_auth;
78 int on_demand;
79 int previous;
80 int ok;
81 int max_interval;
82 int run_on;
83 time_t synctime;
84 char *buffer;
85 char *username;
86 char *password;
87 char *bind;
88 char *server_id;
89 char *args;
90 };
91 #endif
92
93
94 static void _slave_thread(void);
95 static void redirector_add (const char *server, int port, int interval);
96 static redirect_host *find_slave_host (const char *server, int port);
97 static int relay_startup (client_t *client);
98 static int relay_initialise (client_t *client);
99 static int relay_read (client_t *client);
100 static void relay_release (client_t *client);
101
102 int slave_running = 0;
103 extern int worker_count;
104 int relays_connecting;
105 int streamlister;
106 time_t relay_barrier_master;
107 time_t relay_barrier_xml;
108
109 static volatile int update_settings = 0;
110 static volatile int update_all_sources = 0;
111 static volatile int restart_connection_thread = 0;
112 static time_t streamlist_check = 0;
113 static rwlock_t slaves_lock;
114 static spin_t relay_start_lock;
115 static time_t inactivity_timer;
116 static int inactivity_timeout;
117
118 redirect_host *redirectors;
119 worker_t *workers;
120 rwlock_t workers_lock;
121
122
123 struct _client_functions relay_client_ops =
124 {
125 relay_read,
126 relay_release
127 };
128
129 struct _client_functions relay_startup_ops =
130 {
131 relay_startup,
132 relay_release
133 };
134
135 struct _client_functions relay_init_ops =
136 {
137 relay_initialise,
138 relay_release
139 };
140
141
relay_copy(relay_server * r)142 relay_server *relay_copy (relay_server *r)
143 {
144 relay_server *copy = calloc (1, sizeof (relay_server));
145
146 if (copy)
147 {
148 relay_server_host *from = r->hosts, **insert = ©->hosts;
149
150 while (from)
151 {
152 relay_server_host *to = calloc (1, sizeof (relay_server_host));
153 to->ip = (char *)xmlCharStrdup (from->ip);
154 to->mount = (char *)xmlCharStrdup (from->mount);
155 if (from->bind)
156 to->bind = (char *)xmlCharStrdup (from->bind);
157 to->port = from->port;
158 to->timeout = from->timeout;
159 *insert = to;
160 from = from->next;
161 insert = &to->next;
162 }
163
164 copy->localmount = (char *)xmlStrdup (XMLSTR(r->localmount));
165 if (r->username)
166 copy->username = (char *)xmlStrdup (XMLSTR(r->username));
167 if (r->password)
168 copy->password = (char *)xmlStrdup (XMLSTR(r->password));
169 copy->flags = r->flags;
170 copy->flags |= RELAY_RUNNING;
171 copy->interval = r->interval;
172 copy->run_on = r->run_on;
173 r->source = NULL;
174 DEBUG2 ("copy relay %s at %p", copy->localmount, copy);
175 }
176 return copy;
177 }
178
179
180 /* force a recheck of the mounts.
181 */
slave_update_mounts(void)182 void slave_update_mounts (void)
183 {
184 thread_spin_lock (&relay_start_lock);
185 update_settings = 1;
186 thread_spin_unlock (&relay_start_lock);
187 }
188
189 /* force a recheck of the mounts.
190 */
slave_update_all_mounts(void)191 void slave_update_all_mounts (void)
192 {
193 thread_spin_lock (&relay_start_lock);
194 update_settings = 1;
195 update_all_sources = 1;
196 thread_spin_unlock (&relay_start_lock);
197 }
198
199
200 /* called on reload, so drop all redirection and trigger source checkup and
201 * rebuild all stat mountpoints
202 */
slave_restart(void)203 void slave_restart (void)
204 {
205 thread_spin_lock (&relay_start_lock);
206 restart_connection_thread = 1;
207 update_settings = 1;
208 update_all_sources = 1;
209 streamlist_check = 0;
210 thread_spin_unlock (&relay_start_lock);
211 }
212
213
_compare_relay(void * arg,void * a,void * b)214 static int _compare_relay(void *arg, void *a, void *b)
215 {
216 relay_server *nodea = (relay_server *)a;
217 relay_server *nodeb = (relay_server *)b;
218
219 return strcmp(nodea->localmount, nodeb->localmount);
220 }
221
222
slave_initialize(void)223 void slave_initialize(void)
224 {
225 if (slave_running)
226 return;
227
228 thread_rwlock_create (&slaves_lock);
229 slave_running = 1;
230 streamlister = 0;
231 streamlist_check = 0;
232 update_settings = 0;
233 update_all_sources = 0;
234 restart_connection_thread = 0;
235 redirectors = NULL;
236 workers = NULL;
237 worker_count = 0;
238 relays_connecting = 0;
239 thread_spin_create (&relay_start_lock);
240 thread_rwlock_create (&workers_lock);
241 global.relays = avl_tree_new (_compare_relay, NULL);
242 inactivity_timeout = 0;
243 inactivity_timer = 0;
244 #ifndef HAVE_CURL
245 ERROR0 ("streamlist request disabled, rebuild with libcurl if required");
246 #endif
247 _slave_thread ();
248 yp_stop ();
249 workers_adjust(0);
250 }
251
252
slave_shutdown(void)253 void slave_shutdown(void)
254 {
255 if (slave_running == 0)
256 return;
257 DEBUG0 ("shutting down slave");
258 yp_shutdown();
259 stats_shutdown();
260 fserve_shutdown();
261 config_shutdown();
262 stop_logging();
263 // stall until workers have shut down
264 thread_rwlock_wlock (&global.workers_rw);
265 thread_rwlock_unlock (&global.workers_rw);
266
267 //INFO0 ("all workers shut down");
268 avl_tree_free (global.relays, NULL);
269 thread_rwlock_destroy (&slaves_lock);
270 thread_rwlock_destroy (&workers_lock);
271 thread_spin_destroy (&relay_start_lock);
272 slave_running = 0;
273 }
274
275
redirect_client(const char * mountpoint,client_t * client)276 int redirect_client (const char *mountpoint, client_t *client)
277 {
278 int ret = 0, which;
279 redirect_host *checking, **trail;
280
281 thread_rwlock_rlock (&slaves_lock);
282 /* select slave entry */
283 if (global.redirect_count == 0)
284 {
285 thread_rwlock_unlock (&slaves_lock);
286 return 0;
287 }
288 which=(int) (((float)global.redirect_count)*rand()/(RAND_MAX+1.0)) + 1;
289 checking = redirectors;
290 trail = &redirectors;
291
292 DEBUG2 ("random selection %d (out of %d)", which, global.redirect_count);
293 while (checking)
294 {
295 DEBUG2 ("...%s:%d", checking->server, checking->port);
296 if (checking->next_update && checking->next_update+10 < time(NULL))
297 {
298 /* no streamist request, expire slave for now */
299 *trail = checking->next;
300 global.redirect_count--;
301 /* free slave details */
302 INFO2 ("dropping redirector for %s:%d", checking->server, checking->port);
303 free (checking->server);
304 free (checking);
305 checking = *trail;
306 if (which > 0)
307 which--; /* we are 1 less now */
308 continue;
309 }
310 if (--which == 0)
311 {
312 char *location;
313 /* add enough for "http://" the port ':' and nul */
314 int len = strlen (mountpoint) + strlen (checking->server) + 20;
315 const char *user = client->username;
316 const char *pass = client->password;
317 const char *args = httpp_getvar (client->parser, HTTPP_VAR_QUERYARGS);
318 const char *colon = ":", *at_sign = "@";
319
320 if (args)
321 len += strlen (args);
322 else
323 args = "";
324 if (user && pass)
325 len += strlen (user) + strlen (pass);
326 else
327 colon = at_sign = user = pass = "";
328 INFO2 ("redirecting listener to slave server at %s:%d", checking->server, checking->port);
329 location = alloca (len);
330 snprintf (location, len, "%s://%s%s%s%s%s:%d%s%s", httpp_getvar (client->parser, HTTPP_VAR_PROTOCOL),
331 user, colon, pass, at_sign,
332 checking->server, checking->port, mountpoint, args);
333 client_send_302 (client, location);
334 ret = 1;
335 }
336 trail = &checking->next;
337 checking = checking->next;
338 }
339 thread_rwlock_unlock (&slaves_lock);
340 return ret;
341 }
342
343
344
get_relay_response(connection_t * con,const char * mount,const char * server,const char * headers)345 static http_parser_t *get_relay_response (connection_t *con, const char *mount,
346 const char *server, const char *headers)
347 {
348 ice_config_t *config = config_get_config ();
349 char *server_id = strdup (config->server_id);
350 http_parser_t *parser = NULL;
351 char response [4096];
352
353 config_release_config ();
354
355 /* At this point we may not know if we are relaying an mp3 or vorbis
356 * stream, but only send the icy-metadata header if the relay details
357 * state so (the typical case). It's harmless in the vorbis case. If
358 * we don't send in this header then relay will not have mp3 metadata.
359 */
360 sock_write (con->sock, "GET %s HTTP/1.0\r\n"
361 "User-Agent: %s\r\n"
362 "Host: %s\r\n"
363 "%s"
364 "\r\n",
365 mount,
366 server_id,
367 server,
368 headers ? headers : "");
369
370 free (server_id);
371 memset (response, 0, sizeof(response));
372 if (util_read_header (con->sock, response, 4096, READ_ENTIRE_HEADER) == 0)
373 {
374 WARN2 ("Header read failure from %s %s", server, mount);
375 return NULL;
376 }
377 parser = httpp_create_parser();
378 httpp_initialize (parser, NULL);
379 if (! httpp_parse_response (parser, response, strlen(response), mount))
380 {
381 INFO0 ("problem parsing response from relay");
382 httpp_destroy (parser);
383 return NULL;
384 }
385 return parser;
386 }
387
388
389
encode_auth_header(char * userpass,unsigned int remain)390 static void encode_auth_header (char *userpass, unsigned int remain)
391 {
392 if (userpass && userpass[0])
393 {
394 char *esc_authorisation = util_base64_encode (userpass);
395
396 if (snprintf (userpass, remain, "Authorization: Basic %s\r\n", esc_authorisation) < 0)
397 userpass[0] = '\0';
398 free (esc_authorisation);
399 }
400 }
401
402
403 /* Actually open the connection and do some http parsing, handle any 302
404 * responses within here.
405 */
open_relay_connection(client_t * client,relay_server * relay,relay_server_host * host)406 static int open_relay_connection (client_t *client, relay_server *relay, relay_server_host *host)
407 {
408 int redirects = 0;
409 http_parser_t *parser = NULL;
410 connection_t *con = &client->connection;
411 char *server = strdup (host->ip);
412 char *mount = strdup (host->mount);
413 int port = host->port, timeout = host->timeout, remain;
414 char *p, headers[4096] = "";
415
416 remain = sizeof (headers);
417 if (relay->flags & RELAY_ICY_META)
418 remain -= snprintf (headers, remain, "Icy-MetaData: 1\r\n");
419 p = headers + strlen (headers);
420 if (relay->username && relay->password)
421 {
422 INFO2 ("using username %s for %s", relay->username, relay->localmount);
423 snprintf (p, remain, "%s:%s", relay->username, relay->password);
424 encode_auth_header (p, remain);
425 }
426 while (1)
427 {
428 sock_t streamsock;
429 char *bind = NULL;
430
431 if (redirects > 10)
432 {
433 WARN1 ("detected too many redirects on %s", relay->localmount);
434 break;
435 }
436 /* policy decision, we assume a source bind even after redirect, possible option */
437 if (host->bind)
438 bind = strdup (host->bind);
439
440 if (bind)
441 INFO4 ("connecting to %s:%d for %s, bound to %s", server, port, relay->localmount, bind);
442 else
443 INFO3 ("connecting to %s:%d for %s", server, port, relay->localmount);
444
445 con->con_time = time (NULL);
446 relay->in_use = host;
447 streamsock = sock_connect_wto_bind (server, port, bind, timeout);
448 free (bind);
449 if (connection_init (con, streamsock, server) < 0)
450 {
451 WARN2 ("Failed to connect to %s:%d", server, port);
452 break;
453 }
454
455 parser = get_relay_response (con, mount, server, headers);
456
457 if (parser == NULL)
458 {
459 ERROR4 ("Problem trying to start relay on %s (%s:%d%s)", relay->localmount,
460 server, port, mount);
461 break;
462 }
463 if (strcmp (httpp_getvar (parser, HTTPP_VAR_ERROR_CODE), "302") == 0)
464 {
465 /* better retry the connection again but with different details */
466 const char *uri, *mountpoint;
467 int len;
468
469 uri = httpp_getvar (parser, "location");
470 INFO2 ("redirect received on %s : %s", relay->localmount, uri);
471 if (strncmp (uri, "http://", 7) != 0)
472 break;
473 uri += 7;
474 mountpoint = strchr (uri, '/');
475 free (mount);
476 if (mountpoint)
477 mount = strdup (mountpoint);
478 else
479 mount = strdup ("/");
480
481 len = strcspn (uri, "@/");
482 if (uri [len] == '@')
483 {
484 snprintf (p, remain, "%.*s", len, uri);
485 encode_auth_header (p, remain);
486 uri += len + 1;
487 }
488 len = strcspn (uri, ":/");
489 port = 80;
490 if (uri [len] == ':')
491 port = atoi (uri+len+1);
492 free (server);
493 server = calloc (1, len+1);
494 strncpy (server, uri, len);
495 connection_close (con);
496 httpp_destroy (parser);
497 parser = NULL;
498 }
499 else
500 {
501 if (httpp_getvar (parser, HTTPP_VAR_ERROR_MESSAGE))
502 {
503 ERROR3 ("Error from relay request on %s (%s %s)", relay->localmount,
504 host->mount, httpp_getvar(parser, HTTPP_VAR_ERROR_MESSAGE));
505 client->parser = NULL;
506 break;
507 }
508 sock_set_blocking (streamsock, 0);
509 thread_rwlock_wlock (&relay->source->lock);
510 client->parser = parser; // old parser will be free in the format clear
511 thread_rwlock_unlock (&relay->source->lock);
512 client->connection.discon.time = 0;
513 client->connection.con_time = time (NULL);
514 client_set_queue (client, NULL);
515 free (server);
516 free (mount);
517
518 return 0;
519 }
520 redirects++;
521 }
522 /* failed, better clean up */
523 free (server);
524 free (mount);
525 if (parser)
526 httpp_destroy (parser);
527 connection_close (con);
528 con->con_time = time (NULL); // sources count needs to drop in such cases
529 if (relay->in_use) relay->in_use->skip = 1;
530 return -1;
531 }
532
533
534 /* This does the actual connection for a relay. A thread is
535 * started off if a connection can be acquired
536 */
open_relay(relay_server * relay)537 int open_relay (relay_server *relay)
538 {
539 source_t *src = relay->source;
540 relay_server_host *host = relay->hosts;
541 client_t *client = src->client;
542 do
543 {
544 int ret;
545
546 if (host->skip)
547 {
548 INFO3 ("skipping %s:%d for %s", host->ip, host->port, relay->localmount);
549 continue;
550 }
551 thread_rwlock_unlock (&src->lock);
552 ret = open_relay_connection (client, relay, host);
553 thread_rwlock_wlock (&src->lock);
554
555 if (ret < 0)
556 continue;
557
558 if (source_format_init (src) < 0)
559 {
560 WARN1 ("Failed to complete initialisation on %s", relay->localmount);
561 continue;
562 }
563 return 1;
564 } while ((host = host->next) && global.running == ICE_RUNNING);
565 return -1;
566 }
567
start_relay_stream(void * arg)568 static void *start_relay_stream (void *arg)
569 {
570 client_t *client = arg;
571 relay_server *relay;
572 source_t *src;
573 int failed = 1, sources;
574
575 global_lock();
576 sources = ++global.sources;
577 stats_event_args (NULL, "sources", "%d", global.sources);
578 global_unlock();
579 /* set the start time, because we want to decrease the sources on all failures */
580 client->connection.con_time = time (NULL);
581 do
582 {
583 ice_config_t *config = config_get_config();
584 mount_proxy *mountinfo;
585
586 relay = client->shared_data;
587 src = relay->source;
588
589 thread_rwlock_wlock (&src->lock);
590 src->flags |= SOURCE_PAUSE_LISTENERS;
591 if (sources > config->source_limit)
592 {
593 config_release_config();
594 WARN1 ("starting relayed mountpoint \"%s\" requires a higher sources limit", relay->localmount);
595 break;
596 }
597 config_release_config();
598 INFO1("Starting relayed source at mountpoint \"%s\"", relay->localmount);
599
600 if (open_relay (relay) < 0)
601 break;
602 stats_event_inc (NULL, "source_relay_connections");
603 source_init (src);
604 config = config_get_config();
605 mountinfo = config_find_mount (config, src->mount);
606 source_update_settings (config, src, mountinfo);
607 INFO1 ("source %s is ready to start", src->mount);
608 config_release_config();
609 failed = 0;
610 } while (0);
611
612 client->ops = &relay_client_ops;
613 client->schedule_ms = timing_get_time();
614
615 if (failed)
616 {
617 /* failed to start any connection, better clean up and reset */
618 if ((relay->flags & RELAY_ON_DEMAND) == 0)
619 {
620 yp_remove (relay->localmount);
621 src->yp_public = -1;
622 }
623 relay->in_use = NULL;
624 INFO2 ("listener count remaining on %s is %ld", src->mount, src->listeners);
625 src->flags &= ~(SOURCE_PAUSE_LISTENERS|SOURCE_RUNNING);
626 }
627 thread_rwlock_unlock (&src->lock);
628
629 thread_spin_lock (&relay_start_lock);
630 relays_connecting--;
631 thread_spin_unlock (&relay_start_lock);
632
633 client->flags |= CLIENT_ACTIVE;
634 worker_wakeup (client->worker);
635 return NULL;
636 }
637
638
639
_drop_relay(void * a)640 static int _drop_relay (void *a)
641 {
642 relay_server *r = (relay_server*)a;
643
644 if (r->source)
645 {
646 client_t *client = r->source->client;
647 client->schedule_ms = 0;
648 }
649 r->flags &= ~RELAY_IN_LIST;
650 DEBUG2 ("dropped relay %s (%p)", r->localmount, r);
651 return 0;
652 }
653
654
_drop_relay_cleanup(void * a)655 static int _drop_relay_cleanup (void *a)
656 {
657 relay_server *r = (relay_server*)a;
658
659 _drop_relay (a);
660 r->flags |= RELAY_CLEANUP;
661 return 0;
662 }
663
664
665
detach_master_relay(const char * localmount,int cleanup)666 static void detach_master_relay (const char *localmount, int cleanup)
667 {
668 relay_server find;
669
670 find.localmount = (char*)localmount;
671 avl_delete (global.relays, &find, cleanup ? _drop_relay_cleanup : _drop_relay);
672 }
673
674
675
relay_has_source(relay_server * relay,client_t * client)676 int relay_has_source (relay_server *relay, client_t *client)
677 {
678 source_t *source = relay->source;
679 if (source)
680 thread_rwlock_wlock (&source->lock);
681 else
682 {
683 source = source_reserve (relay->localmount, 0);
684 if (source == NULL)
685 return 0;
686 relay->source = source;
687 source->client = client;
688 source->format->type = relay->type;
689 }
690 if (source_format_init (source) < 0)
691 {
692 detach_master_relay (relay->localmount, 1);
693 thread_rwlock_unlock (&source->lock);
694 return -1;
695 }
696 return 1;
697 }
698
699
relay_installed(relay_server * relay)700 static int relay_installed (relay_server *relay)
701 {
702 client_t *client = calloc (1, sizeof (client_t));
703
704 connection_init (&client->connection, SOCK_ERROR, NULL);
705 switch (relay_has_source (relay, client))
706 {
707 case -1:
708 free (client);
709 return 0;
710 case 1:
711 thread_rwlock_unlock (&relay->source->lock);
712 }
713 global_lock();
714 client_register (client);
715 global_unlock();
716
717 client->shared_data = relay;
718 client->ops = &relay_init_ops;
719 relay->flags |= RELAY_IN_LIST;
720 avl_insert (global.relays, relay);
721 client->flags |= CLIENT_ACTIVE;
722 client_add_worker (client);
723 DEBUG2 ("adding relay client for %s (%p)", relay->localmount, relay);
724 return 1;
725 }
726
727
728 #ifdef HAVE_CURL
create_master_relay(const char * local,const char * remote,format_type_t t,struct master_conn_details * master)729 static relay_server *create_master_relay (const char *local, const char *remote, format_type_t t, struct master_conn_details *master)
730 {
731 relay_server *relay;
732 relay_server_host *m;
733
734 if (local[0] != '/')
735 {
736 WARN1 ("relay mountpoint \"%s\" does not start with /, skipping", local);
737 return NULL;
738 }
739 relay = calloc (1, sizeof (relay_server));
740
741 m = calloc (1, sizeof (relay_server_host));
742 m->ip = (char *)xmlStrdup (XMLSTR(master->server));
743 m->port = master->port;
744 if (master->bind)
745 m->bind = (char *)xmlStrdup (XMLSTR(master->bind));
746 // may need to add the admin link later instead of assuming mount is as-is
747 m->mount = (char *)xmlStrdup (XMLSTR(remote));
748 m->timeout = 4;
749 relay->hosts = m;
750
751 relay->localmount = (char *)xmlStrdup (XMLSTR(local));
752 relay->flags |= (RELAY_RUNNING | RELAY_ICY_META);
753 if (master->on_demand)
754 relay->flags |= RELAY_ON_DEMAND;
755 if (master->on_demand) relay->flags |= RELAY_ON_DEMAND;
756 relay->interval = master->max_interval;
757 relay->run_on = master->run_on;
758 if (master->send_auth)
759 {
760 relay->username = (char *)xmlStrdup (XMLSTR(master->username));
761 relay->password = (char *)xmlStrdup (XMLSTR(master->password));
762 }
763 relay->updated = master->synctime;
764 relay->flags |= RELAY_FROM_MASTER;
765 return relay;
766 }
767
768
add_master_relay(const char * mount,const char * type,struct master_conn_details * master)769 static int add_master_relay (const char *mount, const char *type, struct master_conn_details *master)
770 {
771 int ret = -1, notfound;
772 relay_server *result = NULL, find;
773
774 if (strncmp (mount, "/admin/streams?mount=/", 22) == 0)
775 find.localmount = (char *)(mount+21);
776 else
777 find.localmount = (char *)mount;
778
779 notfound = avl_get_by_key (global.relays, &find, (void*)&result);
780 if (notfound || (result->flags & RELAY_CLEANUP))
781 {
782 relay_server *new_relay = create_master_relay (find.localmount, mount, format_get_type (type), master);
783
784 if (new_relay)
785 {
786 if (result && result->flags & RELAY_CLEANUP)
787 {
788 // drop this now, to avoid a duplicate relay that may match later
789 detach_master_relay (find.localmount, 0);
790 }
791 if (relay_installed (new_relay))
792 ret = new_relay->source ? 2 : 1;
793 else
794 {
795 config_clear_relay (new_relay);
796 ret = 0;
797 }
798 }
799 }
800 else
801 {
802 if (notfound == 0)
803 result->updated = master->synctime; // avoid relay expiry
804 if (streamlist_check == 0)
805 INFO1 ("relay \"%s\" already in use, ignoring", mount);
806 }
807 return ret;
808 }
809
810
811 /* process a single HTTP header from streamlist response */
streamlist_header(void * ptr,size_t size,size_t nmemb,void * stream)812 static size_t streamlist_header (void *ptr, size_t size, size_t nmemb, void *stream)
813 {
814 size_t passed_len = size*nmemb;
815 char *eol = memchr (ptr, '\r', passed_len);
816 struct master_conn_details *master = stream;
817
818 /* drop EOL chars if any */
819 if (eol)
820 *eol = '\0';
821 else
822 {
823 eol = memchr (ptr, '\n', passed_len);
824 if (eol)
825 *eol = '\0';
826 else
827 return -1;
828 }
829 if (strncmp (ptr, "HTTP", 4) == 0)
830 {
831 int respcode = 0;
832 if (sscanf (ptr, "HTTP%*s %d OK", &respcode) == 1 && respcode == 200)
833 master->ok = 1; // needed if resetting master relays ???
834 else
835 WARN1 ("Failed response from master \"%s\"", (char*)ptr);
836 }
837 //DEBUG1 ("header is %s", ptr);
838 return passed_len;
839 }
840
841
842 /* process mountpoint list from master server. This may be called multiple
843 * times so watch for the last line in this block as it may be incomplete
844 */
streamlist_data(void * ptr,size_t size,size_t nmemb,void * stream)845 static size_t streamlist_data (void *ptr, size_t size, size_t nmemb, void *stream)
846 {
847 struct master_conn_details *master = stream;
848 size_t passed_len = size*nmemb;
849 size_t len = passed_len;
850 char *buffer = ptr, *buf = ptr;
851 int prev = 0;
852
853 if (master->ok == 0)
854 return passed_len;
855 if (master->previous)
856 {
857 char *eol = memchr (ptr, '\n', passed_len < 150 ? passed_len : 150);
858 if (eol == NULL)
859 {
860 if (passed_len > 150 || master->previous > 200)
861 {
862 WARN1 ("long line received for append, ignoring %ld", (long)passed_len);
863 return (master->ok = 0);
864 }
865 buffer = realloc (master->buffer, len + 1);
866 if (buffer == NULL) return 0;
867 master->buffer = buffer;
868 memcpy (master->buffer + master->previous, ptr, passed_len);
869 master->buffer [len] = '\0';
870 master->previous = len;
871 return passed_len;
872 }
873 // just fill out enough for 1 entry
874 len = (eol - buffer) + 1 + master->previous;
875 buffer = realloc (master->buffer, len + 1);
876 if (buffer == NULL) return 0;
877 master->buffer = buffer;
878 prev = len - master->previous;
879 memcpy (buffer+master->previous, ptr, prev);
880 buffer [len] = '\0';
881 buf = buffer;
882 }
883
884 avl_tree_wlock (global.relays);
885 while (len)
886 {
887 int offset;
888 char *eol = strchr (buf, '\n');
889 if (eol)
890 {
891 offset = (eol - buf) + 1;
892 *eol = '\0';
893 eol = strchr (buf, '\r');
894 if (eol) *eol = '\0';
895 }
896 else
897 {
898 /* incomplete line, the rest may be in the next read */
899 master->buffer = calloc (1, len + 1);
900 memcpy (master->buffer, buf, len);
901 master->previous = len;
902 break;
903 }
904
905 if (*buf == '/')
906 {
907 DEBUG1 ("read from master \"%s\"", buf);
908 add_master_relay (buf, NULL, master);
909 }
910 else
911 DEBUG1 ("skipping \"%s\"", buf);
912 buf += offset;
913 len -= offset;
914 if (len == 0 && prev)
915 {
916 buf = ptr + prev;
917 len = passed_len - prev;
918 free (master->buffer);
919 master->buffer = NULL;
920 master->previous = 0;
921 prev = 0;
922 }
923 }
924 avl_tree_unlock (global.relays);
925 return passed_len;
926 }
927
928
929 /* retrieve streamlist from master server. The streamlist can be retrieved
930 * from an SSL port if curl is capable and the config is aware of the port
931 * to use
932 */
streamlist_thread(void * arg)933 static void *streamlist_thread (void *arg)
934 {
935 struct master_conn_details *master = arg;
936 CURL *handle;
937 const char *protocol = "http";
938 int port = master->port;
939 char error [CURL_ERROR_SIZE];
940 char url [1024], auth [100];
941
942 DEBUG0 ("checking master stream list");
943 if (master->ssl_port)
944 {
945 protocol = "https";
946 port = master->ssl_port;
947 }
948 snprintf (auth, sizeof (auth), "%s:%s", master->username, master->password);
949 snprintf (url, sizeof (url), "%s://%s:%d/admin/streams%s",
950 protocol, master->server, port, master->args);
951 handle = curl_easy_init ();
952 curl_easy_setopt (handle, CURLOPT_USERAGENT, master->server_id);
953 curl_easy_setopt (handle, CURLOPT_URL, url);
954 curl_easy_setopt (handle, CURLOPT_HEADERFUNCTION, streamlist_header);
955 curl_easy_setopt (handle, CURLOPT_HEADERDATA, master);
956 curl_easy_setopt (handle, CURLOPT_WRITEFUNCTION, streamlist_data);
957 curl_easy_setopt (handle, CURLOPT_WRITEDATA, master);
958 curl_easy_setopt (handle, CURLOPT_USERPWD, auth);
959 curl_easy_setopt (handle, CURLOPT_ERRORBUFFER, error);
960 curl_easy_setopt (handle, CURLOPT_SSL_VERIFYPEER, 0L);
961 curl_easy_setopt (handle, CURLOPT_NOSIGNAL, 1L);
962 curl_easy_setopt (handle, CURLOPT_CONNECTTIMEOUT, 5L);
963 curl_easy_setopt (handle, CURLOPT_TIMEOUT, 120L);
964 if (master->bind)
965 curl_easy_setopt (handle, CURLOPT_INTERFACE, master->bind);
966
967 master->ok = 0;
968 master->synctime = time(NULL);
969 if (curl_easy_perform (handle) != 0 || master->ok == 0)
970 {
971 /* fall back to traditional request */
972 INFO0 ("/admin/streams failed trying streamlist");
973 snprintf (url, sizeof (url), "%s://%s:%d/admin/streamlist.txt%s",
974 protocol, master->server, port, master->args);
975 curl_easy_setopt (handle, CURLOPT_URL, url);
976 if (curl_easy_perform (handle) != 0)
977 WARN2 ("Failed URL access \"%s\" (%s)", url, error);
978 }
979 if (master->ok)
980 relay_barrier_master = master->synctime;
981
982 curl_easy_cleanup (handle);
983 free (master->server);
984 free (master->username);
985 free (master->password);
986 free (master->buffer);
987 free (master->server_id);
988 free (master->args);
989 free (master);
990 streamlister = 0;
991 return NULL;
992 }
993 #endif
994
995
update_relays(ice_config_t * config)996 void update_relays (ice_config_t *config)
997 {
998 int notfound, trap = 10;
999 relay_server *relay, *result, *copy, find;
1000 time_t sync_time = time (NULL);
1001
1002 avl_tree_wlock (global.relays);
1003 relay = config->relays;
1004 while (relay)
1005 {
1006 find.localmount = relay->localmount;
1007 notfound = avl_get_by_key (global.relays, &find, (void*)&result);
1008 if (notfound)
1009 {
1010 relay_server *new_relay = relay_copy (relay);
1011 if (new_relay)
1012 {
1013 new_relay->updated = sync_time;
1014 if (! relay_installed (new_relay))
1015 config_clear_relay (new_relay);
1016 }
1017 }
1018 else
1019 {
1020 detach_master_relay (find.localmount, 0); // drop current one from tree
1021 if (result->flags & RELAY_CLEANUP)
1022 {
1023 // should be rare but a relay could be leaving
1024 DEBUG1 ("old relay with cleanup flagged detected %s", result->localmount);
1025 if (--trap)
1026 continue;
1027 WARN1 ("Detected loop with lookup of %s", find.localmount);
1028 break;
1029 }
1030 if (result->source == NULL)
1031 {
1032 INFO1 ("current relay %s not initialised, removed", result->localmount);
1033 continue;
1034 }
1035 copy = relay_copy (relay);
1036 DEBUG2 ("adding new relay %s (%p) into tree", relay->localmount, copy);
1037 // let client trigger the switchover for new details
1038 result->new_details = copy;
1039 copy->updated = sync_time;
1040 copy->flags |= RELAY_IN_LIST;
1041 avl_insert (global.relays, copy);
1042 }
1043 trap = 10;
1044 relay = relay->new_details;
1045 }
1046 relay_barrier_xml = sync_time;
1047 avl_tree_unlock (global.relays);
1048 }
1049
1050
update_from_master(ice_config_t * config)1051 static void update_from_master (ice_config_t *config)
1052 {
1053 #ifdef HAVE_CURL
1054 struct master_conn_details *details;
1055
1056 if (config->master_password == NULL || config->master_server == NULL ||
1057 config->master_server_port == 0)
1058 return;
1059 if (streamlister) return;
1060 streamlister = 1;
1061 details = calloc (1, sizeof (*details));
1062 details->server = strdup (config->master_server);
1063 details->port = config->master_server_port;
1064 details->ssl_port = config->master_ssl_port;
1065 details->username = strdup (config->master_username);
1066 details->password = strdup (config->master_password);
1067 details->send_auth = config->master_relay_auth;
1068 details->bind = (config->master_bind) ? strdup (config->master_bind) : NULL;
1069 details->on_demand = config->on_demand;
1070 details->server_id = strdup (config->server_id);
1071 details->max_interval = config->master_relay_retry;
1072 details->run_on = config->master_run_on;
1073 if (config->master_redirect)
1074 {
1075 details->args = malloc (4096);
1076 snprintf (details->args, 4096, "?rserver=%s&rport=%d&interval=%d",
1077 config->hostname, config->port, config->master_update_interval);
1078 }
1079 else
1080 details->args = strdup ("");
1081
1082 thread_create ("streamlist", streamlist_thread, details, THREAD_DETACHED);
1083 #endif
1084 }
1085
1086
update_master_as_slave(ice_config_t * config)1087 static void update_master_as_slave (ice_config_t *config)
1088 {
1089 redirect_host *redirect;
1090
1091 if (config->master_server == NULL || config->master_redirect == 0 || config->max_redirects == 0)
1092 return;
1093
1094 thread_rwlock_wlock (&slaves_lock);
1095 redirect = find_slave_host (config->master_server, config->master_server_port);
1096 if (redirect == NULL)
1097 {
1098 INFO2 ("adding master %s:%d", config->master_server, config->master_server_port);
1099 redirector_add (config->master_server, config->master_server_port, 0);
1100 }
1101 else
1102 redirect->next_update += config->master_update_interval;
1103 thread_rwlock_unlock (&slaves_lock);
1104 }
1105
1106
slave_startup(void)1107 static void slave_startup (void)
1108 {
1109 ice_config_t *config = config_get_config();
1110
1111 #ifdef HAVE_GETRLIMIT
1112 struct rlimit rlimit;
1113 if (getrlimit (RLIMIT_NOFILE, &rlimit) == 0)
1114 {
1115 if (rlimit.rlim_cur < rlimit.rlim_max)
1116 {
1117 long old = rlimit.rlim_cur;
1118 rlimit.rlim_cur = rlimit.rlim_max;
1119 if (setrlimit (RLIMIT_NOFILE, &rlimit) < 0)
1120 rlimit.rlim_cur = old;
1121 }
1122 WARN1 ("process has %ld max file descriptor limit", (long)rlimit.rlim_cur);
1123 }
1124 if (getrlimit (RLIMIT_CORE, &rlimit) == 0)
1125 {
1126 if (rlimit.rlim_cur < rlimit.rlim_max)
1127 {
1128 rlimit.rlim_cur = rlimit.rlim_max;
1129 setrlimit (RLIMIT_CORE, &rlimit);
1130 }
1131 }
1132 #endif
1133
1134 update_settings = 0;
1135 update_all_sources = 0;
1136
1137 redirector_setup (config);
1138 stats_global (config);
1139 workers_adjust (config->workers_count);
1140 yp_initialize (config);
1141 update_relays (config);
1142 config_release_config();
1143
1144 source_recheck_mounts (1);
1145 connection_thread_startup();
1146 }
1147
_slave_thread(void)1148 static void _slave_thread(void)
1149 {
1150 slave_startup();
1151
1152 while (1)
1153 {
1154 struct timespec current;
1155 int do_reread = 0;
1156
1157 thread_get_timespec (¤t);
1158
1159 global_lock();
1160 if (global.running != ICE_RUNNING)
1161 break;
1162 /* re-read xml file if requested */
1163 if (global . schedule_config_reread)
1164 {
1165 global . schedule_config_reread = 0;
1166 do_reread = 1;
1167 }
1168
1169 if (global.new_connections_slowdown)
1170 global.new_connections_slowdown--;
1171 if (global.new_connections_slowdown > 30)
1172 global.new_connections_slowdown = 30;
1173
1174 global_unlock();
1175
1176 global_add_bitrates (global.out_bitrate, 0L, THREAD_TIME_MS(¤t));
1177 if (do_reread)
1178 event_config_read ();
1179
1180 if (streamlist_check <= current.tv_sec)
1181 {
1182 ice_config_t *config = config_get_config();
1183
1184 streamlist_check = current.tv_sec + config->master_update_interval;
1185 update_master_as_slave (config);
1186
1187 update_from_master (config);
1188
1189 config_release_config();
1190 }
1191
1192 int update = 0, update_all = 0, restart = 0;
1193 thread_spin_lock (&relay_start_lock);
1194 if (update_settings)
1195 {
1196 update = update_settings;
1197 update_all = update_all_sources;
1198 if (update_all_sources || current.tv_sec%5 == 0)
1199 {
1200 update_settings = 0;
1201 update_all_sources = 0;
1202 }
1203 if (restart_connection_thread)
1204 {
1205 restart = restart_connection_thread;
1206 restart_connection_thread = 0;
1207 }
1208 }
1209 thread_spin_unlock (&relay_start_lock);
1210
1211 if (update)
1212 source_recheck_mounts (update_all);
1213 if (restart)
1214 {
1215 connection_thread_shutdown();
1216 connection_thread_startup();
1217 }
1218
1219 stats_global_calc (current.tv_sec);
1220 fserve_scan (current.tv_sec);
1221
1222 /* allow for terminating icecast if no streams running */
1223 if (inactivity_timer)
1224 {
1225 if (global.sources)
1226 {
1227 inactivity_timer = 0;
1228 INFO0 ("inactivity timeout cancelled");
1229 }
1230 else if (inactivity_timer <= current.tv_sec)
1231 {
1232 INFO0 ("inactivity timeout reached, terminating server");
1233 global.running = ICE_HALTING;
1234 }
1235 }
1236 else
1237 {
1238 if (inactivity_timeout && global.sources == 0)
1239 {
1240 inactivity_timer = current.tv_sec + inactivity_timeout;
1241 INFO1 ("inactivity timeout started, terminate in %d seconds", inactivity_timeout);
1242 }
1243 }
1244 worker_balance_trigger (current.tv_sec);
1245 thread_sleep (1000000);
1246 }
1247 global_unlock();
1248 connection_thread_shutdown();
1249 fserve_running = 0;
1250 stats_clients_wakeup ();
1251 INFO0 ("shutting down current relays");
1252 time_t next = time(NULL) + 1000;
1253 thread_spin_lock (&relay_start_lock);
1254 relay_barrier_xml = next;
1255 relay_barrier_master = relay_barrier_xml;
1256 thread_spin_unlock (&relay_start_lock);
1257 redirector_clearall();
1258
1259 INFO0 ("Slave thread shutdown complete");
1260 }
1261
1262
slave_find_relay(const char * mount)1263 relay_server *slave_find_relay (const char *mount)
1264 {
1265 relay_server *result, find;
1266
1267 find.localmount = (char*)mount;
1268 if (avl_get_by_key (global.relays, &find, (void*)&result))
1269 result = NULL;
1270 return result;
1271 }
1272
1273
1274
1275 /* drop all redirection details.
1276 */
redirector_clearall(void)1277 void redirector_clearall (void)
1278 {
1279 thread_rwlock_wlock (&slaves_lock);
1280 while (redirectors)
1281 {
1282 redirect_host *current = redirectors;
1283 redirectors = current->next;
1284 INFO2 ("removing %s:%d", current->server, current->port);
1285 free (current->server);
1286 free (current);
1287 }
1288 global.redirect_count = 0;
1289 thread_rwlock_unlock (&slaves_lock);
1290 }
1291
1292
redirector_setup(ice_config_t * config)1293 void redirector_setup (ice_config_t *config)
1294 {
1295 redirect_host *redir = config->redirect_hosts;
1296
1297 thread_rwlock_wlock (&slaves_lock);
1298 while (redir)
1299 {
1300 redirector_add (redir->server, redir->port, 0);
1301 redir = redir->next;
1302 }
1303 thread_rwlock_unlock (&slaves_lock);
1304
1305 inactivity_timeout = config->inactivity_timeout;
1306 inactivity_timer = 0;
1307 }
1308
1309
1310 /* Add new redirectors or update any existing ones
1311 */
redirector_update(client_t * client)1312 void redirector_update (client_t *client)
1313 {
1314 redirect_host *redirect;
1315 const char *rserver = httpp_get_query_param (client->parser, "rserver");
1316 const char *value;
1317 int rport = 0, interval = 0;
1318
1319 if (rserver==NULL) return;
1320 value = httpp_get_query_param (client->parser, "rport");
1321 if (value == NULL) return;
1322 rport = atoi (value);
1323 if (rport <= 0) return;
1324 value = httpp_get_query_param (client->parser, "interval");
1325 if (value == NULL) return;
1326 interval = atoi (value);
1327 if (interval < 5) return;
1328
1329 thread_rwlock_wlock (&slaves_lock);
1330 redirect = find_slave_host (rserver, rport);
1331 if (redirect == NULL)
1332 {
1333 ice_config_t *config = config_get_config();
1334 unsigned int allowed = config->max_redirects;
1335
1336 config_release_config();
1337
1338 if (global.redirect_count < allowed)
1339 redirector_add (rserver, rport, interval);
1340 else
1341 INFO2 ("redirect to slave limit reached (%d, %d)", global.redirect_count, allowed);
1342 }
1343 else
1344 {
1345 DEBUG2 ("touch update on %s:%d", redirect->server, redirect->port);
1346 redirect->next_update = time(NULL) + interval;
1347 }
1348 thread_rwlock_unlock (&slaves_lock);
1349 }
1350
1351
1352
1353 /* search list of redirectors for a matching entry, lock must be held before
1354 * invoking this function
1355 */
find_slave_host(const char * server,int port)1356 static redirect_host *find_slave_host (const char *server, int port)
1357 {
1358 redirect_host *redirect = redirectors;
1359 while (redirect)
1360 {
1361 if (strcmp (redirect->server, server) == 0 && redirect->port == port)
1362 break;
1363 redirect = redirect->next;
1364 }
1365 return redirect;
1366 }
1367
1368
redirector_add(const char * server,int port,int interval)1369 static void redirector_add (const char *server, int port, int interval)
1370 {
1371 redirect_host *redirect = calloc (1, sizeof (redirect_host));
1372 if (redirect == NULL)
1373 abort();
1374 redirect->server = strdup (server);
1375 redirect->port = port;
1376 if (interval == 0)
1377 redirect->next_update = (time_t)0;
1378 else
1379 redirect->next_update = time(NULL) + interval;
1380 redirect->next = redirectors;
1381 redirectors = redirect;
1382 global.redirect_count++;
1383 INFO3 ("slave (%d) at %s:%d added", global.redirect_count,
1384 redirect->server, redirect->port);
1385 }
1386
1387
1388
relay_expired(relay_server * relay)1389 static int relay_expired (relay_server *relay)
1390 {
1391 thread_spin_lock (&relay_start_lock);
1392 time_t t = (relay->flags & RELAY_FROM_MASTER) ? relay_barrier_master : relay_barrier_xml;
1393 thread_spin_unlock (&relay_start_lock);
1394
1395 return (relay->updated < t) ? 1 : 0;
1396 }
1397
1398
get_relay_details(client_t * client)1399 static relay_server *get_relay_details (client_t *client)
1400 {
1401 relay_server *relay = client->shared_data;
1402
1403 avl_tree_rlock (global.relays);
1404 if (relay->new_details)
1405 {
1406 relay_server *old_details = relay;
1407
1408 INFO1 ("Detected change in relay details for %s", relay->localmount);
1409 client->shared_data = relay->new_details;
1410 relay = client->shared_data;
1411 relay->source = old_details->source;
1412 old_details->source = NULL;
1413 config_clear_relay (old_details);
1414 }
1415 if (relay_expired (relay))
1416 {
1417 DEBUG1 ("relay expired %s", relay->localmount);
1418 relay->flags |= RELAY_CLEANUP;
1419 }
1420 avl_tree_unlock (global.relays);
1421 if (relay->flags & RELAY_CLEANUP)
1422 relay->flags &= ~RELAY_RUNNING;
1423 return relay;
1424 }
1425
1426
relay_reset(relay_server * relay)1427 static void relay_reset (relay_server *relay)
1428 {
1429 relay_server_host *server = relay->hosts;
1430
1431 for (; server; server = server->next)
1432 server->skip = 0;
1433 INFO1 ("servers to be retried on %s", relay->localmount);
1434 }
1435
1436
relay_read(client_t * client)1437 static int relay_read (client_t *client)
1438 {
1439 relay_server *relay = get_relay_details (client);
1440 source_t *source = relay->source;
1441
1442 thread_rwlock_wlock (&source->lock);
1443 if (source_running (source))
1444 {
1445 if ((relay->flags & RELAY_RUNNING) == 0)
1446 source->flags &= ~SOURCE_RUNNING;
1447 if (source->listeners == 0 && (relay->flags & RELAY_ON_DEMAND))
1448 {
1449 if (client->connection.discon.time == 0)
1450 client->connection.discon.time = client->worker->current_time.tv_sec + relay->run_on;
1451
1452 if (client->worker->current_time.tv_sec > client->connection.discon.time)
1453 source->flags &= ~SOURCE_RUNNING;
1454 }
1455 if (source_read (source) > 0)
1456 return 1;
1457 if (source_running (source))
1458 {
1459 thread_rwlock_unlock (&source->lock);
1460 return 0;
1461 }
1462 }
1463 if ((source->flags & SOURCE_TERMINATING) == 0)
1464 {
1465 /* this section is for once through code */
1466 int fallback = global.running == ICE_RUNNING ? 1 : 0;
1467 if (client->connection.con_time && global.running == ICE_RUNNING)
1468 {
1469 if ((relay->flags & RELAY_RUNNING) && relay->in_use)
1470 fallback = 0;
1471 if ((relay->flags & RELAY_ON_DEMAND) == 0 &&
1472 client->worker->current_time.tv_sec - client->connection.con_time < 60)
1473 {
1474 /* force a server skip if a stream cannot be maintained for 1 min */
1475 WARN1 ("stream for %s died too quickly, skipping server for now", relay->localmount);
1476 if (relay->in_use) relay->in_use->skip = 1;
1477 }
1478 else
1479 {
1480 if (client->connection.sent_bytes < 500000 && source->flags & SOURCE_TIMEOUT)
1481 {
1482 WARN1 ("stream for %s timed out, skipping server for now", relay->localmount);
1483 if (relay->in_use) relay->in_use->skip = 1;
1484 }
1485 else
1486 relay_reset (relay); // spent some time on this so give other servers a chance
1487 }
1488 }
1489 /* don't pause listeners if relay shutting down */
1490 if ((relay->flags & RELAY_RUNNING) == 0)
1491 source->flags &= ~SOURCE_PAUSE_LISTENERS;
1492 // fallback listeners unless relay is to be retried
1493 INFO2 ("fallback on %s %sattempted", source->mount, fallback ? "" : "not ");
1494 source_shutdown (source, fallback);
1495 }
1496 if (source->termination_count && source->termination_count <= source->listeners)
1497 {
1498 client->schedule_ms = client->worker->time_ms + 150;
1499 if (client->timer_start + 1500 < client->worker->time_ms)
1500 {
1501 WARN2 ("%ld listeners still to process in terminating %s", source->termination_count, source->mount);
1502 source->flags &= ~SOURCE_TERMINATING;
1503 }
1504 else
1505 DEBUG3 ("%s waiting (%lu, %lu)", source->mount, source->termination_count, source->listeners);
1506 thread_rwlock_unlock (&source->lock);
1507 return 0;
1508 }
1509 DEBUG1 ("all listeners have now been checked on %s", relay->localmount);
1510 if (client->connection.con_time)
1511 {
1512 global_lock();
1513 global.sources--;
1514 stats_event_args (NULL, "sources", "%d", global.sources);
1515 global_unlock();
1516 global_reduce_bitrate_sampling (global.out_bitrate);
1517 }
1518 client->timer_start = 0;
1519 client->parser = NULL;
1520 free (source->fallback.mount);
1521 source->fallback.mount = NULL;
1522 source->flags &= ~(SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC|SOURCE_ON_DEMAND);
1523 if (relay->flags & RELAY_CLEANUP)
1524 {
1525 connection_close (&client->connection);
1526 if (source->listeners)
1527 {
1528 INFO1 ("listeners on terminating relay %s, rechecking", relay->localmount);
1529 client->timer_start = client->worker->time_ms;
1530 source->termination_count = source->listeners;
1531 source->flags &= ~SOURCE_PAUSE_LISTENERS;
1532 source->flags |= SOURCE_LISTENERS_SYNC;
1533 source_listeners_wakeup (source);
1534 thread_rwlock_unlock (&source->lock);
1535 return 0; /* listeners may be paused, recheck and let them leave this stream */
1536 }
1537 INFO1 ("shutting down relay %s", relay->localmount);
1538 if (relay->flags & RELAY_IN_LIST)
1539 {
1540 avl_tree_wlock (global.relays);
1541 detach_master_relay (relay->localmount, 1);
1542 avl_tree_unlock (global.relays);
1543 }
1544 stats_lock (source->stats, NULL);
1545 stats_set_args (source->stats, "listeners", "%lu", source->listeners);
1546 stats_set (source->stats, NULL, NULL);
1547 source->stats = 0;
1548 thread_rwlock_unlock (&source->lock);
1549 slave_update_mounts();
1550 return -1;
1551 }
1552 client->ops = &relay_init_ops;
1553 do {
1554 if (relay->flags & RELAY_RUNNING)
1555 {
1556 if (client->connection.con_time && relay->in_use)
1557 {
1558 INFO1 ("standing by to restart relay on %s", relay->localmount);
1559 stats_flush (source->stats);
1560 if (relay->flags & RELAY_ON_DEMAND && source->listeners == 0 && relay->in_use->next)
1561 {
1562 source_clear_source (relay->source);
1563 relay_reset (relay);
1564 }
1565 break;
1566 }
1567 if (relay->interval < 3)
1568 relay->interval = 60; // if set too low then give a decent retry delay
1569 client->schedule_ms = client->worker->time_ms + (relay->interval * 1000);
1570 INFO2 ("standing by to restart relay on %s in %d seconds", relay->localmount, relay->interval);
1571 }
1572 else
1573 {
1574 INFO1 ("Relay %s is disabled", relay->localmount);
1575 client->schedule_ms = client->worker->time_ms + 3600000;
1576 }
1577 stats_lock (source->stats, NULL);
1578 stats_set_args (source->stats, "listeners", "%lu", source->listeners);
1579 source_clear_source (relay->source);
1580 relay_reset (relay);
1581 stats_set (source->stats, NULL, NULL);
1582 source->stats = 0;
1583 slave_update_mounts();
1584 } while (0);
1585
1586 thread_rwlock_unlock (&source->lock);
1587 connection_close (&client->connection);
1588 return 0;
1589 }
1590
1591
relay_release(client_t * client)1592 static void relay_release (client_t *client)
1593 {
1594 relay_server *relay = client->shared_data;
1595 DEBUG2("freeing relay %s (%p)", relay->localmount, relay);
1596 if (relay->source)
1597 source_free_source (relay->source);
1598 relay->source = NULL;
1599 config_clear_relay (relay);
1600 client_destroy (client);
1601 }
1602
1603
1604
1605 // This is a special case one, to act as a once through, to get the source reserved and stat initialised
relay_initialise(client_t * client)1606 static int relay_initialise (client_t *client)
1607 {
1608 relay_server *relay = get_relay_details (client);
1609 int rc = relay_has_source (relay, client);
1610 source_t *source = relay->source;
1611
1612 if (rc < 0) return -1;
1613 if (rc == 0) // in cases where relay was added ok but source in use, should be rare
1614 {
1615 WARN1 ("relay for \"%s\" cannot get started, mountpoint in use, waiting", relay->localmount);
1616 client->schedule_ms = client->worker->time_ms + 120000;
1617 return 0;
1618 }
1619 do
1620 {
1621 if (relay->flags & RELAY_RUNNING)
1622 {
1623 if (relay->flags & RELAY_ON_DEMAND)
1624 {
1625 ice_config_t *config;
1626 mount_proxy *mountinfo;
1627
1628 source_clear_source (source);
1629 config = config_get_config();
1630 mountinfo = config_find_mount (config, source->mount);
1631 source->flags |= SOURCE_ON_DEMAND;
1632 if (source->stats == 0)
1633 {
1634 source->stats = stats_lock (source->stats, source->mount);
1635 stats_release (source->stats);
1636 }
1637 source_update_settings (config, source, mountinfo);
1638 config_release_config();
1639 slave_update_mounts();
1640 stats_set_flags (source->stats, "listener_connections", "0", STATS_COUNTERS);
1641 }
1642 break;
1643 }
1644 thread_rwlock_unlock (&source->lock);
1645 if (relay->flags & RELAY_CLEANUP)
1646 return relay_read (client);
1647 client->schedule_ms = client->worker->time_ms + 1000000;
1648 return 0;
1649 } while(0);
1650 thread_rwlock_unlock (&source->lock);
1651 client->ops = &relay_startup_ops;
1652 return client->ops->process (client);
1653 }
1654
1655
relay_startup(client_t * client)1656 static int relay_startup (client_t *client)
1657 {
1658 relay_server *relay = get_relay_details (client);
1659 worker_t *worker = client->worker;
1660
1661 if ((relay->flags & RELAY_RUNNING) == 0)
1662 {
1663 if (relay->source == NULL) { WARN1 ("odd case for %s", relay->localmount); return -1; }
1664 client->ops = &relay_client_ops;
1665 client->schedule_ms = worker->time_ms + 10;
1666 DEBUG1 ("relay %s disabled", relay->localmount);
1667 return client->ops->process (client);
1668 }
1669 global_lock();
1670 if (global.running != ICE_RUNNING) /* wait for cleanup */
1671 {
1672 global_unlock();
1673 client->schedule_ms = client->worker->time_ms + 50;
1674 return 0;
1675 }
1676 global_unlock();
1677 if (worker->move_allocations)
1678 {
1679 int ret = 0;
1680 worker_t *dest_worker;
1681
1682 thread_rwlock_rlock (&workers_lock);
1683 dest_worker = worker_selected ();
1684 if (dest_worker != worker)
1685 {
1686 long diff = worker->count - dest_worker->count;
1687 if (diff > 5)
1688 {
1689 worker->move_allocations--;
1690 ret = client_change_worker (client, dest_worker);
1691 }
1692 }
1693 thread_rwlock_unlock (&workers_lock);
1694 if (ret)
1695 return ret;
1696 }
1697
1698 if (relay->flags & RELAY_ON_DEMAND)
1699 {
1700 source_t *source = relay->source;
1701 int start_relay;
1702 mount_proxy *mountinfo;
1703
1704 thread_rwlock_wlock (&source->lock);
1705 start_relay = source->listeners; // 0 or non-zero
1706 source->flags |= SOURCE_ON_DEMAND;
1707 thread_rwlock_unlock (&source->lock);
1708 mountinfo = config_find_mount (config_get_config(), source->mount);
1709
1710 if (mountinfo && mountinfo->fallback_mount)
1711 {
1712 avl_tree_rlock (global.source_tree);
1713 if (fallback_count (config_get_config_unlocked(), mountinfo->fallback_mount) > 0)
1714 start_relay = 1;
1715 avl_tree_unlock (global.source_tree);
1716 }
1717 config_release_config();
1718 if (start_relay == 0)
1719 {
1720 if (source->stats == 0)
1721 {
1722 source->stats = stats_lock (source->stats, source->mount);
1723 stats_release (source->stats);
1724 slave_update_mounts();
1725 }
1726 client->schedule_ms = (worker->time_ms + 1000) | 0xff;
1727 return 0;
1728 }
1729 INFO1 ("starting on-demand relay %s", relay->localmount);
1730 }
1731
1732 /* limit the number of relays starting up at the same time */
1733 thread_spin_lock (&relay_start_lock);
1734 if (relays_connecting > 3)
1735 {
1736 thread_spin_unlock (&relay_start_lock);
1737 client->schedule_ms = worker->time_ms + 200;
1738 if (global.new_connections_slowdown < 5)
1739 global.new_connections_slowdown++;
1740 return 0;
1741 }
1742 relays_connecting++;
1743 thread_spin_unlock (&relay_start_lock);
1744
1745 client->flags &= ~CLIENT_ACTIVE;
1746 thread_create ("Relay Thread", start_relay_stream, client, THREAD_DETACHED);
1747 return 0;
1748 }
1749
1750
fallback_count(ice_config_t * config,const char * mount)1751 int fallback_count (ice_config_t *config, const char *mount)
1752 {
1753 int count = -1, loop = 10;
1754 const char *m = mount;
1755 char buffer[4096];
1756
1757 if (mount == NULL) return -1;
1758 if (strstr (mount, "${")) return -1;
1759 while (m && loop--)
1760 {
1761 source_t *fallback = source_find_mount_raw (m);
1762 if (fallback == NULL || source_running (fallback) == 0)
1763 {
1764 unsigned int len;
1765 mount_proxy *mountinfo = config_find_mount (config, m);
1766 if (fallback == NULL)
1767 {
1768 fbinfo finfo;
1769
1770 memset (&finfo, 0, sizeof (finfo));
1771 finfo.flags = FS_FALLBACK;
1772 finfo.mount = (char *)m;
1773 finfo.fallback = NULL;
1774 finfo.limit = mountinfo ? mountinfo->limit_rate/8 : 0;
1775 if (finfo.limit == 0)
1776 {
1777 unsigned int rate;
1778 if (sscanf (m, "%*[^[][%u]", &rate) == 1)
1779 finfo.limit = rate * 1000 / 8;
1780 }
1781 count = fserve_query_count (&finfo);
1782 }
1783 if (mountinfo == NULL)
1784 break;
1785 len = sizeof buffer;
1786 if (util_expand_pattern (m, mountinfo->fallback_mount, buffer, &len) < 0)
1787 break;
1788 m = buffer;
1789 continue;
1790 }
1791 count = fallback->listeners;
1792 break;
1793 }
1794 return count;
1795 }
1796