1 /* Icecast
2  *
3  * This program is distributed under the GNU General Public License, version 2.
4  * A copy of this license is included with this source.
5  *
6  * Copyright 2000-2014, Karl Heyes <karl@kheyes.plus.com>
7  * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
8  *                      Michael Smith <msmith@xiph.org>,
9  *                      oddsock <oddsock@xiph.org>,
10  *                      Karl Heyes <karl@xiph.org>
11  *                      and others (see AUTHORS for details).
12  */
13 
14 /* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
15 #ifdef HAVE_CONFIG_H
16 #include <config.h>
17 #endif
18 
19 #ifdef WIN32
20 #include <winsock2.h>
21 #include <windows.h>
22 #endif
23 
24 #include "compat.h"
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <limits.h>
29 #include <sys/types.h>
30 #include <ogg/ogg.h>
31 #include <errno.h>
32 
33 #ifdef HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #ifdef HAVE_SYS_SOCKET_H
37 #include <sys/socket.h>
38 #endif
39 #ifdef HAVE_SYS_WAIT_H
40 #include <sys/wait.h>
41 #endif
42 
43 #include "thread/thread.h"
44 #include "avl/avl.h"
45 #include "httpp/httpp.h"
46 #include "net/sock.h"
47 
48 #include "connection.h"
49 #include "global.h"
50 #include "refbuf.h"
51 #include "client.h"
52 #include "stats.h"
53 #include "logging.h"
54 #include "cfgfile.h"
55 #include "util.h"
56 #include "source.h"
57 #include "format.h"
58 #include "fserve.h"
59 #include "auth.h"
60 #include "slave.h"
61 
62 #undef CATMODULE
63 #define CATMODULE "source"
64 
65 #define MAX_FALLBACK_DEPTH 10
66 
67 
/* Forward declarations for the file-local helpers, these are referenced by the
 * client ops tables and by routines that appear before their definitions.
 */
static void _parse_audio_info (source_t *source, const char *s);
static void source_client_release (client_t *client);
static int  source_listener_release (source_t *source, client_t *client);
static int  source_client_read (client_t *client);
static int  source_client_shutdown (client_t *client);
static int  source_client_http_send (client_t *client);
static int  send_to_listener (client_t *client);
static int  send_listener (source_t *source, client_t *client);
static int  wait_for_restart (client_t *client);
static int  wait_for_other_listeners (client_t *client);

static int  http_source_listener (client_t *client);
static int  http_source_intro (client_t *client);
static int  http_source_introfile (client_t *client);
static int  locate_start_on_queue (source_t *source, client_t *client);
static int  listener_change_worker (client_t *client, source_t *source);
static int  source_change_worker (source_t *source, client_t *client);
static int  source_client_callback (client_t *client);
static int  source_set_override (mount_proxy *mountinfo, source_t *dest_source, format_type_t type);

/* On Windows builds the connect/disconnect script hook is replaced by a
 * warning in the log, elsewhere it is a real function defined in this file.
 */
#ifdef _WIN32
#define source_run_script(x,y)  WARN0("on [dis]connect scripts disabled");
#else
static void source_run_script (char *command, char *mountpoint);
#endif
94 
/* Client operation tables. Each one pairs two callbacks; presumably the first
 * is the routine run whenever the client is scheduled and the second is the
 * release routine - confirm against the struct _client_functions definition.
 */

/* source client while the stream is being read in */
struct _client_functions source_client_ops =
{
    source_client_read,
    client_destroy
};

/* source client once streaming has stopped, source_client_read() switches to
 * this when there are no listeners left on the source */
struct _client_functions source_client_halt_ops =
{
    source_client_shutdown,
    source_client_release
};

/* listener being sent stream content */
struct _client_functions listener_client_ops =
{
    send_to_listener,
    client_destroy
};

/* listener held back via wait_for_restart() */
struct _client_functions listener_pause_ops =
{
    wait_for_restart,
    client_destroy
};

/* listener held back via wait_for_other_listeners() */
struct _client_functions listener_wait_ops =
{
    wait_for_other_listeners,
    client_destroy
};

/* source client handled by source_client_http_send() */
struct _client_functions source_client_http_ops =
{
    source_client_http_send,
    source_client_release
};
130 
131 
132 /* Allocate a new source with the stated mountpoint, if one already
133  * exists with that mountpoint in the global source tree then return
134  * NULL.
135  */
source_reserve(const char * mount,int flags)136 source_t *source_reserve (const char *mount, int flags)
137 {
138     source_t *src = NULL;
139 
140     do
141     {
142         avl_tree_wlock (global.source_tree);
143         src = source_find_mount_raw (mount);
144         if (src)
145         {
146             if ((flags & 1) == 0)
147                 src = NULL;
148             else if (src->flags & SOURCE_LISTENERS_SYNC)
149                 src = NULL;
150             break;
151         }
152 
153         src = calloc (1, sizeof(source_t));
154         if (src == NULL)
155             break;
156 
157         /* make duplicates for strings or similar */
158         src->mount = strdup (mount);
159         src->listener_send_trigger = 16000;
160         src->format = calloc (1, sizeof(format_plugin_t));
161         src->clients = avl_tree_new (client_compare, NULL);
162         src->intro_file = -1;
163         src->preroll_log_id = -1;
164 
165         thread_rwlock_create (&src->lock);
166         thread_spin_create (&src->shrink_lock);
167         src->flags |= SOURCE_RESERVED;
168 
169         avl_insert (global.source_tree, src);
170 
171     } while (0);
172 
173     if (src)
174         thread_rwlock_wlock (&src->lock);
175     avl_tree_unlock (global.source_tree);
176     return src;
177 }
178 
179 
180 /* Find a mount with this raw name - ignoring fallbacks. You should have the
181  * global source tree locked to call this.
182  */
source_find_mount_raw(const char * mount)183 source_t *source_find_mount_raw(const char *mount)
184 {
185     source_t *source;
186     avl_node *node;
187     int cmp;
188 
189     if (!mount) {
190         return NULL;
191     }
192     /* get the root node */
193     node = global.source_tree->root->right;
194 
195     while (node) {
196         source = (source_t *)node->key;
197         cmp = strcmp (mount, source->mount);
198         if (cmp < 0)
199             node = node->left;
200         else if (cmp > 0)
201             node = node->right;
202         else
203             return source;
204     }
205 
206     /* didn't find it */
207     return NULL;
208 }
209 
210 
211 /* Search for mount, if the mount is there but not currently running then
212  * check the fallback, and so on.  Must have a global source lock to call
213  * this function.
214  */
source_find_mount(const char * mount)215 source_t *source_find_mount (const char *mount)
216 {
217     source_t *source = NULL;
218     ice_config_t *config;
219     mount_proxy *mountinfo;
220     int depth = 0;
221 
222     config = config_get_config();
223     while (mount && depth < MAX_FALLBACK_DEPTH)
224     {
225         source = source_find_mount_raw (mount);
226 
227         if (source)
228         {
229             if (source_available (source))
230                 break;
231         }
232 
233         /* we either have a source which is not active (relay) or no source
234          * at all. Check the mounts list for fallback settings
235          */
236         mountinfo = config_find_mount (config, mount);
237         source = NULL;
238 
239         if (mountinfo == NULL)
240             break;
241         mount = mountinfo->fallback_mount;
242         depth++;
243     }
244 
245     config_release_config();
246     return source;
247 }
248 
249 
source_compare_sources(void * arg,void * a,void * b)250 int source_compare_sources(void *arg, void *a, void *b)
251 {
252     source_t *srca = (source_t *)a;
253     source_t *srcb = (source_t *)b;
254 
255     return strcmp(srca->mount, srcb->mount);
256 }
257 
258 
/* Reset a source to an idle state: close any dump file, release every block
 * on the stream queue, zero the queue accounting and free the per-stream
 * settings (audio info, rate calculators, dump/intro file names, intro file
 * handle and preroll log).
 */
void source_clear_source (source_t *source)
{
    refbuf_t *block;

    DEBUG2 ("clearing source \"%s\" %p", source->mount, source);

    if (source->dumpfile != NULL)
    {
        INFO1 ("Closing dumpfile for %s", source->mount);
        fclose (source->dumpfile);
        source->dumpfile = NULL;
    }

    /* empty the queue one block at a time, nothing is to be left over */
    while ((block = source->stream_data) != NULL)
    {
        source->stream_data = block->next;
        block->next = NULL;
        if (source->format->detach_queue_block)
            source->format->detach_queue_block (source, block);
        refbuf_release (block);
    }
    source->stream_data_tail = NULL;
    source->min_queue_point = NULL;

    /* queue and timing counters */
    source->queue_size = 0;
    source->queue_size_limit = 0;
    source->min_queue_size = 0;
    source->min_queue_offset = 0;
    source->default_burst_size = 0;
    source->client_stats_update = 0;
    source->shrink_pos = 0;
    source->shrink_time = 0;

    /* per-stream allocations */
    util_dict_free (source->audio_info);
    source->audio_info = NULL;

    rate_free (source->out_bitrate);
    source->out_bitrate = NULL;

    rate_free (source->in_bitrate);
    source->in_bitrate = NULL;

    free (source->dumpfilename);
    source->dumpfilename = NULL;

    /* intro content and the preroll log */
    free (source->intro_filename);
    source->intro_filename = NULL;
    file_close (&source->intro_file);

    log_close (source->preroll_log_id);
    source->preroll_log_id = -1;
}
307 
308 
/* The internal free function. At this point we know the source is not on the
 * source tree. The source lock is unlocked and then destroyed in here, so the
 * caller is expected to hold it on entry (drop_source_from_tree() returns
 * with it held). The source pointer is invalid after this returns.
 * Always returns 1.
 */
static int _free_source (void *p)
{
    source_t *source = p;
    /* release the queue and per-stream settings first */
    source_clear_source (source);

    /* make sure all YP entries have gone */
    yp_remove (source->mount);

    /* There should be no listeners on this mount */
    if (source->listeners)
        WARN3("active listeners on mountpoint %s (%ld, %ld)", source->mount, source->listeners, source->termination_count);
    avl_tree_free (source->clients, NULL);

    /* drop the lock held by the caller before destroying it */
    thread_rwlock_unlock (&source->lock);
    thread_rwlock_destroy (&source->lock);
    thread_spin_destroy (&source->shrink_lock);

    INFO1 ("freeing source \"%s\"", source->mount);
    format_plugin_clear (source->format, source->client);

    /* empty the intro skip cache before freeing the cache structure itself */
    cached_prune (source->intro_ipcache);
    free (source->intro_ipcache);
    source->intro_ipcache = NULL;

    free (source->format);
    free (source->mount);
    free (source);
    return 1;
}
340 
341 
// Drop source from tree, so it cannot be found by name. The source lock must
// not be held on entry (it is taken with a blocking write lock at the end),
// and it is held on return whether or not the source was still on the tree.
// Any stats handle still attached to the source is cleared as well.
static void drop_source_from_tree (source_t *source)
{
    // SOURCE_RESERVED is set when the source is inserted into the tree by
    // source_reserve(), so it doubles as the "still on the tree" marker
    if (source->flags & SOURCE_RESERVED)
    {
        avl_tree_wlock (global.source_tree);
        avl_delete (global.source_tree, source, NULL);

        source->flags &= ~SOURCE_RESERVED;
        // this is only called from the sources client processing
        if (source->stats)
        {
            DEBUG1 ("stats still referenced on %s", source->mount);
            stats_lock (source->stats, source->mount);
            // NOTE(review): no stats_release() follows here, presumably
            // stats_set() with NULL name/value drops the node and its lock - confirm
            stats_set (source->stats, NULL, NULL);
            source->stats = 0;
        }
        avl_tree_unlock (global.source_tree);
        DEBUG2 ("removed source %s (%p) from tree", source->mount, source);
    }
    thread_rwlock_wlock (&source->lock);
}
365 
366 
367 /* Remove the provided source from the global tree and free it */
source_free_source(source_t * source)368 void source_free_source (source_t *source)
369 {
370     //INFO1 ("source %s to be freed", source->mount);
371     drop_source_from_tree (source);
372     _free_source (source);
373 }
374 
375 
source_find_client(source_t * source,uint64_t id)376 client_t *source_find_client(source_t *source, uint64_t id)
377 {
378     client_t fakeclient;
379     void *result = NULL;
380 
381     fakeclient.connection.id = id;
382 
383     avl_get_by_key (source->clients, &fakeclient, &result);
384     return result;
385 }
386 
/* Record the IP of the client in the intro skip cache.  The entry carries a
 * timeout of 'allow' seconds from the worker's current time.  If the entry
 * cannot be allocated then a warning is logged and the cache is left as is.
 */
static void listener_skips_intro (cache_file_contents *cache, client_t *client, int allow)
{
    struct node_IP_time *result = calloc (1, sizeof (*result));

    if (result == NULL)
    {
        /* out of memory, better to skip the bookkeeping than to crash */
        WARN1 ("unable to allocate intro skip entry for %s", client->connection.ip);
        return;
    }

    snprintf (result->ip, sizeof (result->ip), "%s", client->connection.ip);
    result->a.timeout = client->worker->current_time.tv_sec + allow;
    avl_insert (cache->contents, result);
    DEBUG1 ("Added intro skip entry for %s", &result->ip[0]);
}
396 
397 
/* Check whether the IP of the client is present in the intro skip cache.
 *
 * Returns 0 if there is no cache or the IP has no entry, 1 if an entry was
 * found. When the entry has not yet timed out, client->intro_offset is set to
 * -1; otherwise the entry's timeout is refreshed to 'allow' seconds from the
 * worker's current time.
 */
static int listener_check_intro (cache_file_contents *cache, client_t *client, int allow)
{
    int i, ret = 0;

    if (cache == NULL || cache->contents == NULL)
        return 0;
    /* start with an empty deletions list for this lookup */
    cache->deletions_count = 0;
    do
    {
        struct node_IP_time *result;
        const char *ip = client->connection.ip;
        time_t now = client->worker->current_time.tv_sec;

        cache->file_recheck = now;
        if (avl_get_by_key (cache->contents, (char*)ip, (void*)&result) == 0)
        {
            ret = 1;
            if (result->a.timeout > now)
            {
                DEBUG1 ("skipping intro for %s", result->ip);
                client->intro_offset = -1;
                break;
            }
            DEBUG1 ("found intro skip entry for %s, refreshing", result->ip);
            result->a.timeout = now + allow;
        }
    } while (0);

    /* NOTE(review): nothing in this function adds to cache->deletions, so
     * presumably the tree's compare callback queues expired entries while the
     * lookup above runs - confirm against the cache's compare function */
    for (i = 0; i < cache->deletions_count; ++i)
    {
        struct node_IP_time *to_go = cache->deletions[i];

        INFO1 ("removing %s from intro list", &(to_go->ip[0]));
        avl_delete (cache->contents, &(to_go->ip[0]), cached_treenode_free);
    }
    return ret;
}
435 
436 
/* Convert a queue setting into a byte count.  A value with the top bit set
 * holds a number of seconds in the lower 31 bits, which is scaled by the
 * current incoming rate of the source; any other value is returned as is.
 */
uint32_t source_convert_qvalue (source_t *source, uint32_t value)
{
    const uint32_t seconds_flag = 0x80000000;

    if ((value & seconds_flag) == 0)
        return value;

    /* strip the marker bit, what remains is the duration in seconds */
    return source->incoming_rate * (value & ~seconds_flag);
}
446 
447 
448 /* Update stats from source processing, this should be called regulary (every
449  * few seconds) to keep totals up to date.
450  */
update_source_stats(source_t * source)451 static void update_source_stats (source_t *source)
452 {
453     unsigned long incoming_rate = (long)rate_avg (source->in_bitrate);
454     unsigned long kbytes_sent = (source->format->sent_bytes - source->bytes_sent_at_update)/1024;
455     unsigned long kbytes_read = source->bytes_read_since_update/1024;
456 
457     stats_lock (source->stats, source->mount);
458     stats_set_args (source->stats, "outgoing_kbitrate", "%ld",
459             (long)(8 * rate_avg (source->out_bitrate))/1024);
460     stats_set_args (source->stats, "incoming_bitrate", "%ld", (8 * incoming_rate));
461     stats_set_args (source->stats, "total_bytes_read", "%"PRIu64, source->format->read_bytes);
462     stats_set_args (source->stats, "total_bytes_sent", "%"PRIu64, source->format->sent_bytes);
463     stats_set_args (source->stats, "total_mbytes_sent",
464             "%"PRIu64, source->format->sent_bytes/(1024*1024));
465     stats_set_args (source->stats, "queue_size", "%u", source->queue_size);
466     if (source->client->connection.con_time)
467     {
468         worker_t *worker = source->client->worker;
469         stats_set_args (source->stats, "connected", "%"PRIu64,
470                 (uint64_t)(worker->current_time.tv_sec - source->client->connection.con_time));
471     }
472     stats_release (source->stats);
473     stats_event_add (NULL, "stream_kbytes_sent", kbytes_sent);
474     stats_event_add (NULL, "stream_kbytes_read", kbytes_read);
475     if (incoming_rate)
476     {
477         int log = 0;
478         uint32_t qlen = (float)source_convert_qvalue (source, source->queue_len_value);
479         if (qlen > 0)
480         {
481             float ratio = source->queue_size_limit / (float)qlen;
482             if (ratio < 0.85 || ratio > 1.15)
483                 log = 1;    // sizeable change in result so log it
484         }
485         source->queue_size_limit = qlen;
486         source->min_queue_size = source_convert_qvalue (source, source->min_queue_len_value);
487         source->default_burst_size = source_convert_qvalue (source, source->default_burst_value);
488         //DEBUG3 ("%s, burst %d, %d", source->mount, (source->default_burst_value&(1<<31))?1:0, source->default_burst_value&(~(1<<31)));
489 
490         // sanity checks
491         if (source->default_burst_size > 50000000)
492             source->default_burst_size = 100000;
493         if (source->queue_size_limit > 1000000000)
494             source->queue_size_limit = 1000000;
495         if (source->min_queue_size > 50000000 || source->min_queue_size < source->default_burst_size)
496             source->min_queue_size = source->default_burst_size;
497         if (source->min_queue_size + (incoming_rate<<2) > source->queue_size_limit)
498         {
499             source->queue_size_limit = source->min_queue_size + (incoming_rate<<2);
500             INFO1 ("Adjusting queue size limit higher to allow for a minimum on %s", source->mount);
501             source->queue_len_value = source->queue_size_limit;
502         }
503 
504         if (log)
505         {
506             DEBUG2 ("%s queue size set to %u", source->mount, source->queue_size_limit);
507             DEBUG2 ("%s min queue size set to %u", source->mount, source->min_queue_size);
508             DEBUG2 ("%s burst size set to %u", source->mount, source->default_burst_size);
509         }
510     }
511 
512     source->bytes_sent_at_update = source->format->sent_bytes;
513     source->bytes_read_since_update %= 1024;
514     source->listener_send_trigger = incoming_rate < 8000 ? 8000 : (8000 + (incoming_rate>>4));
515     if (incoming_rate)
516         source->incoming_adj = 2000000/incoming_rate;
517     else
518         source->incoming_adj = 20;
519     source->stats_interval = 5 + (global.sources >> 10);
520 }
521 
522 
/* Append the buffer r to the end of the source's stream queue and update the
 * queue accounting (byte counts, buffer count, minimum queue offset). The
 * buffer is also passed to the format's dump file writer when a dump file is
 * open.
 */
void source_add_queue_buffer (source_t *source, refbuf_t *r)
{
    source->bytes_read_since_update += r->len;

    r->flags |= SOURCE_QUEUE_BLOCK;

    /* append buffer to the in-flight data queue,  */
    if (source->stream_data == NULL)
    {
        /* first block on an empty queue, apply the intro and override
         * settings of the mount (if there are any) before starting the queue */
        mount_proxy *mountinfo = config_find_mount (config_get_config(), source->mount);
        if (mountinfo)
        {
            source_set_intro (source, mountinfo->intro_filename);
            source_set_override (mountinfo, source, source->format->type);
        }
        config_release_config();

        source->stream_data = r;
        source->min_queue_point = r;
        source->min_queue_offset = 0;
    }
    if (source->stream_data_tail)
        source->stream_data_tail->next = r;
    source->buffer_count++;

    source->stream_data_tail = r;
    source->queue_size += r->len;
    /* flag read by listeners that are waiting on new data (see the wakeup
     * pointer set in source_queue_advance) */
    source->wakeup = 1;

    /* move the starting point for new listeners */
    source->min_queue_offset += r->len;

    /* refresh the cached incoming rate on every 4th buffer */
    if ((source->buffer_count & 3) == 3)
        source->incoming_rate = (long)rate_avg (source->in_bitrate);

    /* save stream to file */
    if (source->dumpfile && source->format->write_buf_to_file)
        source->format->write_buf_to_file (source, r);

    if (source->shrink_time == 0 && (source->buffer_count & 31) == 31)
    {
        // kick off timed response to find oldest buffer. Every so many buffers
        source->shrink_pos = source->client->queue_pos - source->min_queue_offset;
        source->shrink_time = source->client->worker->time_ms + 600;
    }
}
569 
570 
/* Do one round of processing for a running source: update listener and
 * stream stats, read at most one buffer from the source client if the socket
 * has data pending and add it to the queue, move the minimum queue point
 * forward and trim old blocks from the front of the queue.
 *
 * SOURCE_RUNNING is cleared in source->flags when the source should stop
 * (server shutting down, socket error, read timeout, end of stream or the
 * listener sync taking too long).
 *
 * Returns 1 if source_change_worker() returned non-zero, the caller
 * (source_client_read) then returns at once without unlocking the source.
 * Returns 0 in all other cases.
 */
int source_read (source_t *source)
{
    client_t *client = source->client;
    refbuf_t *refbuf = NULL;
    int skip = 1, loop = 1;
    time_t current = client->worker->current_time.tv_sec;
    unsigned long queue_size_target = 0;
    int fds = 0;

    if (global.running != ICE_RUNNING)
        source->flags &= ~SOURCE_RUNNING;
    do
    {
        source->wakeup = 0;
        client->schedule_ms = client->worker->time_ms;
        if (source->flags & SOURCE_LISTENERS_SYNC)
        {
            /* listeners are still being processed, recheck in 30ms but give
             * up on the source if that lasts more than a second */
            if (source->termination_count > 0)
            {
                if (client->timer_start + 1000 < client->worker->time_ms)
                {
                    source->flags &= ~(SOURCE_RUNNING|SOURCE_LISTENERS_SYNC);
                    WARN1 ("stopping %s as sync mode lasted too long", source->mount);
                }
                client->schedule_ms += 30;
                return 0;
            }
            if (source->fallback.mount)
            {
                DEBUG1 ("listeners have now moved to %s", source->fallback.mount);
                free (source->fallback.mount);
                source->fallback.mount = NULL;
            }
            source->flags &= ~SOURCE_LISTENERS_SYNC;
        }
        /* feed a zero byte sample with the current time to the outgoing rates */
        rate_add (source->out_bitrate, 0, client->worker->time_ms);
        global_add_bitrates (global.out_bitrate, 0, client->worker->time_ms);

        if (source->prev_listeners != source->listeners)
        {
            INFO2("listener count on %s now %lu", source->mount, source->listeners);
            source->prev_listeners = source->listeners;
            stats_lock (source->stats, source->mount);
            stats_set_args (source->stats, "listeners", "%lu", source->listeners);
            if (source->listeners > source->peak_listeners)
            {
                source->peak_listeners = source->listeners;
                stats_set_args (source->stats, "listener_peak", "%lu", source->peak_listeners);
            }
            stats_release (source->stats);
        }
        if (current >= source->client_stats_update)
        {
            update_source_stats (source);
            /* update every second until the source has been connected for a
             * full stats interval, then drop to the normal interval */
            if (current - client->connection.con_time < source->stats_interval)
                source->client_stats_update = current + 1;
            else
                source->client_stats_update = current + source->stats_interval;
            if (source_change_worker (source, client))
                return 1;
        }

        /* zero timeout, just check whether the socket has anything for us */
        fds = util_timed_wait_for_fd (client->connection.sock, 0);
        if (fds < 0)
        {
            if (! sock_recoverable (sock_error()))
            {
                WARN0 ("Error while waiting on socket, Disconnecting source");
                source->flags &= ~SOURCE_RUNNING;
                return 0;
            }
            break;
        }
        if (fds == 0)
        {
            /* nothing pending, check for a stalled source and back off the
             * rescheduling delay (capped at 400) */
            if (source->last_read + (time_t)3 == current)
                WARN1 ("Nothing received on %s for 3 seconds", source->mount);
            if (source->last_read + (time_t)source->timeout < current)
            {
                DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read,
                        source->timeout, (long)current);
                WARN1 ("Disconnecting %s due to socket timeout", source->mount);
                source->flags &= ~SOURCE_RUNNING;
                source->flags |= SOURCE_TIMEOUT;
                return 0;
            }
            source->skip_duration = (int)((source->skip_duration + 12) * 1.1);
            if (source->skip_duration > 400)
                source->skip_duration = 400;
            break;
        }

        source->last_read = current;
        unsigned int prev_qsize = source->queue_size;
        /* loop is 1 at this point so this asks the format for a single buffer */
        do
        {
            refbuf = source->format->get_buffer (source);
            if (refbuf)
                source_add_queue_buffer (source, refbuf);

            skip = 0;

            if (client->connection.error)
            {
                INFO1 ("End of Stream %s", source->mount);
                source->flags &= ~SOURCE_RUNNING;
                return 0;
            }
            loop--;
        } while (loop);

        if (source->queue_size != prev_qsize)
        {
            /* queue size changed, walk forward from the minimum queue point
             * while more than min_queue_size is ahead of it, the point is
             * moved to the last block visited that has SOURCE_BLOCK_SYNC set */
            uint64_t sync_off = source->min_queue_offset, off = sync_off;
            refbuf_t *sync_point = source->min_queue_point, *ref = sync_point;
            while (off > source->min_queue_size)
            {
                refbuf_t *to_release = ref;
                if (to_release && to_release->next)
                {
                    if (to_release->flags & SOURCE_BLOCK_SYNC)
                    {
                        sync_off = off;
                        sync_point = ref;
                    }
                    off -= to_release->len;
                    ref = to_release->next;
                    continue;
                }
                break;
            }
            source->min_queue_offset = sync_off;
            source->min_queue_point = sync_point;
            source->skip_duration = (long)(source->skip_duration * 0.9);
        }

        if (source->shrink_time)
        {
            if (source->shrink_time > client->worker->time_ms)
                break;      // not time yet to consider the purging point
            queue_size_target = (source->client->queue_pos - source->shrink_pos);
            source->shrink_pos = 0;
            source->shrink_time = 0;
        }
        /* lets see if we have too much/little data in the queue */
        if ((queue_size_target < source->min_queue_size) || (queue_size_target > source->queue_size_limit))
            queue_size_target = (source->listeners) ? source->queue_size_limit : source->min_queue_size;

        loop = 48 + (source->incoming_rate >> 13); // scale max on high bitrates
        queue_size_target += 8000; // lets not be too tight to the limit
        /* release blocks from the front of the queue until the target size is
         * reached or the per-call limit on removals is used up */
        while (source->queue_size > queue_size_target && loop)
        {
            refbuf_t *to_go = source->stream_data;
            if (to_go == NULL || to_go->next == NULL) // always leave at least one on the queue
                break;
            source->stream_data = to_go->next;
            source->queue_size -= to_go->len;
            if (source->min_queue_point == to_go)
            {
                // adjust min queue in line with expectations
                source->min_queue_offset -= to_go->len;
                source->min_queue_point = to_go->next;
            }
            to_go->next = NULL;
            if (source->format->detach_queue_block)
                source->format->detach_queue_block (source, to_go);
            refbuf_release (to_go);
            loop--;
        }
    } while (0);

    /* nothing was read this time, delay the next attempt */
    if (skip)
        client->schedule_ms += source->skip_duration;
    return 0;
}
750 
751 
/* Reset the scheduled time of every listener on the source to 0.  A debug
 * line is logged for any listener that was scheduled more than 100ms later
 * than the source client.
 */
void source_listeners_wakeup (source_t *source)
{
    client_t *feeder = source->client;
    avl_node *it;

    for (it = avl_get_first (source->clients); it != NULL; it = avl_get_next (it))
    {
        client_t *listener = (client_t *)it->key;

        if (listener->schedule_ms > feeder->schedule_ms + 100)
            DEBUG2 ("listener on %s was ahead by %ld", source->mount, (long)(listener->schedule_ms - feeder->schedule_ms));
        listener->schedule_ms = 0;
    }
}
765 
766 
/* Processing routine for a source client (see source_client_ops). While the
 * source is running this reads from the source via source_read(); once it has
 * stopped, the source is shut down, the listeners are given time to be
 * processed and the client is switched over to source_client_halt_ops.
 *
 * Returns -1 if the client has no source attached, or once the source has no
 * listeners left and is not being kept reserved (the source lock is left
 * held in that case). Returns 1 if source_read() returned a positive value,
 * and 0 otherwise.
 */
static int source_client_read (client_t *client)
{
    source_t *source = client->shared_data;

    if (source == NULL)
    {
        INFO1 ("source client from %s hijacked", client->connection.ip);
        return -1;
    }

    thread_rwlock_wlock (&source->lock);
    /* a disconnect time in the past means the allowed duration has run out */
    if (client->connection.discon.time &&
            client->connection.discon.time <= client->worker->current_time.tv_sec)
    {
        source->flags &= ~SOURCE_RUNNING;
        INFO1 ("streaming duration expired on %s", source->mount);
    }
    if (source_running (source))
    {
        if (source->limit_rate)
        {
            /* incoming bitrate is above the limit, skip reading for a while
             * and let the measured rate drop */
            if (source->limit_rate < (8 * source->incoming_rate) && global.running == ICE_RUNNING)
            {
                rate_add (source->in_bitrate, 0, client->worker->current_time.tv_sec);
                source->incoming_rate = (long)rate_avg (source->in_bitrate);
                thread_rwlock_unlock (&source->lock);
                client->schedule_ms += 310;
                return 0;
            }
        }
        /* NOTE(review): no unlock on this path, presumably source_change_worker()
         * has already released the source lock when it reports a move - confirm */
        if (source_read (source) > 0)
            return 1;
        if (source_running (source))
        {
            thread_rwlock_unlock (&source->lock);
            return 0;
        }
    }
    if ((source->flags & SOURCE_TERMINATING) == 0)
    {
        source_shutdown (source, 1);

        if (source->wait_time == 0)
        {
            /* drop_source_from_tree() takes the source lock itself, so let go
             * of it first. The lock is held again when it returns */
            thread_rwlock_unlock (&source->lock);
            drop_source_from_tree (source);
        }
    }

    if (source->termination_count && source->termination_count <= (long)source->listeners)
    {
        /* listeners are still being processed, allow them a second before
         * clearing SOURCE_TERMINATING so the shutdown step above is redone */
        if (client->timer_start + 1000 < client->worker->time_ms)
        {
            WARN2 ("%ld listeners still to process in terminating %s", source->termination_count, source->mount);
            if (source->listeners != source->clients->length)
            {
                WARN3 ("source %s has inconsistent listeners (%ld, %u)", source->mount, source->listeners, source->clients->length);
                source->listeners = source->clients->length;
            }
            source->flags &= ~SOURCE_TERMINATING;
        }
        else
            DEBUG4 ("%p %s waiting (%lu, %lu)", source, source->mount, source->termination_count, source->listeners);
        client->schedule_ms = client->worker->time_ms + 50;
    }
    else
    {
        if (source->listeners)
        {
            /* listeners are attached but the termination count does not
             * cover them, restart the termination sync with the current count */
            INFO1 ("listeners on terminating source %s, rechecking", source->mount);
            source->termination_count = source->listeners;
            client->timer_start = client->worker->time_ms;
            source->flags &= ~SOURCE_PAUSE_LISTENERS;
            source->flags |= (SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC);
            source_listeners_wakeup (source);
            thread_rwlock_unlock (&source->lock);
            return 0;
        }
        /* no listeners remain, hand the client over to the halt ops and
         * take the source out of the global count */
        free (source->fallback.mount);
        source->fallback.mount = NULL;
        source->flags &= ~SOURCE_LISTENERS_SYNC;
        client->connection.discon.time = 0;
        client->ops = &source_client_halt_ops;
        global_lock();
        global.sources--;
        stats_event_args (NULL, "sources", "%d", global.sources);
        global_unlock();
        if (source->wait_time == 0 || global.running != ICE_RUNNING)
        {
            INFO1 ("no more listeners on %s", source->mount);
            return -1;   // don't unlock source as the release is called which requires it
        }
        /* set a wait time for leaving the source reserved */
        client->connection.discon.time = client->worker->current_time.tv_sec + source->wait_time;
        client->schedule_ms = client->worker->time_ms + (1000 * source->wait_time);
        INFO2 ("listeners gone, keeping %s reserved for %ld seconds", source->mount, (long)source->wait_time);
    }
    thread_rwlock_unlock (&source->lock);
    return 0;
}
867 
868 
source_add_bytes_sent(struct rate_calc * out_bitrate,unsigned long written,uint64_t milli,uint64_t * sent_bytes)869 void source_add_bytes_sent (struct rate_calc *out_bitrate, unsigned long written, uint64_t milli, uint64_t *sent_bytes)
870 {
871     rate_add_sum (out_bitrate, written, milli, sent_bytes);
872     global_add_bitrates (global.out_bitrate, written, milli);
873 }
874 
875 
/* check_buffer handler for a listener reading from the source's shared queue.
 *
 * If the listener has no current block, a start point is located first.  The
 * lag between the source client's queue position and the listener's is then
 * used to decide what happens:
 *   - no lag: nothing to send, reschedule shortly and allow a quick wakeup
 *   - lag beyond the queue size: flag the connection for removal
 *   - lag close to the queue size limit: possibly switch the listener over to
 *     http_source_introfile with a private copy of the current block
 *   - otherwise: write up to 49 queue blocks to the listener
 *
 * Always returns -1; bytes written here are accounted for directly through
 * source_add_bytes_sent rather than via the return value.
 */
static int source_queue_advance (client_t *client)
{
    static unsigned char offset = 0;    // shared by all callers, only used to spread schedule times
    unsigned long written = 0;
    source_t *source = client->shared_data;
    refbuf_t *refbuf;
    uint64_t lag;

    // no block yet, so find a place on the queue to start from
    if (client->refbuf == NULL && locate_start_on_queue (source, client) < 0)
        return -1;

    lag = source->client->queue_pos - client->queue_pos;

    // a listener holding private (intro/copied) content must not be processed here
    if (client->flags & CLIENT_HAS_INTRO_CONTENT) abort(); // trap

    if (lag == 0)
    {
        // most listeners will be through here, so a minor spread should limit a wave of sends
        int ret = (offset & 31);
        offset++;   // this can be a race as it helps for randomizing
        client->schedule_ms += 5 + ((source->incoming_adj>>1) + ret);
        client->wakeup = &source->wakeup; // allow for quick wakeup
        return -1;
    }
    client->wakeup = NULL;
    // listener is behind by more than the queue holds (or exactly that much
    // while part way through a block), so drop it
    if (lag > source->queue_size || (lag == source->queue_size && client->pos))
    {
        INFO4 ("Client %" PRIu64 " (%s) has fallen too far behind (%"PRIu64") on %s, removing",
                client->connection.id, client->connection.ip, client->queue_pos, source->mount);
        stats_lock (source->stats, source->mount);
        stats_set_inc (source->stats, "slow_listeners");
        stats_release (source->stats);
        client->refbuf = NULL;
        client->connection.error = 1;
        return -1;
    }
    if ((lag+source->incoming_rate) > source->queue_size_limit && client->connection.error == 0)
    {
        // if the listener is really lagging but has received a decent
        // amount of data then allow a requeue, else allow the drop
        if (client->counter > (source->queue_size_limit << 1))
        {
            // a "norequeue" query parameter on the request disables the requeue
            const char *p = httpp_get_query_param (client->parser, "norequeue");
            if (p == NULL)
            {
                // we may need to copy the complete frame for private use
                if (client->pos < client->refbuf->len)
                {
                    refbuf_t *copy = source->format->qblock_copy (client->refbuf);
                    client->refbuf = copy;
                    client->flags |= CLIENT_HAS_INTRO_CONTENT;
                    DEBUG2 ("client %s requeued copy on %s", client->connection.ip, source->mount);
                }
                else
                {
                    // block fully sent, nothing to keep
                    client->refbuf = NULL;
                    client->pos = 0;
                }
                client->timer_start = 0;    // makes http_source_introfile restart its timing
                client->check_buffer = http_source_introfile;
                // do not be too eager to refill socket buffer
                client->schedule_ms += source->incoming_rate < 16000 ? source->incoming_rate/16 : 800;
                return -1;
            }
        }
    }
    // pre-decrement means at most 49 passes through the loop
    int loop = 50;
    while (--loop)
    {
        refbuf = client->refbuf;
        // sanity trap: must be a queue block of a sane size
        if ((refbuf->flags & SOURCE_QUEUE_BLOCK) == 0 || refbuf->len > 66000)  abort();

        int ret = 0;

        if (client->pos < refbuf->len)
            ret = source->format->write_buf_to_client (client);
        if (ret > 0)
            written += ret;
        if (client->pos >= refbuf->len)
        {
            // block completed, step to the next one if it is already queued
            if (refbuf->next)
            {
                client->refbuf = refbuf->next;
                client->pos = 0;
                continue;
            }
        }
        break;
    }
    if (written)
        source_add_bytes_sent (source->out_bitrate, written, client->worker->time_ms, &source->format->sent_bytes);
    return -1;
}
969 
970 
/* Find the queue block at which a listener should join the stream.
 *
 * A candidate block is chosen either from the tail of the queue or, when a
 * burst is wanted, from further back starting at source->min_queue_point.
 * From the candidate the queue is walked forward until a block flagged
 * SOURCE_BLOCK_SYNC is found.
 *
 * Returns 0 with client->refbuf, pos, counter and queue_pos initialised for
 * that block.  Returns -1 if the connection is in error, the queue is empty,
 * or no sync block was found (in the last case 150ms is added to the
 * listener's schedule).
 */
static int locate_start_on_queue (source_t *source, client_t *client)
{
    refbuf_t *refbuf;
    long lag = 0;   // bytes between the chosen block and the source client's queue position

    /* we only want to attempt a burst at connection time, not midstream
     * however streams like theora may not have the most recent page marked as
     * a starting point, so look for one from the burst point */
    if (client->connection.error || source->stream_data_tail == NULL)
        return -1;
    refbuf = source->stream_data_tail;
    // enough already sent and the tail is a usable start point, so no burst
    if (client->connection.sent_bytes > source->min_queue_offset && (refbuf->flags & SOURCE_BLOCK_SYNC))
    {
        lag = refbuf->len;
    }
    else
    {
        size_t size = source->min_queue_size;
        uint32_t v = -1;
        // burst size can be requested by header, or failing that by query parameter
        const char *param = httpp_getvar (client->parser, "initial-burst");

        if (param)
            config_qsizing_conv_a2n (param, &v);
        else
        {
            param = httpp_get_query_param (client->parser, "burst");
            if (param)
                config_qsizing_conv_a2n (param, &v);
        }
        if (param)
        {
            v = source_convert_qvalue (source, (uint32_t)v);
            DEBUG4 ("listener from %s (on %s) requested burst (%s, %u)", &client->connection.ip[0], source->mount, param, v);
        }
        else
            v = source->default_burst_size;

        if (v > client->connection.sent_bytes)
        {
            v -= client->connection.sent_bytes; /* have we sent data already */
            refbuf = source->min_queue_point;
            lag = source->min_queue_offset;
            // DEBUG3 ("size %lld, v %lld, lag %ld", size, v, lag);
            // step forward from the min queue point until the remaining
            // amount is no larger than the wanted burst
            while (size > v && refbuf && refbuf->next)
            {
                size -= refbuf->len;
                lag -= refbuf->len;
                refbuf = refbuf->next;
            }
            if (lag < 0)
                ERROR1 ("Odd, lag is negative %ld", lag);
        }
        else
            lag = refbuf->len;  // burst already covered, start from the tail block
    }

    // look for the first block from here that is marked as a start point
    while (refbuf)
    {
        if (refbuf->flags & SOURCE_BLOCK_SYNC)
        {
            client_set_queue (client, NULL);
            client->refbuf = refbuf;
            client->intro_offset = -1;
            client->pos = 0;
            client->counter = 0;
            client->queue_pos = source->client->queue_pos - lag;
            client->flags &= ~CLIENT_HAS_INTRO_CONTENT;
            DEBUG4 ("%s Joining queue on %s (%"PRIu64 ", %"PRIu64 ")", &client->connection.ip[0], source->mount, source->client->queue_pos, client->queue_pos);
            return 0;
        }
        lag -= refbuf->len;
        refbuf = refbuf->next;
    }
    // nothing suitable on the queue yet, try again a little later
    client->schedule_ms += 150;
    return -1;
}
1047 
/* Write a preroll log entry for a listener that was served the source's
 * intro file.
 *
 * Nothing is logged when the source has no intro filename, when the listener
 * has a negative intro offset, or when the listener holds separately provided
 * content (CLIENT_HAS_INTRO_CONTENT).  The source's own preroll log id is
 * used when it is non-negative, otherwise the id from the global config is
 * used if that one is non-negative.
 */
static void source_preroll_logging (source_t *source, client_t *client)
{
    if (source->intro_filename == NULL)
        return;
    if (client->intro_offset < 0)
        return;
    if (client->flags & CLIENT_HAS_INTRO_CONTENT)
        return; // content provided separately, auth or queue block copy

    if (source->preroll_log_id >= 0)
    {
        /* source has its own log */
        logging_preroll (source->preroll_log_id, source->intro_filename, client);
        return;
    }

    /* use the log from the global settings; the config is held while logging */
    ice_config_t *settings = config_get_config();
    if (settings->preroll_log.logid >= 0)
        logging_preroll (settings->preroll_log.logid, source->intro_filename, client);
    config_release_config();
}
1062 
1063 
/* check_buffer handler used while a listener is being sent intro content
 * (read through format_file_read from source->intro_file) before it joins
 * the source's queue.
 *
 * When format_file_read reports nothing more (< 0) the listener is switched
 * to source_queue_advance if the source has queued data, otherwise the intro
 * is restarted from source->intro_start.  While content remains, sending is
 * paced against the source's incoming rate, and a listener found to be
 * running well below that rate is flagged for removal.
 *
 * Returns the result of the format's write_buf_to_client, or -1 when nothing
 * was written on this call.
 */
static int http_source_introfile (client_t *client)
{
    source_t *source = client->shared_data;
    long duration, rate = source->incoming_rate, incoming_rate;
    int assumed = 0;

    //DEBUG2 ("client intro_pos is %ld, sent bytes is %ld", client->intro_offset, client->connection.sent_bytes);
    if (format_file_read (client, source->format, source->intro_file) < 0)
    {
        source_preroll_logging (source, client);
        if (source->stream_data_tail)
        {
            if (source->intro_skip_replay)
                listener_skips_intro (source->intro_ipcache, client, source->intro_skip_replay);
            /* better find the right place in queue for this client */
            if (source->format->detach_queue_block)
                source->format->detach_queue_block (source, client->refbuf); // in case of private queue
            client_set_queue (client, NULL);
            client->check_buffer = source_queue_advance;
            client->intro_offset = -1;
            return source_queue_advance (client);
        }
        // no stream data queued yet, keep the listener busy with the intro
        client->schedule_ms += 100;
        client->intro_offset = source->intro_start;  /* replay intro file */
        return -1;
    }

    if (rate == 0) // stream must have just started, make an assumption, re-evaluate next time
    {
        rate = 32000;
        assumed = 1;
    }
    if (client->timer_start == 0)
    {
        // start of timing: back-date the timer by the time needed to send
        // whatever is left of the default burst plus 8 seconds, and credit
        // the counter with 8 seconds worth of data at the current rate
        long to_send = 0;
        if (client->connection.sent_bytes < source->default_burst_size)
            to_send = source->default_burst_size - client->connection.sent_bytes;
        duration = (long)((float)to_send / rate);
        client->aux_data = duration + 8;
        client->timer_start = client->worker->current_time.tv_sec - client->aux_data;
        client->counter = 8 * rate;
    }
    incoming_rate = rate;
    duration = (client->worker->current_time.tv_sec - client->timer_start);
    // from here on rate is the listener's average (counter over elapsed seconds)
    if (duration)
        rate = (long)((float)client->counter / duration);
    // DEBUG4 ("duration %lu, counter %ld, rate %ld, bytes %ld", duration, client->counter, rate, client->connection.sent_bytes);

    if (assumed)
        client->timer_start = 0;  // force a reset for next time around, ignore rate checks this time

    if (rate > incoming_rate)
    {
        // listener is ahead of the stream rate, hold off for a while
        //DEBUG1 ("rate too high %lu, delaying", rate);
        client->schedule_ms += 40;
        rate_add (source->in_bitrate, 0, client->worker->current_time.tv_sec);
        return -1;
    }
    if (source->format->sent_bytes > (incoming_rate << 5) && // allow at least 30+ seconds on the stream
            (duration - client->aux_data) > 15 &&
            rate < (incoming_rate>>1))
    {
        INFO3 ("Dropped listener %s (%" PRIu64 "), running too slow on %s", &client->connection.ip[0], client->connection.id, source->mount);
        source_preroll_logging (source, client);
        client->connection.error = 1;
        client_set_queue (client, NULL);
        return -1; // assume a slow/stalled connection so drop
    }

    int ret = source->format->write_buf_to_client (client);
    // listener went away during the intro, still record the preroll
    if (client->connection.error)
        source_preroll_logging (source, client);
    return ret;
}
1138 
1139 
/* check_buffer handler run once the listener's headers have gone out, it
 * decides between intro content and the live queue.
 *
 * A negative intro_offset means no intro is wanted, so the listener is put
 * straight onto source_queue_advance.  Otherwise the head block held by the
 * listener is released (anything chained after it is kept as the new current
 * block), the intro offset is set to the source's intro start and processing
 * continues in http_source_introfile.
 */
static int http_source_intro (client_t *client)
{
    if (client->intro_offset >= 0)
    {
        source_t *source = client->shared_data;
        refbuf_t *head = client->refbuf;
        refbuf_t *remaining = NULL;

        if (head)
        {
            remaining = head->next;
            if (remaining)
                head->next = NULL;  /* unlink so only the head block is released */
        }
        refbuf_release (head);
        client->refbuf = remaining;
        client->pos = 0;
        client->intro_offset = source->intro_start;
        client->check_buffer = http_source_introfile;
        return http_source_introfile (client);
    }

    /* we only need to send the intro if nothing else has been sent */
    client_set_queue (client, NULL);
    client->check_buffer = source_queue_advance;
    return source_queue_advance (client);
}
1160 
1161 
/* check_buffer handler for a listener that is still being sent its HTTP
 * response headers.
 *
 * With no block left to send (or the block fully sent) control passes to
 * http_source_intro.  Processing is postponed while the source has nothing
 * queued or is not running.  If no response code has been set yet the headers
 * are built first, using the format's create_client_data when provided and
 * format_general_headers otherwise.
 *
 * Returns the result of format_generic_write_to_client, or -1 when nothing
 * was written.  connection.error is set whenever the listener is to be
 * dropped, which is what the calling send loop checks for.
 */
static int http_source_listener (client_t *client)
{
    refbuf_t *refbuf = client->refbuf;
    source_t *source = client->shared_data;
    int ret;

    if (refbuf == NULL || client->pos == refbuf->len)
    {
        client->check_buffer = http_source_intro;
        return http_source_intro (client);
    }
    if (source->queue_size == 0)
    {
        client->schedule_ms += 500;
        return -1;  /* postpone processing until data on queue */
    }

    if (client->respcode == 0)
    {
        int (*build_headers)(format_plugin_t *, client_t *) = format_general_headers;

        if (source_running (source) == 0)
        {
            client->schedule_ms += 200;
            return -1;
        }
        if (source->format->create_client_data)
            build_headers = source->format->create_client_data;

        refbuf->len = 0;
        if (build_headers (source->format, client) < 0)
        {
            ERROR1 ("internal problem, dropping client %" PRIu64, client->connection.id);
            /* a bare -1 only means "nothing written" to the send loop, the
             * error flag is needed for the listener to actually be dropped */
            client->connection.error = 1;
            return -1;
        }
        stats_lock (source->stats, source->mount);
        stats_set_inc (source->stats, "listener_connections");
        stats_release (source->stats);
    }
    ret = format_generic_write_to_client (client);
    if (client->pos == refbuf->len)
    {
        /* headers completely sent */
        if (client->flags & CLIENT_AUTHENTICATED)
        {
            client->check_buffer = http_source_intro;
            client->connection.sent_bytes = 0;  /* restart the byte count after the headers */
            return ret;
        }
        /* not authenticated, the response is all this client gets */
        client->connection.error = 1;
        return -1;
    }
    client->schedule_ms += 200;
    return ret;
}
1216 
1217 
1218 // detach client from the source, enter with lock (probably read) and exit with write lock.
source_listener_detach(source_t * source,client_t * client)1219 void source_listener_detach (source_t *source, client_t *client)
1220 {
1221     client->wakeup = NULL;
1222     if (client->check_buffer != http_source_listener) // not in http headers
1223     {
1224         refbuf_t *ref = client->refbuf;
1225 
1226         if (ref)
1227         {
1228             if (ref->flags & REFBUF_SHARED)  // on the queue
1229             {
1230                 if (client->connection.error == 0 && client->pos < ref->len && source->fallback.mount)
1231                 {
1232                     /* make a private copy so that a write can complete later */
1233                     refbuf_t *copy = source->format->qblock_copy (client->refbuf);
1234 
1235                     client->refbuf = copy;
1236                     client->flags |= CLIENT_HAS_INTRO_CONTENT;
1237                 }
1238                 else
1239                     client->refbuf = NULL;
1240             }
1241         }
1242         client->check_buffer = source->format->write_buf_to_client;
1243     }
1244     else
1245         client->check_buffer = NULL;
1246     thread_rwlock_unlock (&source->lock);   // read lock in use!
1247     thread_rwlock_wlock (&source->lock);
1248     avl_delete (source->clients, client, NULL);
1249 }
1250 
1251 
1252 /* used to hold listeners in waiting over a relay restart. Handling of a failed relay also
1253  * needs to occur.
1254  */
wait_for_restart(client_t * client)1255 static int wait_for_restart (client_t *client)
1256 {
1257     source_t *source = client->shared_data;
1258 
1259     if (client->timer_start && client->worker->current_time.tv_sec - client->timer_start > 15)
1260     {
1261         INFO1 ("Dropping listener, stuck in %s too long", source->mount);
1262         client->connection.error = 1; // in here too long, drop client
1263     }
1264 
1265     if (source_running (source) || client->connection.error ||
1266             (source->flags & SOURCE_PAUSE_LISTENERS) == 0 ||
1267             (source->flags & (SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC)))
1268     {
1269         client->ops = &listener_client_ops;
1270         return 0;
1271     }
1272 
1273     if (source->flags & SOURCE_LISTENERS_SYNC)
1274         client->schedule_ms = client->worker->time_ms + 100;
1275     else
1276         client->schedule_ms = client->worker->time_ms + 300;
1277     return 0;
1278 }
1279 
1280 
1281 /* used to hold listeners that have already been processed while other listeners
1282  * are still to be done
1283  */
wait_for_other_listeners(client_t * client)1284 static int wait_for_other_listeners (client_t *client)
1285 {
1286     source_t *source = client->shared_data;
1287 
1288     if ((source->flags & (SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC)) == SOURCE_LISTENERS_SYNC)
1289     {
1290         client->schedule_ms = client->worker->time_ms + 150;
1291         return 0;
1292     }
1293     client->ops = &listener_client_ops;
1294     return 0;
1295 }
1296 
1297 
1298 /* general send routine per listener.
1299  */
send_to_listener(client_t * client)1300 static int send_to_listener (client_t *client)
1301 {
1302     source_t *source = client->shared_data;
1303     int ret;
1304 
1305     if (source == NULL)
1306         return -1;
1307     if (thread_rwlock_tryrlock (&source->lock) != 0)
1308     {
1309         client->schedule_ms = client->worker->time_ms + 4;
1310         return 0; // probably busy, check next client, come back to this
1311     }
1312     ret = send_listener (source, client);
1313     if (ret == 1)
1314         return 1; // client moved, and source unlocked
1315     if (ret < 0)
1316         ret = source_listener_release (source, client);
1317     thread_rwlock_unlock (&source->lock);
1318     return ret;
1319 }
1320 
1321 
/* Process one listener while the source is synchronising its listeners
 * (called from send_listener when SOURCE_LISTENERS_SYNC is set).
 *
 * Depending on state the listener is detached because of a connection error,
 * moved to the source's fallback mount, parked on listener_pause_ops while a
 * terminating source has SOURCE_PAUSE_LISTENERS set, or parked on
 * listener_wait_ops.
 *
 * The source lock is held for writing on return, and
 * source->termination_count is decremented exactly once on every path.
 * NOTE(review): presumably entered with the read lock held, as suggested by
 * the comments here and in source_listener_detach - confirm against callers.
 *
 * Returns -1 when the listener is to be dropped, the result of move_listener
 * when that is <= 0, otherwise 0.
 */
int listener_waiting_on_source (source_t *source, client_t *client)
{
    int read_lock = 1, ret = 0;
    // single pass only, the loop exists so that break can be used to reach the common exit
    while (1)
    {
        if (client->connection.error)
        {
            source_listener_detach (source, client);    // return with write lock
            read_lock = 0;  // skip the possible reacquiring of the lock later.
            source->listeners--;
            client->shared_data = NULL;
            ret = -1;
            break;
        }
        if (source->fallback.mount)
        {
            source_listener_detach (source, client);
            source->listeners--;
            // source is unlocked for the duration of the move
            thread_rwlock_unlock (&source->lock);
            client->shared_data = NULL;
            ret = move_listener (client, &source->fallback);
            thread_rwlock_wlock (&source->lock);
            if (ret <= 0)
            {
                source->termination_count--;
                return ret;
            }
            // move did not take place, so attach the listener to this source again
            read_lock = 0;
            source->listeners++;
            source_setup_listener (source, client);
            ret = 0;
        }
        if (source->flags & SOURCE_TERMINATING)
        {
            if ((source->flags & SOURCE_PAUSE_LISTENERS) && global.running == ICE_RUNNING)
            {
                // let go of any queue block before the listener is paused
                if (client->refbuf && (client->refbuf->flags & SOURCE_QUEUE_BLOCK))
                    client->refbuf = NULL;
                client->ops = &listener_pause_ops;
                client->flags |= CLIENT_HAS_MOVED;
                client->schedule_ms = client->worker->time_ms + 60;
                client->timer_start = client->worker->current_time.tv_sec;
                break;
            }
            ret = -1;
            break;
        }
        client->ops = &listener_wait_ops;
        client->schedule_ms = client->worker->time_ms + 100;
        break;
    }
    if (read_lock) // acquire write lock if still with read lock
    {
        thread_rwlock_unlock (&source->lock);
        thread_rwlock_wlock (&source->lock);
    }
    source->termination_count--;
    return ret;
}
1381 
1382 
/* Send queued content to one listener, called with the source lock held.
 *
 * Hands over to listener_waiting_on_source while the source is syncing its
 * listeners, enforces any per-listener limit (range end or disconnect time),
 * waits if the source is not running, and may migrate the listener to another
 * worker.  Otherwise the listener's check_buffer handler is called
 * repeatedly, bounded by an iteration count and by the source's
 * listener_send_trigger, and the bytes written are added to the outgoing
 * bitrate figures.
 *
 * Returns 1 if the listener was moved to another worker (see
 * listener_change_worker), -1 if the listener is to be released, 0 otherwise.
 * While syncing, the result of listener_waiting_on_source is returned.
 */
static int send_listener (source_t *source, client_t *client)
{
    int bytes;
    int loop = 40;   /* max number of iterations in one go */
    long total_written = 0, limiter = source->listener_send_trigger;
    int ret = 0, lag;
    worker_t *worker = client->worker;
    time_t now = worker->current_time.tv_sec;

    client->schedule_ms = worker->time_ms;

    if (source->flags & SOURCE_LISTENERS_SYNC)
        return listener_waiting_on_source (source, client);

    /* check for limited listener time */
    if (client->flags & CLIENT_RANGE_END)
    {
        if (client->connection.discon.offset <= client->connection.sent_bytes)
            return -1;
    }
    else if (client->connection.discon.time && now >= client->connection.discon.time)
    {
        INFO1 ("time limit reached for client #%" PRIu64, client->connection.id);
        client->connection.error = 1;
        return -1;
    }
    if (source_running (source) == 0)
    {
        DEBUG0 ("source not running, listener will wait");
        client->schedule_ms += 100;
        return 0;
    }

    // do we migrate this listener to the same handler as the source client
    if (listener_change_worker (client, source))
        return 1;

    lag = source->client->queue_pos - client->queue_pos;

    /* progressive slowdown if nearing max bandwidth.  */
    if (global.max_rate)
    {
        if (throttle_sends > 2) /* exceeded limit, skip */
        {
            client->schedule_ms += 40 + (client->throttle * 3);
            return 0;
        }
        if (throttle_sends > 1) /* slow down any multiple sends */
        {
            loop = 3;
            client->schedule_ms += (client->throttle * 4);
        }
        if (throttle_sends > 0)
        {
            /* make lagging listeners, lag further on high server bandwidth use */
            if (lag > (source->incoming_rate*2))
                client->schedule_ms += 100 + (client->throttle * 3);
        }
    }
    // set between 1 and 25
    client->throttle = source->incoming_adj > 25 ? 25 : (source->incoming_adj > 0 ? source->incoming_adj : 1);
    while (1)
    {
        /* lets not send too much to one client in one go, but don't
           sleep for too long if more data can be sent */
        if (loop == 0 || total_written > limiter)
        {
            client->schedule_ms += 25;
            break;
        }
        bytes = client->check_buffer (client);
        if (bytes < 0)
        {
            /* a negative result alone is not fatal, the listener only goes
             * if flagged in error or nothing was sent and the socket is dead */
            if (client->connection.error || (total_written == 0 && connection_unreadable (&client->connection)))
            {
                ret = -1;
                break;
            }
            client->schedule_ms += 15;
            break;  /* can't write any more */
        }

        total_written += bytes;
        loop--;
    }
    /* same accounting as the queue handler uses, via the shared helper */
    if (total_written)
        source_add_bytes_sent (source->out_bitrate, total_written, worker->time_ms, &source->format->sent_bytes);

    /* while the queue is being shrunk, record how far back this listener is */
    if (source->shrink_time && client->connection.error == 0)
    {
        lag = source->client->queue_pos - client->queue_pos;
        if (lag > source->queue_size_limit)
            lag = source->queue_size_limit; // impose a higher lag value
        thread_spin_lock (&source->shrink_lock);
        if (client->queue_pos < source->shrink_pos)
            source->shrink_pos = source->client->queue_pos - lag;
        thread_spin_unlock (&source->shrink_lock);
    }
    return ret;
}
1486 
1487 
/* Perform any initialisation before the stream data is processed, the header
 * info is processed by now and the format details are setup.
 *
 * This opens the optional dump file, resets the per-source statistics and
 * counters, sets up fresh incoming/outgoing rate calculators, marks the
 * source as running and applies mount specific settings (maximum stream
 * duration, on-connect script, auth stream start).
 *
 * NOTE(review): source->client is dereferenced unconditionally for the
 * stream_start and source_ip stats and for max_stream_duration, yet it is
 * also tested against NULL before the audio info handling.  Either that test
 * is redundant or the other uses need guarding - confirm with callers.
 */
void source_init (source_t *source)
{
    mount_proxy *mountinfo;

    if (source->dumpfilename != NULL)
    {
        unsigned int len;
        char buffer [4096];

        // the dump filename is a pattern expanded against the mount name
        len = sizeof buffer;
        if (util_expand_pattern (source->mount, source->dumpfilename, buffer, &len) == 0)
        {
            INFO2 ("dumpfile \"%s\" for %s", buffer, source->mount);
            source->dumpfile = fopen (buffer, "ab");
            if (source->dumpfile == NULL)
            {
                // not fatal, the stream carries on without a dump file
                WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
                        buffer, strerror(errno));
            }
        }
    }

    /* start off the statistics */
    stats_event_inc (NULL, "source_total_connections");
    source->stats = stats_lock (source->stats, source->mount);
    stats_set_flags (source->stats, "slow_listeners", "0", STATS_COUNTERS);
    stats_set (source->stats, "server_type", source->format->contenttype);
    stats_set_flags (source->stats, "listener_peak", "0", STATS_COUNTERS);
    stats_set_args (source->stats, "listener_peak", "%lu", source->peak_listeners);
    stats_set_flags (source->stats, "listener_connections", "0", STATS_COUNTERS);
    stats_set_time (source->stats, "stream_start", STATS_COUNTERS, source->client->worker->current_time.tv_sec);
    stats_set_flags (source->stats, "total_mbytes_sent", "0", STATS_COUNTERS);
    stats_set_flags (source->stats, "total_bytes_sent", "0", STATS_COUNTERS);
    stats_set_flags (source->stats, "total_bytes_read", "0", STATS_COUNTERS);
    stats_set_flags (source->stats, "outgoing_kbitrate", "0", STATS_COUNTERS);
    stats_set_flags (source->stats, "incoming_bitrate", "0", STATS_COUNTERS);
    stats_set_flags (source->stats, "queue_size", "0", STATS_COUNTERS);
    stats_set_flags (source->stats, "connected", "0", STATS_COUNTERS);
    stats_set_flags (source->stats, "source_ip", source->client->connection.ip, STATS_COUNTERS);

    source->last_read = time(NULL);
    source->prev_listeners = -1;
    source->bytes_sent_at_update = 0;
    source->stats_interval = 5;
    /* so the first set of average stats after 4 seconds */
    source->client_stats_update = source->last_read + 4;
    source->skip_duration = 40;
    source->buffer_count = 0;
    source->queue_size_limit = 200000000; // initial sizing
    source->default_burst_size = 300000;
    source->min_queue_size = 600000;

    // start with an empty audio info dictionary, filled from the source
    // client's ice-audio-info header when one was supplied
    util_dict_free (source->audio_info);
    source->audio_info = util_dict_new();
    if (source->client)
    {
        const char *str = httpp_getvar(source->client->parser, "ice-audio-info");
        if (str)
        {
            _parse_audio_info (source, str);
            stats_set_flags (source->stats, "audio_info", str, STATS_GENERAL);
        }
        source->client->queue_pos = 0;
    }
    stats_release (source->stats);
    // replace any previous rate calculators with fresh ones
    rate_free (source->in_bitrate);
    source->in_bitrate = rate_setup (60, 1);
    rate_free (source->out_bitrate);
    source->out_bitrate = rate_setup (9000, 1000);

    source->flags |= SOURCE_RUNNING;

    // config is held from here until config_release_config below
    mountinfo = config_find_mount (config_get_config(), source->mount);
    if (mountinfo)
    {
        if (mountinfo->max_stream_duration)
            source->client->connection.discon.time = source->client->worker->current_time.tv_sec + mountinfo->max_stream_duration;
        if (mountinfo->on_connect)
            source_run_script (mountinfo->on_connect, source->mount);
        auth_stream_start (mountinfo, source);
    }
    config_release_config();

    INFO1 ("Source %s initialised", source->mount);

    /* on demand relays should have already called this */
    if ((source->flags & SOURCE_ON_DEMAND) == 0)
        slave_update_mounts();
    source->flags &= ~SOURCE_ON_DEMAND;
}
1581 
1582 
/* Arrange for listeners sitting on a fallback of dest_source to be brought
 * over to dest_source, for mounts that have fallback-override enabled.
 *
 * Starting from mountinfo the chain of fallback mounts is followed, at most
 * 15 steps.  At the first running source found in the chain, provided the
 * format type matches, it has listeners and it has no fallback pending, its
 * fallback is pointed at dest_source and a listener sync is started.  When
 * the chain ends without a running source, the last mount name is handed to
 * fserve_set_override instead.
 *
 * The configuration is accessed through config_get_config_unlocked().
 * NOTE(review): presumably the caller is expected to hold the config lock -
 * confirm.
 * NOTE(review): after the first step `mount` points at `buffer`, which is
 * then passed to util_expand_pattern as both input and output - confirm that
 * the helper copes with the overlap.
 *
 * Returns 1 if a source was set up to move its listeners, the result of
 * fserve_set_override if that was used, otherwise 0.  The read lock on
 * global.source_tree is always released before returning.
 */
static int source_set_override (mount_proxy *mountinfo, source_t *dest_source, format_type_t type)
{
    source_t *source;
    const char *dest = dest_source->mount;
    int ret = 0, loop = 15;
    int tree_locked = 0;    /* set while this function holds the source tree lock */
    ice_config_t *config = config_get_config_unlocked();
    unsigned int len;
    char *mount = dest_source->mount, buffer [4096];

    if (mountinfo == NULL || mountinfo->fallback_mount == NULL || mountinfo->fallback_override == 0)
    {
        INFO1 ("no override for %s set", dest_source->mount);
        return 0;
    }
    INFO2 ("for %s set to %s", dest_source->mount, mountinfo->fallback_mount);
    avl_tree_rlock (global.source_tree);
    tree_locked = 1;
    while (loop--)
    {
        len = sizeof buffer;
        if (util_expand_pattern (mount, mountinfo->fallback_mount, buffer, &len) < 0)
            break;
        mount = buffer;

        DEBUG2 ("checking for %s on %s", mount, dest);
        source = source_find_mount_raw (mount);
        if (source)
        {
            if (strcmp (source->mount, dest) == 0) // back where we started, drop out
                break;
            thread_rwlock_wlock (&source->lock);
            if (source_running (source))
            {
                /* source is locked now so the tree lock is no longer needed */
                avl_tree_unlock (global.source_tree);
                tree_locked = 0;
                if (source->format->type == type)
                {
                    if (source->listeners && source->fallback.mount == NULL)
                    {
                        source->fallback.limit = 0;
                        source->fallback.mount = strdup (dest);
                        source->fallback.flags = FS_FALLBACK;
                        source->fallback.type = type;
                        source->termination_count = source->listeners;
                        source->client->timer_start = dest_source->client->worker->time_ms;
                        source->flags |= SOURCE_LISTENERS_SYNC;
                        source_listeners_wakeup (source);
                        ret = 1;
                    }
                }
                else
                    ERROR4("%s (%d) and %s(%d) are different formats", dest, type, mount, source->format->type);
                thread_rwlock_unlock (&source->lock);
                break;
            }
            thread_rwlock_unlock (&source->lock);
        }
        /* no running source at this step, see if the chain carries on */
        mountinfo = config_find_mount (config, mount);
        if (mountinfo == NULL || mountinfo->fallback_mount == NULL || mountinfo->fallback_override == 0)
        {
            /* end of the chain, could be a file based fallback */
            avl_tree_unlock (global.source_tree);
            tree_locked = 0;
            ret = fserve_set_override (mount, dest, type);
            break;
        }
    }
    if (loop < 0)   /* ran out of steps without hitting a break, probably a loop in the settings */
        WARN2 ("fallback override chain for %s too deep, stopped at %s", dest, mount);
    /* covers the early breaks and a chain that ran out of steps */
    if (tree_locked)
        avl_tree_unlock (global.source_tree);
    return ret;
}
1655 
1656 
/* Record dest_mount as the fallback for the listeners of this source.
 *
 * Nothing is recorded if dest_mount is NULL, does not begin with '/', or
 * the source currently has no listeners.
 *
 * The limit stored with the fallback is the measured incoming rate rounded
 * to the nearest 1000, but only when the source client has been connected
 * for more than 40 seconds.  If that gives 0 then the limit_rate of the
 * source is used when one is set.
 */
void source_set_fallback (source_t *source, const char *dest_mount)
{
    int rate = 0;
    client_t *client = source->client;
    time_t connected;

    if (dest_mount == NULL)
    {
        INFO1 ("No fallback on %s", source->mount);
        return;
    }
    if (dest_mount[0] != '/')
    {
        WARN2 ("invalid fallback on \"%s\", ignoring \"%s\"", source->mount, dest_mount);
        return;
    }
    if (source->listeners == 0)
    {
        INFO2 ("fallback on %s to %s, but no listeners", source->mount, dest_mount);
        return;
    }

    connected = client->worker->current_time.tv_sec - client->connection.con_time;
    if (connected > 40)
    {
        /* NOTE(review): the shortened average is presumably there to discount
         * the stalled period of a timed out source - confirm in rate code */
        if (source->flags & SOURCE_TIMEOUT)
            rate = (int)rate_avg_shorten (source->in_bitrate, source->timeout);
        else
            rate = (int)rate_avg (source->in_bitrate);
        /* round to the nearest 1000 with integer maths.  The earlier
         * (rate / 1000) + 0.5 form divided as integers first, so it always
         * rounded down. */
        rate = ((rate + 500) / 1000) * 1000;
    }
    if (rate == 0 && source->limit_rate)
        rate = source->limit_rate;

    source->fallback.mount = strdup (dest_mount);
    /* not a copy, this refers to the mount string held by the source */
    source->fallback.fallback = source->mount;
    source->fallback.flags = FS_FALLBACK;
    source->fallback.limit = rate;
    source->fallback.type = source->format->type;
    INFO4 ("fallback set on %s to %s(%d) with %ld listeners", source->mount, dest_mount,
            source->fallback.limit, source->listeners);
}
1699 
1700 
/* Set the intro file for a source.
 *
 * file_pattern is expanded against the mount name of the source and looked
 * up under the configured webroot_dir.  The file is only accepted if it can
 * be opened, format_check_frames recognises it and its type is the same as
 * the type of the source format.
 *
 * Returns 0 when the intro file and name on the source have been replaced,
 * -1 otherwise, in which case the source is left unchanged.
 */
int source_set_intro (source_t *source, const char *file_pattern)
{
    if (file_pattern == NULL || source == NULL)
        return -1;

    ice_config_t *config = config_get_config_unlocked ();
    char buffer[4096];
    unsigned int len = sizeof buffer;
    int ret = snprintf (buffer, len, "%s" PATH_SEPARATOR, config->webroot_dir);

    do
    {
        /* snprintf reports the length it wanted to write, so anything at or
         * beyond the buffer size means truncation.  Either condition alone
         * has to stop us, otherwise len underflows below and buffer + ret
         * refers to memory outside of the buffer. */
        if (ret < 1 || (unsigned int)ret >= len)
            break;
        len -= ret;
        if (util_expand_pattern (source->mount, file_pattern, buffer + ret, &len) < 0)
            break;

        icefile_handle intro_file;
        if (file_open (&intro_file, buffer) < 0)
        {
            WARN3 ("Cannot open intro for %s \"%s\": %s", source->mount, buffer, strerror(errno));
            break;
        }
        format_check_t intro;
        intro.fd = intro_file;
        intro.desc = buffer;
        if (format_check_frames (&intro) < 0 || intro.type == FORMAT_TYPE_UNDEFINED)
        {
            WARN2 ("Failed to read intro for %s (%s)", source->mount, buffer);
            file_close (&intro_file);
            break;
        }
        if (intro.type != source->format->type)
        {
            WARN2 ("intro file seems to be a different format to %s (%s)", source->mount, buffer);
            file_close (&intro_file);
            break;
        }
        // maybe a bitrate check later.
        INFO3 ("intro file for %s is %s (%s)", source->mount, file_pattern, buffer);
        file_close (&source->intro_file);
        source->intro_file = intro_file;
        free (source->intro_filename);
        /* keep the expanded name only, without the webroot prefix */
        source->intro_filename = strdup (buffer + ret);
        return 0;
    } while (0);
    return -1;
}
1750 
1751 
/* Start the termination of a source.
 *
 * The source is flagged as terminating, with the listeners to be synced,
 * and the listeners are woken up.  If the source client had actually been
 * running (a connection time and a parser are present) then the stats are
 * updated and, for a configured mount, the on_disconnect script and
 * auth_stream_end are invoked.  With with_fallback set and the server still
 * running, the fallback of the mount is applied via source_set_fallback.
 */
void source_shutdown (source_t *source, int with_fallback)
{
    client_t *src_client = source->client;
    mount_proxy *minfo;
    int has_run;

    INFO1("Source \"%s\" exiting", source->mount);

    source->flags &= ~(SOURCE_ON_DEMAND);
    source->termination_count = source->listeners;
    src_client->timer_start = src_client->worker->time_ms;
    source->flags |= (SOURCE_TERMINATING | SOURCE_LISTENERS_SYNC);
    source_listeners_wakeup (source);

    /* config lock is held from here until the release at the end */
    minfo = config_find_mount (config_get_config(), source->mount);

    /* stats and disconnect handling only apply if the source has been running */
    has_run = (src_client->connection.con_time && src_client->parser);
    if (has_run && source->stats)
        update_source_stats (source);
    if (has_run && minfo)
    {
        if (minfo->on_disconnect)
            source_run_script (minfo->on_disconnect, source->mount);
        auth_stream_end (minfo, source);
    }

    if (minfo && with_fallback && global.running == ICE_RUNNING)
        source_set_fallback (source, minfo->fallback_mount);

    /* cleared after the fallback setup, which checks SOURCE_TIMEOUT */
    source->flags &= ~(SOURCE_TIMEOUT);
    config_release_config();
}
1782 
1783 
/* Parse a ';' separated list of name=value settings.
 *
 * Each setting whose name starts with "ice-" or "bitrate" has its value
 * URL unescaped and is then stored in the audio_info dictionary of the
 * source and in its stats.  Anything else, including segments without an
 * '=', is ignored.
 */
static void _parse_audio_info (source_t *source, const char *s)
{
    const char *start = s;

    while (start != NULL && *start != '\0')
    {
        size_t len;
        const char *next = strchr (start, ';');

        if (next == NULL)
            len = strlen (start);
        else
        {
            len = (size_t)(next - start);
            next++; /* skip past the ';' */
        }
        if (len)
        {
            char name[100], value[200];
            /* the name must stop at ';' as well as '=', otherwise a segment
             * lacking an '=' would run on into the following segment and
             * produce a name like "ice-a;ice-b" */
            int n = sscanf (start, "%99[^=;]=%199[^;\r\n]", name, value);

            if (n == 2 && (strncmp (name, "ice-", 4) == 0 || strncmp (name, "bitrate", 7) == 0))
            {
                char *esc = util_url_unescape (value);
                if (esc)
                {
                    util_dict_set (source->audio_info, name, esc);
                    stats_set_flags (source->stats, name, esc, STATS_COUNTERS);
                }
                free (esc);
            }
        }
        start = next;
    }
}
1817 
1818 
compare_intro_ipcache(void * arg,void * a,void * b)1819 static int compare_intro_ipcache (void *arg, void *a, void *b)
1820 {
1821     struct node_IP_time *this = (struct node_IP_time *)a;
1822     struct node_IP_time *that = (struct node_IP_time *)b;
1823     int ret = strcmp (&this->ip[0], &that->ip[0]);
1824 
1825     if (ret && that->a.timeout)
1826     {
1827         cache_file_contents *c = arg;
1828         time_t threshold = c->file_recheck;
1829 
1830         if (c->deletions_count < 9 && that->a.timeout < threshold)
1831         {
1832             c->deletions [c->deletions_count] = that;
1833             c->deletions_count++;
1834         }
1835     }
1836     return ret;
1837 }
1838 
1839 
/* Set up the pre-roll log of a source from the mount settings.
 *
 * The log name of the mount is expanded against the mount name of the
 * source and placed under the configured log_dir.  A log is opened only if
 * the source does not have one already, then the size trigger, reopen
 * period, kept lines and archive setting are applied and the log reopened.
 *
 * Returns 0 on success.  On any failure, or when no pre-roll log is
 * configured for the mount, the log of the source is closed, its id reset
 * to -1 and -1 is returned.
 */
int source_apply_preroll (mount_proxy *mountinfo, source_t *source)
{
    if (mountinfo != NULL && mountinfo->preroll_log.name != NULL)
    {
        ice_config_t *cfg = config_get_config_unlocked ();
        struct error_log *plog = &mountinfo->preroll_log;
        char path [4096];
        unsigned int room = sizeof path;
        int used = snprintf (path, room, "%s" PATH_SEPARATOR, cfg->log_dir);

        if (used >= 0 && (unsigned int)used < room)
        {
            room -= used;
            if (util_expand_pattern (source->mount, plog->name, path + used, &room) >= 0)
            {
                if (source->preroll_log_id < 0)
                    source->preroll_log_id = log_open (path);

                if (source->preroll_log_id >= 0)
                {
                    long trigger;
                    int keep_archive;

                    INFO3 ("using pre-roll log file %s (%s) for %s", plog->name, path, source->mount);

                    /* mount sizes of 10000 or less defer to the global size */
                    if (plog->size > 10000)
                        trigger = plog->size;
                    else
                        trigger = cfg->preroll_log.size;
                    log_set_trigger (source->preroll_log_id, trigger);
                    log_set_reopen_after (source->preroll_log_id, plog->duration);
                    log_set_lines_kept (source->preroll_log_id, plog->display);

                    /* -1 on the mount means take the global archive setting */
                    if (plog->archive == -1)
                        keep_archive = cfg->preroll_log.archive;
                    else
                        keep_archive = plog->archive;
                    log_set_archive_timestamp (source->preroll_log_id, keep_archive);

                    log_reopen (source->preroll_log_id);
                    return 0;
                }
            }
        }
    }

    log_close (source->preroll_log_id);
    source->preroll_log_id = -1;
    return -1;
}
1880 
1881 
1882 /* Apply the mountinfo details to the source */
source_apply_mount(source_t * source,mount_proxy * mountinfo)1883 static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
1884 {
1885     const char *str;
1886     int val;
1887     http_parser_t *parser = NULL;
1888 
1889     if (mountinfo == NULL || strcmp (mountinfo->mountname, source->mount) == 0)
1890         INFO1 ("Applying mount information for \"%s\"", source->mount);
1891     else
1892         INFO2 ("Applying mount information for \"%s\" from \"%s\"",
1893                 source->mount, mountinfo->mountname);
1894 
1895     stats_set_args (source->stats, "listener_peak", "%lu", source->peak_listeners);
1896 
1897     /* if a setting is available in the mount details then use it, else
1898      * check the parser details. */
1899 
1900     if (source->client)
1901         parser = source->client->parser;
1902 
1903     /* to be done before possible non-utf8 stats */
1904     if (source->format && source->format->apply_settings)
1905         source->format->apply_settings (source->format, mountinfo);
1906 
1907     /* public */
1908     if (mountinfo && mountinfo->yp_public >= 0)
1909         val = mountinfo->yp_public;
1910     else
1911     {
1912         do {
1913             str = httpp_getvar (parser, "ice-public");
1914             if (str) break;
1915             str = httpp_getvar (parser, "icy-pub");
1916             if (str) break;
1917             str = httpp_getvar (parser, "x-audiocast-public");
1918             if (str) break;
1919             /* handle header from icecast v2 release */
1920             str = httpp_getvar (parser, "icy-public");
1921             if (str) break;
1922             str = source->yp_public > 0 ? "1" : "0";
1923         } while (0);
1924         val = atoi (str);
1925     }
1926     stats_set_args (source->stats, "public", "%d", val);
1927     if (source->yp_public != val)
1928     {
1929         DEBUG1 ("YP changed to %d", val);
1930         if (val)
1931             yp_add (source->mount);
1932         else
1933             yp_remove (source->mount);
1934         source->yp_public = val;
1935     }
1936 
1937     /* stream name */
1938     if (mountinfo && mountinfo->stream_name)
1939         stats_set (source->stats, "server_name", mountinfo->stream_name);
1940     else
1941     {
1942         do {
1943             str = httpp_getvar (parser, "ice-name");
1944             if (str) break;
1945             str = httpp_getvar (parser, "icy-name");
1946             if (str) break;
1947             str = httpp_getvar (parser, "x-audiocast-name");
1948             if (str) break;
1949             str = "Unspecified name";
1950         } while (0);
1951         if (source->format)
1952             stats_set_conv (source->stats, "server_name", str, source->format->charset);
1953     }
1954 
1955     /* stream description */
1956     if (mountinfo && mountinfo->stream_description)
1957         stats_set (source->stats, "server_description", mountinfo->stream_description);
1958     else
1959     {
1960         do {
1961             str = httpp_getvar (parser, "ice-description");
1962             if (str) break;
1963             str = httpp_getvar (parser, "icy-description");
1964             if (str) break;
1965             str = httpp_getvar (parser, "x-audiocast-description");
1966             if (str) break;
1967         } while (0);
1968         if (str && source->format)
1969             stats_set_conv (source->stats, "server_description", str, source->format->charset);
1970     }
1971 
1972     /* stream URL */
1973     if (mountinfo && mountinfo->stream_url)
1974         stats_set (source->stats, "server_url", mountinfo->stream_url);
1975     else
1976     {
1977         do {
1978             str = httpp_getvar (parser, "ice-url");
1979             if (str) break;
1980             str = httpp_getvar (parser, "icy-url");
1981             if (str) break;
1982             str = httpp_getvar (parser, "x-audiocast-url");
1983             if (str) break;
1984         } while (0);
1985         if (str && source->format)
1986             stats_set_conv (source->stats, "server_url", str, source->format->charset);
1987     }
1988 
1989     /* stream genre */
1990     if (mountinfo && mountinfo->stream_genre)
1991         stats_set (source->stats, "genre", mountinfo->stream_genre);
1992     else
1993     {
1994         do {
1995             str = httpp_getvar (parser, "ice-genre");
1996             if (str) break;
1997             str = httpp_getvar (parser, "icy-genre");
1998             if (str) break;
1999             str = httpp_getvar (parser, "x-audiocast-genre");
2000             if (str) break;
2001             str = "various";
2002         } while (0);
2003         if (source->format)
2004             stats_set_conv (source->stats, "genre", str, source->format->charset);
2005     }
2006 
2007     /* stream bitrate */
2008     if (mountinfo && mountinfo->bitrate)
2009     {
2010         str = mountinfo->bitrate;
2011         stats_set (source->stats, "bitrate", str);
2012     }
2013     else
2014     {
2015         do {
2016             str = httpp_getvar (parser, "ice-bitrate");
2017             if (str) break;
2018             str = httpp_getvar (parser, "icy-br");
2019             if (str) break;
2020             str = httpp_getvar (parser, "x-audiocast-bitrate");
2021         } while (0);
2022         if (str)
2023             stats_set (source->stats, "bitrate", str);
2024     }
2025 
2026     /* handle MIME-type */
2027     if (mountinfo && mountinfo->type)
2028     {
2029         if (source->format)
2030         {
2031             format_type_t type = format_get_type (mountinfo->type);
2032             if (type == FORMAT_TYPE_UNDEFINED)
2033                 WARN2 ("type specified for %s is unrecognised (%s)", source->mount, mountinfo->type);
2034             else
2035                 source->format->type = format_get_type (mountinfo->type);
2036             free (source->format->contenttype);
2037             source->format->contenttype = strdup (mountinfo->type);
2038         }
2039         stats_set (source->stats, "server_type", mountinfo->type);
2040     }
2041     else
2042         if (source->format && source->format->contenttype)
2043             stats_set (source->stats, "server_type", source->format->contenttype);
2044 
2045     if (mountinfo && mountinfo->subtype)
2046         stats_set (source->stats, "subtype", mountinfo->subtype);
2047 
2048     if (mountinfo && mountinfo->auth)
2049         stats_set (source->stats, "authenticator", mountinfo->auth->type);
2050     else
2051         stats_set (source->stats, "authenticator", NULL);
2052 
2053     source->limit_rate = 0;
2054     if (mountinfo && mountinfo->limit_rate)
2055         source->limit_rate = mountinfo->limit_rate;
2056 
2057     /* needs a better mechanism, probably via a client_t handle */
2058     free (source->dumpfilename);
2059     source->dumpfilename = NULL;
2060     if (mountinfo && mountinfo->dumpfile)
2061     {
2062         time_t now = time(NULL);
2063         struct tm local;
2064         char buffer[PATH_MAX];
2065 
2066         localtime_r (&now, &local);
2067         if (strftime (buffer, sizeof (buffer), mountinfo->dumpfile, &local) == 0)
2068         {
2069             WARN3 ("had problem on %s expanding dumpfile %s (%s)", source->mount, mountinfo->dumpfile, strerror(errno));
2070             errno = 0;
2071         }
2072         else
2073             source->dumpfilename = strdup (buffer);
2074     }
2075     source_apply_preroll (mountinfo, source);
2076 
2077     /* handle changes in intro file setting */
2078     file_close (&source->intro_file);
2079     free (source->intro_filename);
2080     source->intro_filename = NULL;
2081     cached_prune (source->intro_ipcache);
2082     free (source->intro_ipcache);
2083     source->intro_ipcache = NULL;
2084     source->intro_skip_replay = 0;
2085     if (mountinfo && mountinfo->intro_filename)
2086     {
2087         // only set here if there is data present, for type verification
2088         if (source->stream_data)
2089            source_set_intro (source, mountinfo->intro_filename);
2090 
2091         if (mountinfo->intro_skip_replay)
2092         {
2093             cache_file_contents *c = calloc (1, sizeof (cache_file_contents));
2094 
2095             source->intro_ipcache = c;
2096             c->contents = avl_tree_new (compare_intro_ipcache, c);
2097             source->intro_skip_replay = mountinfo->intro_skip_replay;
2098         }
2099     }
2100 
2101     if (mountinfo && mountinfo->source_timeout)
2102         source->timeout = mountinfo->source_timeout;
2103 
2104     if (mountinfo && mountinfo->queue_size_limit)
2105         source->queue_len_value = mountinfo->queue_size_limit;
2106 
2107     if (mountinfo && mountinfo->burst_size)
2108         source->default_burst_value = (unsigned int)mountinfo->burst_size;
2109 
2110     if (mountinfo && mountinfo->min_queue_size)
2111         source->min_queue_len_value = mountinfo->min_queue_size;
2112 
2113     source->wait_time = 0;
2114     if (mountinfo && mountinfo->wait_time)
2115         source->wait_time = (time_t)mountinfo->wait_time;
2116 }
2117 
2118 
2119 /* update the specified source with details from the config or mount.
2120  * mountinfo can be NULL in which case default settings should be taken
2121  */
source_update_settings(ice_config_t * config,source_t * source,mount_proxy * mountinfo)2122 void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo)
2123 {
2124     char *listen_url;
2125     int len;
2126 
2127     /* set global settings first */
2128     if (mountinfo == NULL)
2129     {
2130         source->queue_len_value = config->queue_size_limit;
2131         source->min_queue_len_value = config->min_queue_size;
2132         source->timeout = config->source_timeout;
2133         source->default_burst_value = config->burst_size;
2134     }
2135     stats_lock (source->stats, source->mount);
2136 
2137     len = strlen (config->hostname) + strlen(source->mount) + 16;
2138     listen_url = alloca (len);
2139     snprintf (listen_url, len, "http://%s:%d%s", config->hostname, config->port, source->mount);
2140     stats_set_flags (source->stats, "listenurl", listen_url, STATS_COUNTERS);
2141 
2142     source_apply_mount (source, mountinfo);
2143 
2144     if (source->dumpfilename)
2145         DEBUG1 ("Dumping stream to %s", source->dumpfilename);
2146     if (source->flags & SOURCE_ON_DEMAND)
2147     {
2148         DEBUG0 ("on_demand set");
2149         stats_set (source->stats, "on_demand", "1");
2150         stats_set_args (source->stats, "listeners", "%ld", source->listeners);
2151     }
2152     else
2153         stats_set (source->stats, "on_demand", NULL);
2154 
2155     if (mountinfo)
2156     {
2157         if (mountinfo->on_connect)
2158             DEBUG1 ("connect script \"%s\"", mountinfo->on_connect);
2159         if (mountinfo->on_disconnect)
2160             DEBUG1 ("disconnect script \"%s\"", mountinfo->on_disconnect);
2161         if (mountinfo->fallback_when_full)
2162             DEBUG1 ("fallback_when_full to %u", mountinfo->fallback_when_full);
2163         DEBUG1 ("max listeners to %d", mountinfo->max_listeners);
2164         stats_set_args (source->stats, "max_listeners", "%d", mountinfo->max_listeners);
2165         stats_set_flags (source->stats, "cluster_password", mountinfo->cluster_password, STATS_SLAVE|STATS_HIDDEN);
2166         if (mountinfo->hidden)
2167         {
2168             stats_set_flags (source->stats, NULL, NULL, STATS_HIDDEN);
2169             DEBUG0 ("hidden from public");
2170         }
2171         else
2172             stats_set_flags (source->stats, NULL, NULL, 0);
2173     }
2174     else
2175     {
2176         DEBUG0 ("max listeners is not specified");
2177         stats_set (source->stats, "max_listeners", "unlimited");
2178         stats_set_flags (source->stats, "cluster_password", NULL, STATS_SLAVE);
2179         stats_set_flags (source->stats, NULL, NULL, STATS_PUBLIC);
2180     }
2181     stats_release (source->stats);
2182     DEBUG1 ("public set to %d", source->yp_public);
2183     DEBUG1 ("source timeout to %u", source->timeout);
2184 }
2185 
2186 
/* Callback for a source client, the source being in client->shared_data.
 *
 * Switches the client over to the source client ops, initialises the source
 * unless it is already running, releases the source lock and records the
 * user-agent header in the stats for the source.  Always returns 0.
 */
static int source_client_callback (client_t *client)
{
    source_t *src = client->shared_data;
    const char *user_agent;

    stats_event_inc(NULL, "source_client_connections");
    client->ops = &source_client_ops;

    if (source_running (src))
        stats_event_inc (NULL, "source_total_connections");
    else
        source_init (src);

    /* header looked up before the source lock is dropped */
    user_agent = httpp_getvar (src->client->parser, "user-agent");
    thread_rwlock_unlock (&src->lock);

    if (user_agent == NULL)
        return 0;

    stats_lock (src->stats, src->mount);
    stats_set_flags (src->stats, "user_agent", user_agent, STATS_COUNTERS);
    stats_release (src->stats);
    return 0;
}
2209 
2210 
#ifndef _WIN32
/* Run an external command for a mountpoint, eg on connect or disconnect.
 *
 * command is the path of an executable, optionally followed (when strsep is
 * available) by space or tab separated arguments.  Each argument is expanded
 * with util_expand_pattern against the mountpoint; without any arguments
 * the mountpoint itself is passed as the only argument.
 *
 * The command is started from a second level fork, so the caller only waits
 * for the short-lived intermediate child and never for the command itself.
 */
static void source_run_script (char *command, char *mountpoint)
{
    pid_t pid, external_pid;
    char *p, *comm;
    int wstatus;

    comm = p = strdup (command);
#ifdef HAVE_STRSEP
    /* terminate comm after the executable name, p moves on to any arguments */
    strsep (&p, " \t");
#else
    if (strchr (command, ' '))  // possible misconfiguration, but unlikely to occur.
        INFO1 ("arguments to command on %s not supported", mountpoint);
#endif
    if (access (comm, X_OK) != 0)
    {
        ERROR3 ("Unable to run command %s on %s (%s)", comm, mountpoint, strerror (errno));
        free (comm);
        return;
    }
    DEBUG2 ("Starting command %s on %s", comm, mountpoint);

    /* do a fork twice so that the command has init as parent */
    external_pid = fork();
    switch (external_pid)
    {
        case 0:     // child, don't log from here.
            switch (pid = fork ())
            {
                case -1:
                    break;
                case 0:  /* child */
#ifdef HAVE_STRSEP
#define MAX_SCRIPT_ARGS          20
                    {
                        int i = 1;
                        char *args [MAX_SCRIPT_ARGS+1], *tmp;

                        // default set unless overridden
                        args[0] = comm;
                        args[1] = mountpoint;
                        args[2] = NULL;
                        while (i < MAX_SCRIPT_ARGS && (tmp = strsep (&p, " \t")))
                        {
                            unsigned len = 4096;
                            char *str = malloc (len);

                            /* every slot gets a value, the argument as
                             * written is used if it cannot be expanded */
                            if (str && util_expand_pattern (mountpoint, tmp, str, &len) == 0)
                                args[i] = str;
                            else
                            {
                                free (str);
                                args[i] = tmp;
                            }
                            i++;
                        }
                        /* execvp requires a NULL terminated list, the default
                         * terminator above only covers up to one argument */
                        if (i > 1)
                            args[i] = NULL;
                        close (0);
                        close (1);
                        close (2);
                        execvp ((const char *)args[0], args);
                    }
#else
                    close (0);
                    close (1);
                    close (2);
                    execl (command, command, mountpoint, (char *)NULL);
#endif
                    /* only reached if the exec failed.  _exit is used in the
                     * forked children so that exit handlers and stdio
                     * buffers inherited from the server are left alone */
                    _exit (1);
                default: /* parent */
                    break;
            }
            _exit (0);
        case -1:    // ok, in parent context, no lock clash.
            ERROR1 ("Unable to fork %s", strerror (errno));
            break;
        default: /* parent */
            /* reap the intermediate child, which exits straight away */
            do
            {
                if (waitpid (external_pid, &wstatus, 0) < 0)
                    break;
            } while (WIFEXITED(wstatus) == 0 && WIFSIGNALED(wstatus) == 0);
            break;
    }
    free (comm);
}
#endif
2291 
2292 
2293 /* rescan the mount list, so that xsl files are updated to show
2294  * unconnected but active fallback mountpoints
2295  */
source_recheck_mounts(int update_all)2296 void source_recheck_mounts (int update_all)
2297 {
2298     ice_config_t *config;
2299     time_t mark = time (NULL);
2300     long count = 0;
2301 
2302     avl_tree_rlock (global.source_tree);
2303 
2304     if (update_all)
2305     {
2306         avl_node *node = avl_get_first (global.source_tree);
2307         while (node)
2308         {
2309             source_t *source = (source_t*)node->key;
2310 
2311             thread_rwlock_wlock (&source->lock);
2312             config = config_get_config();
2313             if (source_available (source))
2314                 source_update_settings (config, source, config_find_mount (config, source->mount));
2315             config_release_config();
2316             thread_rwlock_unlock (&source->lock);
2317             node = avl_get_next (node);
2318         }
2319     }
2320 
2321     config = config_get_config();
2322     avl_node *node = avl_get_first (config->mounts_tree);
2323     while (node)
2324     {
2325         source_t *source;
2326         mount_proxy *mount = (mount_proxy*)node->key;
2327 
2328         node = avl_get_next (node);
2329 
2330         ++count;
2331         if ((count & 63) == 0)  // lets give others access to this every so often
2332         {
2333             avl_tree_unlock (global.source_tree);
2334             avl_tree_rlock (global.source_tree);
2335         }
2336 
2337         source = source_find_mount_raw (mount->mountname);
2338         if ((source == NULL || source_available (source) == 0) && mount->fallback_mount)
2339         {
2340             int count = -1;
2341             unsigned int len;
2342             char buffer [4096];
2343 
2344             len = sizeof buffer;
2345             if (util_expand_pattern (mount->mountname, mount->fallback_mount, buffer, &len) == 0)
2346                 count = fallback_count (config, buffer);
2347 
2348             DEBUG2 ("fallback checking %s (fallback has %d)", mount->mountname, count);
2349             if (count >= 0)
2350             {
2351                 stats_handle_t stats = stats_handle (mount->mountname);
2352                 if (source == NULL) // mark for purge if there is no source at all
2353                     stats_set_expire (stats, mark);
2354                 stats_set_flags (stats, NULL, NULL, mount->hidden?STATS_HIDDEN:0);
2355                 stats_set_args (stats, "listenurl", "http://%s:%d%s",
2356                         config->hostname, config->port, mount->mountname);
2357                 stats_set (stats, "listeners", "0");
2358                 if (mount->max_listeners < 0)
2359                     stats_set (stats, "max_listeners", "unlimited");
2360                 else
2361                     stats_set_args (stats, "max_listeners", "%d", mount->max_listeners);
2362                 stats_release (stats);
2363             }
2364         }
2365     }
2366     stats_purge (mark);
2367     avl_tree_unlock (global.source_tree);
2368     config_release_config();
2369 }
2370 
2371 
2372 /* Check whether this listener is on this source. This is only called when
2373  * there is auth. This may flag an existing listener to terminate.
2374  * return 1 if ok to add or 0 to prevent
2375  */
check_duplicate_logins(const char * mount,avl_tree * tree,client_t * client,auth_t * auth)2376 int check_duplicate_logins (const char *mount, avl_tree *tree, client_t *client, auth_t *auth)
2377 {
2378     avl_node *node;
2379 
2380     if (auth == NULL || (auth->flags & AUTH_ALLOW_LISTENER_DUP))
2381         return 1;
2382 
2383     /* allow multiple authenticated relays */
2384     if (client->username == NULL || client->flags & CLIENT_IS_SLAVE)
2385         return 1;
2386 
2387     node = avl_get_first (tree);
2388     while (node)
2389     {
2390         client_t *existing_client = (client_t *)node->key;
2391         if (existing_client->username &&
2392                 strcmp (existing_client->username, client->username) == 0)
2393         {
2394             if (auth->flags & AUTH_DEL_EXISTING_LISTENER)
2395             {
2396                 INFO2 ("Found %s on %s, dropping previous account", existing_client->username, mount);
2397                 existing_client->connection.error = 1;
2398                 return 1;
2399             }
2400             else
2401                 return 0;
2402         }
2403         node = avl_get_next (node);
2404     }
2405     return 1;
2406 }
2407 
2408 
2409 /* source required to stay around for a short while
2410  */
source_client_shutdown(client_t * client)2411 static int source_client_shutdown (client_t *client)
2412 {
2413     if (global.running == ICE_RUNNING && client->connection.discon.time)
2414     {
2415         if (client->connection.discon.time >= client->worker->current_time.tv_sec)
2416         {
2417             client->schedule_ms = client->worker->time_ms + 50;
2418             return 0;
2419         }
2420     }
2421     drop_source_from_tree ((source_t*)client->shared_data);
2422     // source locked but exit function will want it locked
2423     return -1;
2424 }
2425 
2426 
2427 /* clean up what is left from the source. */
source_client_release(client_t * client)2428 void source_client_release (client_t *client)
2429 {
2430     source_t *source = client->shared_data;
2431 
2432     source->flags &= ~(SOURCE_RUNNING|SOURCE_ON_DEMAND);
2433     client->flags &= ~CLIENT_AUTHENTICATED;
2434     /* log bytes read in access log */
2435     if (source->format)
2436         client->connection.sent_bytes = source->format->read_bytes;
2437 
2438     _free_source (source);
2439     slave_update_mounts();
2440     client_destroy (client);
2441     global_reduce_bitrate_sampling (global.out_bitrate);
2442 }
2443 
2444 
/* Release a listener from a source.
 *
 * If the client is still attached to the source then its leading refbufs
 * that are not flagged as shared are released, the listener is detached and
 * the listener count of the source reduced.  In all cases the global
 * listener count is reduced, the access is logged if the mount has its own
 * access log and the auth for the mount is told about the release.
 *
 * Returns whatever auth_release_listener returns.
 */
static int source_listener_release (source_t *source, client_t *client)
{
    ice_config_t *cfg;
    mount_proxy *minfo;
    int rc;

    if (client->shared_data == source) // still attached to source?
    {
        refbuf_t *rb;

        /* drop the blocks at the head of the client queue, stopping at
         * the first block marked as shared */
        while ((rb = client->refbuf) != NULL && (rb->flags & REFBUF_SHARED) == 0)
        {
            client->refbuf = rb->next;
            rb->next = NULL;
            if (source->format->detach_queue_block)
                source->format->detach_queue_block (source, rb);
            refbuf_release (rb);
        }
        /* search through sources client list to find previous link in list */
        source_listener_detach (source, client);
        source->listeners--;
        client->shared_data = NULL;
        client_set_queue (client, NULL);
        if (source->listeners == 0)
            rate_reduce (source->out_bitrate, 1000);
    }

    /* change of listener numbers, so reduce scope of global sampling */
    global_reduce_bitrate_sampling (global.out_bitrate);
    DEBUG2 ("Listener %" PRIu64 " leaving %s", client->connection.id, source->mount);

    // reduce from global count
    global_lock();
    global.listeners--;
    global_unlock();

    cfg = config_get_config ();
    minfo = config_find_mount (cfg, source->mount);

    if (minfo && minfo->access_log.name)
        logging_access_id (&minfo->access_log, client);

    rc = auth_release_listener (client, source->mount, minfo);
    config_release_config();
    return rc;
}
2491 
2492 
/* Attach a listener to the source on `mount`, following the configured
 * fallback chain when that source is missing, not available or full.
 *
 *   mount      the mountpoint being requested
 *   mountinfo  mount settings for the request; may be NULL, in which case
 *              no per-mount limits are imposed
 *   client     the listener to add
 *
 * Returns 0 when the listener has been added to a source, or handed over
 * by move_listener() to a rate limited fallback.  Returns -2 when no source
 * is available and there is neither a fallback mount nor a rate to fall
 * back on, and -1 when a rate was found but move_listener() failed.  Any
 * other value is passed through from the routine that took over the client
 * (client_send_403*, the format header builder, fserve_setup_client or the
 * client's process handler).
 *
 * The source lock is never held on return.  The config is read through
 * config_get_config_unlocked(), so presumably the caller already holds the
 * config lock - confirm against callers.
 */
int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *client)
{
    int loop = 10, rate = 0, do_process = 0;
    int within_limits;
    source_t *source;
    mount_proxy *minfo = mountinfo;
    const char *passed_mount = mount;
    ice_config_t *config = config_get_config_unlocked();
    unsigned int len;
    char buffer[4096];

    do
    {
        int64_t stream_bitrate = 0;
        int flags = 0;

        /* find an available source, stepping through fallback mounts. The
         * only exit via break leaves both the source tree and the source
         * (write) locked */
        do
        {
            if (loop == 0)
            {
                WARN0 ("preventing a fallback loop");
                return client_send_403 (client, "Fallback through too many mountpoints");
            }
            avl_tree_rlock (global.source_tree);
            source = source_find_mount_raw (mount);
            if (source)
            {
                thread_rwlock_wlock (&source->lock);
                if (source_available (source))
                    break;
                thread_rwlock_unlock (&source->lock);
            }
            avl_tree_unlock (global.source_tree);
            if (minfo && minfo->limit_rate)
                rate = minfo->limit_rate/8;
            if (minfo == NULL || minfo->fallback_mount == NULL)
            {
                /* end of the fallback chain, the only option left is a
                 * rate limited fallback via move_listener() */
                int ret = -2;
                if (rate == 0)
                {
                    /* a "[N]" in the mount name supplies the rate (N * 1000 / 8,
                     * presumably kbit/s to bytes/s).  The mount name comes from
                     * the request, so reject values that are not positive or
                     * that would overflow the int arithmetic. */
                    int kbps = 0;
                    if (sscanf (mount, "%*[^[][%d]", &kbps) == 1 && kbps > 0 && kbps <= INT_MAX / 1000)
                        rate = kbps * 1000 / 8;
                }
                if (rate)
                {
                    fbinfo f;
                    f.flags = flags;
                    f.mount = (char *)mount;
                    f.fallback = NULL;
                    f.limit = rate;
                    f.type = FORMAT_TYPE_UNDEFINED;
                    if (move_listener (client, &f) == 0)
                    {
                        /* source dead but fallback to file found */
                        stats_event_inc (NULL, "listener_connections");
                        return 0;
                    }
                    ret = -1;
                }
                return ret;
            }
            /* step to the fallback mount, using the literal setting if the
             * pattern expansion fails */
            len = sizeof buffer;
            if (util_expand_pattern (mount, minfo->fallback_mount, buffer, &len) < 0)
                mount = minfo->fallback_mount;
            else
                mount = buffer;
            minfo = config_find_mount (config_get_config_unlocked(), mount);
            flags = FS_FALLBACK;
            loop--;
        } while (1);

        /* ok, we found a source and it is locked */
        avl_tree_unlock (global.source_tree);

        if (client->flags & CLIENT_IS_SLAVE)
        {
            INFO1 ("client %" PRIu64 " is from a slave, bypassing limits", client->connection.id);
            break;
        }
        if (source->format)
        {
            stream_bitrate  = 8 * rate_avg (source->in_bitrate);

            if (config->max_bandwidth)
            {
                int64_t global_rate = (int64_t)8 * global_getrate_avg (global.out_bitrate);

                DEBUG1 ("server outgoing bitrate is %" PRId64, global_rate);
                if (global_rate + stream_bitrate > config->max_bandwidth)
                {
                    thread_rwlock_unlock (&source->lock);
                    INFO0 ("server-wide outgoing bandwidth limit reached");
                    return client_send_403redirect (client, passed_mount, "server bandwidth reached");
                }
            }
            if ((client->flags & CLIENT_AUTHENTICATED) == 0 || httpp_getvar (client->parser, "range"))
            {
                int ret;
                int (*build_headers)(format_plugin_t *, client_t *) = format_general_headers;

                /* a format specific routine takes priority over the general headers */
                if (source->format->create_client_data)
                    build_headers = source->format->create_client_data;

                client->refbuf->len = 0;
                ret = build_headers (source->format, client);

                if (ret < 0)
                {
                    thread_rwlock_unlock (&source->lock);
                    return ret;
                }
                /* these two cases leave the loop without the per-mount limit checks */
                if (client->parser->req_type == httpp_req_head)
                    break;
                if ((client->flags & CLIENT_HAS_INTRO_CONTENT) == 0 && client->refbuf->next)
                    break;
                if ((source->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) == SOURCE_ON_DEMAND)
                {
                    // inactive ondemand relay to kick off, reset client, try headers later
                    client->respcode = 0;
                    client->pos = 0;
                }
                stats_lock (source->stats, source->mount);
                stats_set_inc (source->stats, "listener_connections");
                stats_release (source->stats);
            }
        }

        if (mountinfo == NULL)
            break; /* allow adding listeners, no mount limits imposed */

        if (mountinfo->intro_skip_replay)
            listener_check_intro (source->intro_ipcache, client, mountinfo->intro_skip_replay);

        if (check_duplicate_logins (source->mount, source->clients, client, mountinfo->auth) == 0)
        {
            thread_rwlock_unlock (&source->lock);
            return client_send_403 (client, "Account already in use");
        }

        /* set a per-mount disconnect time if auth hasn't set one already */
        if (mountinfo->max_listener_duration && client->connection.discon.time == 0)
            client->connection.discon.time = time(NULL) + mountinfo->max_listener_duration;

        /* NOTE(review): the limits below are always those of the originally
         * requested mount (mountinfo), even when source is a fallback - confirm
         * that is intended */
        INFO3 ("max on %s is %d (cur %lu)", source->mount,
                mountinfo->max_listeners, source->listeners);
        within_limits = 1;
        if (mountinfo->max_bandwidth > -1 && stream_bitrate)
        {
            DEBUG3 ("checking bandwidth limits for %s (%" PRId64 ", %" PRId64 ")",
                    mountinfo->mountname, stream_bitrate, mountinfo->max_bandwidth);
            if ((source->listeners+1) * stream_bitrate > mountinfo->max_bandwidth)
            {
                INFO1 ("bandwidth limit reached on %s", source->mount);
                within_limits = 0;
            }
        }
        if (within_limits)
        {
            /* -1 means no listener limit on this mount */
            if (mountinfo->max_listeners == -1)
                break;

            if (source->listeners < (unsigned long)mountinfo->max_listeners)
                break;
            INFO1 ("max listener count reached on %s", source->mount);
        }
        /* minfo starts off as mountinfo but cascades through fallbacks */
        if (minfo && minfo->fallback_when_full && minfo->fallback_mount)
        {
            /* NOTE(review): minfo is not refreshed for the new mount on this
             * path, so a full fallback is retried with the same settings until
             * loop reaches 0 - confirm that is intended */
            thread_rwlock_unlock (&source->lock);
            len = sizeof buffer;
            if (util_expand_pattern (mount, minfo->fallback_mount, buffer, &len) < 0)
                mount = minfo->fallback_mount;
            else
                mount = buffer;
            INFO1 ("stream full trying %s", mount);
            loop--;
            continue;
        }

        /* now we fail the client */
        thread_rwlock_unlock (&source->lock);
        return client_send_403redirect (client, passed_mount, "max listeners reached");

    } while (1);

    /* from here on a source has been selected and its lock is held */
    client->connection.sent_bytes = 0;

    if ((client->flags & CLIENT_AUTHENTICATED) == 0)
    {
        thread_rwlock_unlock (&source->lock);
        return fserve_setup_client (client);
    }

    /* no response prepared yet, so start with a zeroed full-size buffer */
    if (client->respcode == 0)
    {
        client->refbuf->len = PER_CLIENT_REFBUF_SIZE;
        memset (client->refbuf->data, 0, PER_CLIENT_REFBUF_SIZE);
    }

    /* server-wide listener limit, checked and counted under the global lock */
    global_lock();
    if (config->max_listeners)
    {
        if (config->max_listeners <= global.listeners)
        {
            global_unlock();
            thread_rwlock_unlock (&source->lock);
            return client_send_403redirect (client, passed_mount, "max listeners reached");
        }
    }
    global.listeners++;
    global_unlock();

    httpp_deletevar (client->parser, "range");
    if (client->flags & CLIENT_RANGE_END)
    {
        // range given on a stream, impose a length limit
        if ((off_t)client->connection.discon.offset > client->intro_offset)
        {
            client->connection.discon.offset -= client->intro_offset;
            client->intro_offset = 0;
        }
        else
        {
            client->flags &= ~CLIENT_RANGE_END;
        }
    }
    source_setup_listener (source, client);
    source->listeners++;

    /* an already active client on a running source is processed straight
     * away, otherwise mark it active and wake its worker */
    if ((client->flags & CLIENT_ACTIVE) && (source->flags & SOURCE_RUNNING))
        do_process = 1;
    else
    {
        client->flags |= CLIENT_ACTIVE; // from an auth thread context
        worker_wakeup (client->worker);
    }
    thread_rwlock_unlock (&source->lock);
    global_reduce_bitrate_sampling (global.out_bitrate);

    stats_event_inc (NULL, "listener_connections");

    if (do_process) // send something back quickly
        return client->ops->process (client);
    return 0;
}
2736 
2737 
2738 /* call with the source lock held, but expect the lock released on exit
2739  * as the listener may of changed threads and therefore lock needed to be
2740  * released
2741  */
source_setup_listener(source_t * source,client_t * client)2742 void source_setup_listener (source_t *source, client_t *client)
2743 {
2744     if (source->flags & SOURCE_LISTENERS_SYNC)
2745         client->ops = &listener_wait_ops;
2746     else if ((source->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) == SOURCE_ON_DEMAND)
2747         client->ops = &listener_pause_ops;
2748     else
2749         client->ops = &listener_client_ops;
2750     client->shared_data = source;
2751     client->queue_pos = 0;
2752     client->mount = source->mount;
2753     client->flags &= ~CLIENT_IN_FSERVE;
2754 
2755     if (client->connection.sent_bytes > 0)
2756         client->check_buffer = http_source_introfile; // may need incomplete data sending
2757     else
2758         client->check_buffer = http_source_listener; // special case for headers
2759     // add client to the source
2760     avl_insert (source->clients, client);
2761     if (source->flags & SOURCE_ON_DEMAND)
2762         source->client->connection.discon.time = 0; // a run-over with on-demand relays needs resetting
2763 
2764     if ((source->flags & (SOURCE_ON_DEMAND|SOURCE_RUNNING)) == SOURCE_ON_DEMAND)
2765     {
2766         source->client->schedule_ms = 0;
2767         client->schedule_ms += 300;
2768         worker_wakeup (source->client->worker);
2769         DEBUG1 ("woke up relay on %s", source->mount);
2770     }
2771 }
2772 
2773 
/* Process handler used while the HTTP OK response is being sent to a source
 * client.  Returns 0 if the write needs to be retried later, -1 if the
 * connection failed (the source is dropped from the tree and removed from
 * the global source count).  Once the whole response has gone, the refbuf
 * kept in `associated` becomes the client refbuf again, the source is write
 * locked and the result of source_client_callback() is returned.
 */
static int source_client_http_send (client_t *client)
{
    source_t *source = client->shared_data;
    refbuf_t *pending;

    while (client->pos < client->refbuf->len)
    {
        if (client->connection.error == 0 && format_generic_write_to_client (client) >= 0)
            continue;

        /* write failed or the connection is already in error */
        client->schedule_ms = client->worker->time_ms + 40;
        if (client->connection.error == 0)
            return 0; /* trap for short writes */

        global_lock();
        global.sources--;
        stats_event_args (NULL, "sources", "%d", global.sources);
        global_unlock();
        drop_source_from_tree (source);
        WARN1 ("failed to send OK response to source client for %s", source->mount);
        return -1;
    }

    /* response sent, put back the data that was set aside for it */
    pending = client->refbuf->associated;
    client->refbuf->associated = NULL;
    refbuf_release (client->refbuf);
    client->refbuf = pending;
    client->pos = client->intro_offset;
    client->intro_offset = 0;

    thread_rwlock_wlock (&source->lock);
    return source_client_callback (client);
}
2804 
2805 
/* Prepare the format plugin of a source for its current source client.
 *
 * If the format has no mount set yet, an undefined format type is first
 * derived from the content-type header of the client (mpeg being assumed
 * when the header is missing, unsupported or application/octet-stream), and
 * then the plugin is fetched.  Returns -1 if the plugin cannot be obtained,
 * otherwise 0.  Note that 0 is also returned, with nothing applied, when
 * the type is undefined and the client has no parser.
 */
int source_format_init (source_t *source)
{
    format_plugin_t *format = source->format;
    client_t *client = source->client;

    if (format->mount == NULL)
    {
        if (format->type == FORMAT_TYPE_UNDEFINED)
        {
            const char *ctype;
            format_type_t detected = FORMAT_TYPE_MPEG;

            DEBUG2 ("%sparser found for %s", client->parser ? "":"no ", source->mount);
            if (client->parser == NULL)
                return 0;

            ctype = httpp_getvar (client->parser, "content-type");
            if (ctype == NULL)
                WARN1("No content-type for %s, Assuming content is mpeg.", source->mount);
            else
            {
                /* octet-stream says nothing about the content, keep the default */
                if (strcmp (ctype, "application/octet-stream") != 0)
                    detected = format_get_type (ctype);
                if (detected == FORMAT_TYPE_UNDEFINED)
                {
                    WARN1("Content-type \"%s\" not supported, assuming mpeg", ctype);
                    detected = FORMAT_TYPE_MPEG;
                }
            }
            format->type = detected;
        }

        format->mount = source->mount;
        if (format_get_plugin (format) < 0)
        {
            WARN1 ("plugin format failed for \"%s\"", source->mount);
            return -1;
        }
    }

    format_apply_client (format, client);
    return 0;
}
2846 
2847 
/* Replace the client feeding a source with another one (a hijack).  The
 * new client is linked to the source and carries on from the queue position
 * of the previous client.  The previous client is unlinked, loses its
 * authenticated flag, has the bytes read so far recorded as its sent_bytes
 * and its worker is woken up.
 */
static void source_swap_client (source_t *source, client_t *client)
{
    client_t *previous = source->client;

    INFO2 ("source %s hijacked by another client, terminating previous (at %"PRIu64")", source->mount, previous->queue_pos);

    /* link the new client in */
    client->shared_data = source;
    source->client = client;

    /* unlink the previous client and schedule it for the current time */
    previous->schedule_ms = client->worker->time_ms;
    previous->shared_data = NULL;
    previous->flags &= ~CLIENT_AUTHENTICATED;
    previous->connection.sent_bytes = source->format->read_bytes;
    client->queue_pos = previous->queue_pos;

    /* the format now counts from zero and reads from the new client */
    source->format->read_bytes = 0;
    source->format->parser = client->parser;
    if (source->format->swap_client)
        source->format->swap_client (client, previous);

    worker_wakeup (previous->worker);
}
2869 
2870 
/* Set up `client` as the source client for mountpoint `uri`.
 *
 * The source is obtained with source_reserve().  A client flagged with
 * CLIENT_HIJACKER takes over a source that is already running, otherwise
 * the client is installed as a new source, subject to the configured source
 * limit and to the format being set up.  On success the mount settings are
 * applied and the client is either passed straight to
 * source_client_callback() (shoutcast compatible connections) or set up to
 * send an HTTP OK response first.
 *
 * Returns 0 when the client has been set up, otherwise the result of
 * client_send_403() (source limit reached, unsupported content, or the
 * mountpoint is in use).
 */
int source_startup (client_t *client, const char *uri)
{
    source_t *source;
    ice_config_t *config = config_get_config();
    mount_proxy *mountinfo;
    int source_limit = config->source_limit;

    /* only the limit is needed for now, the config is taken again below */
    config_release_config();

    /* NOTE(review): source_reserve appears to return the source with its lock
     * held, as each path below either unlocks it or hands over to
     * source_client_callback - confirm */
    source = source_reserve (uri, (client->flags & CLIENT_HIJACKER) ? 1 : 0);
    if (source)
    {
        if ((client->flags & CLIENT_HIJACKER) && source_running (source))
        {
            source_swap_client (source, client);
        }
        else
        {
            /* the source count is checked and updated under the global lock */
            global_lock();
            source->client = client;
            if (global.sources >= source_limit)
            {
                /* NOTE(review): source->client still refers to client on this
                 * path, whereas the format failure path below resets it to NULL
                 * before source_free_source - confirm this is intended */
                WARN1 ("Request to add source when maximum source limit reached %d", global.sources);
                global_unlock();
                thread_rwlock_unlock (&source->lock);
                source_free_source (source);
                return client_send_403 (client, "too many streams connected");
            }
            if (source_format_init (source) < 0)
            {
                global_unlock();
                source->client = NULL;
                thread_rwlock_unlock (&source->lock);
                source_free_source (source);
                return client_send_403 (client, "content type not supported");
            }
            ++global.sources;
            source->stats = stats_lock (source->stats, source->mount);
            stats_release (source->stats);
            INFO1 ("sources count is now %d", global.sources);
            stats_event_args (NULL, "sources", "%d", global.sources);
            global_unlock();
        }
        client->respcode = 200;
        client->shared_data = source;

        /* apply the mount settings while the config is held */
        config = config_get_config();
        mountinfo = config_find_mount (config, source->mount);
        source_update_settings (config, source, mountinfo);
        INFO1 ("source %s is ready to start", source->mount);
        config_release_config();

        if (client->server_conn && client->server_conn->shoutcast_compat)
        {
            /* no HTTP response is queued here, go straight to the callback with
             * the source lock still held. The return value is not checked */
            source->flags |= SOURCE_SHOUTCAST_COMPAT;
            source_client_callback (client);
        }
        else
        {
            /* queue the OK response in a new refbuf, source_client_http_ops
             * takes over from here */
            refbuf_t *ok = refbuf_new (PER_CLIENT_REFBUF_SIZE);
            snprintf (ok->data, PER_CLIENT_REFBUF_SIZE,
                    "HTTP/1.0 200 OK\r\n\r\n");
            ok->len = strlen (ok->data);
            /* we may have unprocessed data read in, so don't overwrite it */
            ok->associated = client->refbuf;
            client->refbuf = ok;
            /* remember the position in the saved data, pos now refers to the response */
            client->intro_offset = client->pos;
            client->pos = 0;
            client->ops = &source_client_http_ops;
            thread_rwlock_unlock (&source->lock);
        }
        /* a client not yet active is marked so and its worker is woken up */
        if ((client->flags & CLIENT_ACTIVE) == 0)
        {
            client->flags |= CLIENT_ACTIVE;
            worker_wakeup (client->worker);
        }
        return 0;
    }
    /* no source was returned for this mountpoint */
    WARN1 ("Mountpoint %s in use", uri);
    return client_send_403 (client, "Mountpoint in use");
}
2952 
2953 
/* Check to see if the source client can be moved to a less busy worker thread.
 * We only move the source client, not the listeners, they will move later.
 *
 * The source lock is released before a move is attempted, so presumably it
 * is held by the caller on entry (confirm against callers).
 *
 * Returns the non-zero result of client_change_worker() if the client was
 * moved, in which case the source lock is NOT held on return.  Returns 0 if
 * no move took place, with the source lock held (it is taken again as a
 * write lock if a move was attempted but did not happen).
 */
static int source_change_worker (source_t *source, client_t *client)
{
    worker_t *this_worker = client->worker, *worker;
    int ret = 0;

    thread_rwlock_rlock (&workers_lock);
    /* nothing is considered when this worker has no move allocations left */
    if (this_worker->move_allocations)
    {
        /* when is_worker_incoming() is true for this worker, the load checks on
         * the destination are bypassed */
        int bypass = is_worker_incoming (this_worker) ? 1 : 0;

        worker = worker_selected ();
        if (worker && worker != client->worker && (bypass || worker->count > 100))
        {
            /* difference in client counts, or a large fixed value when bypassing */
            long diff = bypass ? 2000000 : this_worker->count - worker->count;
            if (diff - (long)source->listeners < 10)
                diff = 0;   // lets not move the source in this case.
            /* per-client offset (0 to 224 in steps of 32) taken from the connection id */
            int base = (client->connection.id & 7) << 5;
            if ((diff > 2000 && worker->count > 200) || (diff > (source->listeners>>4) + base))
            {
                /* copy the mount name for the log message before the source lock
                 * is dropped. NOTE(review): the strdup result is not checked */
                char *mount = strdup (source->mount);
                thread_rwlock_unlock (&source->lock);

                /* values of 1000000 or more are left alone - presumably an
                 * "unlimited" setting, confirm */
                thread_spin_lock (&this_worker->lock);
                if (this_worker->move_allocations < 1000000)
                    this_worker->move_allocations--;
                thread_spin_unlock (&this_worker->lock);

                ret = client_change_worker (client, worker);
                thread_rwlock_unlock (&workers_lock);
                if (ret)
                    DEBUG3 ("moving source %s from %p to %p", mount, this_worker, worker);
                else
                    thread_rwlock_wlock (&source->lock);   /* not moved, take the source lock again */
                free (mount);
                return ret;
            }
        }
    }
    thread_rwlock_unlock (&workers_lock);
    return 0;
}
2998 
2999 
/* Move listener client to the worker thread that the source is on. This will
 * help cache but prevent overloading a single worker with many listeners.
 * If the listener is already on the worker of the source client then the
 * worker returned by worker_selected() is considered instead.
 *
 * Returns the result of client_change_worker() if a move was attempted
 * (non-zero meaning the client was moved, and the source lock released here
 * is not taken again), otherwise 0.  The source lock is dropped only around
 * the move attempt and re-taken as a read lock on failure, so presumably
 * the caller holds a read lock on the source on entry - confirm.
 */
int listener_change_worker (client_t *client, source_t *source)
{
    worker_t *this_worker = client->worker, *dest_worker = source->client->worker;
    /* spin and locked record which locks still need releasing after the block below */
    int ret = 0, spin = 0, locked = 0;
    long diff = 0;

    /* single pass block, break is used to skip to the common unlock code */
    do
    {
        /* 0x14 tests bits 2 and 4 of time_ms, both have to be clear for the
         * reassessment to go ahead on a worker that is not the incoming one */
        if (is_worker_incoming (this_worker) == 0 && (this_worker->time_ms & 0x14))
            break;      // routine called frequently, but we do not need to reassess so frequently for normal workers
        /* do not wait for the workers lock, try again on a later call */
        if (thread_rwlock_tryrlock (&workers_lock) != 0)
            break;
        locked = 1;
        if (this_worker == dest_worker)
            dest_worker = worker_selected ();

        if (dest_worker && this_worker != dest_worker)
        {
            int move = 0;
            /* threshold of 100 to 548 in steps of 64, taken from the connection id */
            int adj = ((client->connection.id & 7) << 6) + 100;

            /* sample the destination count under its own lock */
            thread_spin_lock (&dest_worker->lock);
            int dest_count = dest_worker->count;
            thread_spin_unlock (&dest_worker->lock);

            thread_spin_lock (&this_worker->lock);
            spin = 1;
            if (this_worker->move_allocations == 0)
                break;      // already moved many, skip for now
            int this_alloc = this_worker->move_allocations;

            /* the allocation and difference checks only apply when
             * is_worker_incoming() is false for this worker */
            if (is_worker_incoming (this_worker) == 0)
            {
                /* the allocation is used up even if the move is then skipped */
                this_worker->move_allocations--;
                diff = this_worker->count - dest_count;
                if (diff < adj)
                    break;      // ignore the move this time
            }
            /* NOTE(review): move is always 1 by the time it is tested below */
            move = 1;
            thread_spin_unlock (&this_worker->lock);
            spin = 0;
            DEBUG3 ("dest count is %d, %d, move %d", dest_count, this_alloc, move);

            if (move)
            {
                thread_rwlock_unlock (&source->lock);
                /* id saved before the move attempt, it is used in the log message afterwards */
                uint64_t  id = client->connection.id;
                ret = client_change_worker (client, dest_worker);
                if (ret)
                    DEBUG4 ("moving listener %" PRIu64 " on %s from %p to %p", id, source->mount, this_worker, dest_worker);
                else
                    thread_rwlock_rlock (&source->lock);   /* not moved, take the source lock again */
            }
        }
    } while (0);

    /* release whatever is still held from the block above */
    if (spin)
        thread_spin_unlock (&this_worker->lock);
    if (locked)
        thread_rwlock_unlock (&workers_lock);
    return ret;
}
3065 
3066