1 /* Icecast
2 *
3 * This program is distributed under the GNU General Public License, version 2.
4 * A copy of this license is included with this source.
5 *
6 * Copyright 2000-2014, Karl Heyes <karl@kheyes.plus.com>
7 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
8 * Michael Smith <msmith@xiph.org>,
9 * oddsock <oddsock@xiph.org>,
10 * Karl Heyes <karl@xiph.org>
11 * and others (see AUTHORS for details).
12 */
13
14 /* -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- */
15 #ifdef HAVE_CONFIG_H
16 #include <config.h>
17 #endif
18
19 #ifdef WIN32
20 #include <winsock2.h>
21 #include <windows.h>
22 #endif
23
24 #include "compat.h"
25 #include <stdio.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <limits.h>
29 #include <sys/types.h>
30 #include <ogg/ogg.h>
31 #include <errno.h>
32
33 #ifdef HAVE_UNISTD_H
34 #include <unistd.h>
35 #endif
36 #ifdef HAVE_SYS_SOCKET_H
37 #include <sys/socket.h>
38 #endif
39 #ifdef HAVE_SYS_WAIT_H
40 #include <sys/wait.h>
41 #endif
42
43 #include "thread/thread.h"
44 #include "avl/avl.h"
45 #include "httpp/httpp.h"
46 #include "net/sock.h"
47
48 #include "connection.h"
49 #include "global.h"
50 #include "refbuf.h"
51 #include "client.h"
52 #include "stats.h"
53 #include "logging.h"
54 #include "cfgfile.h"
55 #include "util.h"
56 #include "source.h"
57 #include "format.h"
58 #include "fserve.h"
59 #include "auth.h"
60 #include "slave.h"
61
62 #undef CATMODULE
63 #define CATMODULE "source"
64
65 #define MAX_FALLBACK_DEPTH 10
66
67
68 /* forward declarations of the file-local (static) functions */
69 static void _parse_audio_info (source_t *source, const char *s);
70 static void source_client_release (client_t *client);
71 static int source_listener_release (source_t *source, client_t *client);
72 static int source_client_read (client_t *client);
73 static int source_client_shutdown (client_t *client);
74 static int source_client_http_send (client_t *client);
75 static int send_to_listener (client_t *client);
76 static int send_listener (source_t *source, client_t *client);
77 static int wait_for_restart (client_t *client);
78 static int wait_for_other_listeners (client_t *client);
79
80 static int http_source_listener (client_t *client);
81 static int http_source_intro (client_t *client);
82 static int http_source_introfile (client_t *client);
83 static int locate_start_on_queue (source_t *source, client_t *client);
84 static int listener_change_worker (client_t *client, source_t *source);
85 static int source_change_worker (source_t *source, client_t *client);
86 static int source_client_callback (client_t *client);
87 static int source_set_override (mount_proxy *mountinfo, source_t *dest_source, format_type_t type);
88
89 #ifdef _WIN32
90 #define source_run_script(x,y) WARN0("on [dis]connect scripts disabled");
91 #else
92 static void source_run_script (char *command, char *mountpoint);
93 #endif
94
/* Handler tables for the client states used in this file.  Each table lists
 * two functions.  NOTE(review): going by the handler names the first looks
 * like the processing callback and the second the release callback, confirm
 * against the declaration of struct _client_functions.
 */

/* source client: source_client_read paired with client_destroy */
95 struct _client_functions source_client_ops =
96 {
97 source_client_read,
98 client_destroy
99 };
100
/* installed by source_client_read() on a source client once the source has
 * no listeners left; pairs source_client_shutdown with source_client_release */
101 struct _client_functions source_client_halt_ops =
102 {
103 source_client_shutdown,
104 source_client_release
105 };
106
/* listener: send_to_listener paired with client_destroy */
107 struct _client_functions listener_client_ops =
108 {
109 send_to_listener,
110 client_destroy
111 };
112
/* listener: wait_for_restart paired with client_destroy */
113 struct _client_functions listener_pause_ops =
114 {
115 wait_for_restart,
116 client_destroy
117 };
118
/* listener: wait_for_other_listeners paired with client_destroy */
119 struct _client_functions listener_wait_ops =
120 {
121 wait_for_other_listeners,
122 client_destroy
123 };
124
/* source client: source_client_http_send paired with source_client_release */
125 struct _client_functions source_client_http_ops =
126 {
127 source_client_http_send,
128 source_client_release
129 };
130
131
132 /* Allocate a new source with the stated mountpoint, if one already
133 * exists with that mountpoint in the global source tree then return
134 * NULL.
135 */
source_reserve(const char * mount,int flags)136 source_t *source_reserve (const char *mount, int flags)
137 {
138 source_t *src = NULL;
139
140 do
141 {
142 avl_tree_wlock (global.source_tree);
143 src = source_find_mount_raw (mount);
144 if (src)
145 {
146 if ((flags & 1) == 0)
147 src = NULL;
148 else if (src->flags & SOURCE_LISTENERS_SYNC)
149 src = NULL;
150 break;
151 }
152
153 src = calloc (1, sizeof(source_t));
154 if (src == NULL)
155 break;
156
157 /* make duplicates for strings or similar */
158 src->mount = strdup (mount);
159 src->listener_send_trigger = 16000;
160 src->format = calloc (1, sizeof(format_plugin_t));
161 src->clients = avl_tree_new (client_compare, NULL);
162 src->intro_file = -1;
163 src->preroll_log_id = -1;
164
165 thread_rwlock_create (&src->lock);
166 thread_spin_create (&src->shrink_lock);
167 src->flags |= SOURCE_RESERVED;
168
169 avl_insert (global.source_tree, src);
170
171 } while (0);
172
173 if (src)
174 thread_rwlock_wlock (&src->lock);
175 avl_tree_unlock (global.source_tree);
176 return src;
177 }
178
179
180 /* Find a mount with this raw name - ignoring fallbacks. You should have the
181 * global source tree locked to call this.
182 */
source_find_mount_raw(const char * mount)183 source_t *source_find_mount_raw(const char *mount)
184 {
185 source_t *source;
186 avl_node *node;
187 int cmp;
188
189 if (!mount) {
190 return NULL;
191 }
192 /* get the root node */
193 node = global.source_tree->root->right;
194
195 while (node) {
196 source = (source_t *)node->key;
197 cmp = strcmp (mount, source->mount);
198 if (cmp < 0)
199 node = node->left;
200 else if (cmp > 0)
201 node = node->right;
202 else
203 return source;
204 }
205
206 /* didn't find it */
207 return NULL;
208 }
209
210
211 /* Search for mount, if the mount is there but not currently running then
212 * check the fallback, and so on. Must have a global source lock to call
213 * this function.
214 */
source_find_mount(const char * mount)215 source_t *source_find_mount (const char *mount)
216 {
217 source_t *source = NULL;
218 ice_config_t *config;
219 mount_proxy *mountinfo;
220 int depth = 0;
221
222 config = config_get_config();
223 while (mount && depth < MAX_FALLBACK_DEPTH)
224 {
225 source = source_find_mount_raw (mount);
226
227 if (source)
228 {
229 if (source_available (source))
230 break;
231 }
232
233 /* we either have a source which is not active (relay) or no source
234 * at all. Check the mounts list for fallback settings
235 */
236 mountinfo = config_find_mount (config, mount);
237 source = NULL;
238
239 if (mountinfo == NULL)
240 break;
241 mount = mountinfo->fallback_mount;
242 depth++;
243 }
244
245 config_release_config();
246 return source;
247 }
248
249
source_compare_sources(void * arg,void * a,void * b)250 int source_compare_sources(void *arg, void *a, void *b)
251 {
252 source_t *srca = (source_t *)a;
253 source_t *srcb = (source_t *)b;
254
255 return strcmp(srca->mount, srcb->mount);
256 }
257
258
/* Reset the stream state of a source: the dump file is closed, every block
 * on the stream queue is released, the queue sizing values are zeroed and
 * the rate calculators, audio info, intro file and preroll log are dropped.
 */
void source_clear_source (source_t *source)
{
    refbuf_t *block;

    DEBUG2 ("clearing source \"%s\" %p", source->mount, source);

    if (source->dumpfile != NULL) {
        INFO1 ("Closing dumpfile for %s", source->mount);
        fclose (source->dumpfile);
        source->dumpfile = NULL;
    }

    /* drain the queue from the head, the format gets the chance to detach
     * each block before our reference is dropped */
    while ((block = source->stream_data) != NULL) {
        source->stream_data = block->next;
        block->next = NULL;
        if (source->format->detach_queue_block)
            source->format->detach_queue_block (source, block);
        refbuf_release (block);
    }
    source->stream_data_tail = NULL;
    source->min_queue_point = NULL;

    /* queue sizing and timers */
    source->queue_size = 0;
    source->queue_size_limit = 0;
    source->min_queue_size = 0;
    source->min_queue_offset = 0;
    source->default_burst_size = 0;
    source->client_stats_update = 0;
    source->shrink_pos = 0;
    source->shrink_time = 0;

    /* stream details and rate tracking */
    util_dict_free (source->audio_info);
    source->audio_info = NULL;
    rate_free (source->out_bitrate);
    source->out_bitrate = NULL;
    rate_free (source->in_bitrate);
    source->in_bitrate = NULL;

    free (source->dumpfilename);
    source->dumpfilename = NULL;

    /* intro content and its log */
    free (source->intro_filename);
    source->intro_filename = NULL;
    file_close (&source->intro_file);
    log_close (source->preroll_log_id);
    source->preroll_log_id = -1;
}
307
308
309 /* the internal free function. at this point we know the source is
310 * not on the source tree */
_free_source(void * p)311 static int _free_source (void *p)
312 {
313 source_t *source = p;
314 source_clear_source (source);
315
316 /* make sure all YP entries have gone */
317 yp_remove (source->mount);
318
319 /* There should be no listeners on this mount */
320 if (source->listeners)
321 WARN3("active listeners on mountpoint %s (%ld, %ld)", source->mount, source->listeners, source->termination_count);
322 avl_tree_free (source->clients, NULL);
323
324 thread_rwlock_unlock (&source->lock);
325 thread_rwlock_destroy (&source->lock);
326 thread_spin_destroy (&source->shrink_lock);
327
328 INFO1 ("freeing source \"%s\"", source->mount);
329 format_plugin_clear (source->format, source->client);
330
331 cached_prune (source->intro_ipcache);
332 free (source->intro_ipcache);
333 source->intro_ipcache = NULL;
334
335 free (source->format);
336 free (source->mount);
337 free (source);
338 return 1;
339 }
340
341
342 // drop source from tree, so it cannot be found by name. No lock on source on entry but
343 // lock still active on return (stats cleared)
drop_source_from_tree(source_t * source)344 static void drop_source_from_tree (source_t *source)
345 {
346 if (source->flags & SOURCE_RESERVED)
347 {
348 avl_tree_wlock (global.source_tree);
349 avl_delete (global.source_tree, source, NULL);
350
351 source->flags &= ~SOURCE_RESERVED;
352 // this is only called from the sources client processing
353 if (source->stats)
354 {
355 DEBUG1 ("stats still referenced on %s", source->mount);
356 stats_lock (source->stats, source->mount);
357 stats_set (source->stats, NULL, NULL);
358 source->stats = 0;
359 }
360 avl_tree_unlock (global.source_tree);
361 DEBUG2 ("removed source %s (%p) from tree", source->mount, source);
362 }
363 thread_rwlock_wlock (&source->lock);
364 }
365
366
367 /* Remove the provided source from the global tree and free it */
source_free_source(source_t * source)368 void source_free_source (source_t *source)
369 {
370 //INFO1 ("source %s to be freed", source->mount);
371 drop_source_from_tree (source);
372 _free_source (source);
373 }
374
375
source_find_client(source_t * source,uint64_t id)376 client_t *source_find_client(source_t *source, uint64_t id)
377 {
378 client_t fakeclient;
379 void *result = NULL;
380
381 fakeclient.connection.id = id;
382
383 avl_get_by_key (source->clients, &fakeclient, &result);
384 return result;
385 }
386
/* Record the IP address of the listener in the intro cache with a timeout
 * of `allow' seconds from the current time of the listener's worker.
 * Nothing is added if the entry cannot be allocated.
 */
static void listener_skips_intro (cache_file_contents *cache, client_t *client, int allow)
{
    struct node_IP_time *entry = calloc (1, sizeof (*entry));

    if (entry == NULL)
    {
        /* running without the entry is harmless, writing through NULL is not */
        WARN0 ("out of memory, unable to add intro skip entry");
        return;
    }

    snprintf (entry->ip, sizeof (entry->ip), "%s", client->connection.ip);
    entry->a.timeout = client->worker->current_time.tv_sec + allow;
    avl_insert (cache->contents, entry);
    DEBUG1 ("Added intro skip entry for %s", &entry->ip[0]);
}
396
397
/* Check the intro cache for the IP address of the listener.
 *
 * An entry whose timeout is still in the future makes the listener skip the
 * intro (intro_offset is set to -1); an expired entry is given a fresh
 * timeout of `allow' seconds.  Entries listed in cache->deletions after the
 * lookup are removed from the cache.
 * NOTE(review): the deletions list is presumably filled in as a side effect
 * of the tree lookup, that code is not part of this function.
 *
 * Returns 1 if an entry for the address was found, 0 otherwise (including
 * when there is no cache).
 */
static int listener_check_intro (cache_file_contents *cache, client_t *client, int allow)
{
    struct node_IP_time *entry;
    const char *ip;
    time_t now;
    int found = 0, idx;

    if (cache == NULL || cache->contents == NULL)
        return 0;

    ip = client->connection.ip;
    now = client->worker->current_time.tv_sec;

    cache->deletions_count = 0;
    cache->file_recheck = now;

    if (avl_get_by_key (cache->contents, (char*)ip, (void*)&entry) == 0) {
        found = 1;
        if (entry->a.timeout > now) {
            DEBUG1 ("skipping intro for %s", entry->ip);
            client->intro_offset = -1;
        } else {
            DEBUG1 ("found intro skip entry for %s, refreshing", entry->ip);
            entry->a.timeout = now + allow;
        }
    }

    for (idx = 0; idx < cache->deletions_count; ++idx) {
        struct node_IP_time *stale = cache->deletions[idx];

        INFO1 ("removing %s from intro list", &(stale->ip[0]));
        avl_delete (cache->contents, &(stale->ip[0]), cached_treenode_free);
    }
    return found;
}
435
436
/* Convert a queue sizing value into bytes.  A value with the top bit set
 * has the remaining bits multiplied by the incoming rate of the source
 * (the value is in seconds), any other value is returned unchanged.
 */
uint32_t source_convert_qvalue (source_t *source, uint32_t value)
{
    if ((value & 0x80000000) == 0)
        return value;

    /* in seconds, drop the marker bit and scale by the rate */
    value &= ~0x80000000;
    return source->incoming_rate * value;
}
446
447
448 /* Update stats from source processing, this should be called regulary (every
449 * few seconds) to keep totals up to date.
450 */
update_source_stats(source_t * source)451 static void update_source_stats (source_t *source)
452 {
453 unsigned long incoming_rate = (long)rate_avg (source->in_bitrate);
454 unsigned long kbytes_sent = (source->format->sent_bytes - source->bytes_sent_at_update)/1024;
455 unsigned long kbytes_read = source->bytes_read_since_update/1024;
456
457 stats_lock (source->stats, source->mount);
458 stats_set_args (source->stats, "outgoing_kbitrate", "%ld",
459 (long)(8 * rate_avg (source->out_bitrate))/1024);
460 stats_set_args (source->stats, "incoming_bitrate", "%ld", (8 * incoming_rate));
461 stats_set_args (source->stats, "total_bytes_read", "%"PRIu64, source->format->read_bytes);
462 stats_set_args (source->stats, "total_bytes_sent", "%"PRIu64, source->format->sent_bytes);
463 stats_set_args (source->stats, "total_mbytes_sent",
464 "%"PRIu64, source->format->sent_bytes/(1024*1024));
465 stats_set_args (source->stats, "queue_size", "%u", source->queue_size);
466 if (source->client->connection.con_time)
467 {
468 worker_t *worker = source->client->worker;
469 stats_set_args (source->stats, "connected", "%"PRIu64,
470 (uint64_t)(worker->current_time.tv_sec - source->client->connection.con_time));
471 }
472 stats_release (source->stats);
473 stats_event_add (NULL, "stream_kbytes_sent", kbytes_sent);
474 stats_event_add (NULL, "stream_kbytes_read", kbytes_read);
475 if (incoming_rate)
476 {
477 int log = 0;
478 uint32_t qlen = (float)source_convert_qvalue (source, source->queue_len_value);
479 if (qlen > 0)
480 {
481 float ratio = source->queue_size_limit / (float)qlen;
482 if (ratio < 0.85 || ratio > 1.15)
483 log = 1; // sizeable change in result so log it
484 }
485 source->queue_size_limit = qlen;
486 source->min_queue_size = source_convert_qvalue (source, source->min_queue_len_value);
487 source->default_burst_size = source_convert_qvalue (source, source->default_burst_value);
488 //DEBUG3 ("%s, burst %d, %d", source->mount, (source->default_burst_value&(1<<31))?1:0, source->default_burst_value&(~(1<<31)));
489
490 // sanity checks
491 if (source->default_burst_size > 50000000)
492 source->default_burst_size = 100000;
493 if (source->queue_size_limit > 1000000000)
494 source->queue_size_limit = 1000000;
495 if (source->min_queue_size > 50000000 || source->min_queue_size < source->default_burst_size)
496 source->min_queue_size = source->default_burst_size;
497 if (source->min_queue_size + (incoming_rate<<2) > source->queue_size_limit)
498 {
499 source->queue_size_limit = source->min_queue_size + (incoming_rate<<2);
500 INFO1 ("Adjusting queue size limit higher to allow for a minimum on %s", source->mount);
501 source->queue_len_value = source->queue_size_limit;
502 }
503
504 if (log)
505 {
506 DEBUG2 ("%s queue size set to %u", source->mount, source->queue_size_limit);
507 DEBUG2 ("%s min queue size set to %u", source->mount, source->min_queue_size);
508 DEBUG2 ("%s burst size set to %u", source->mount, source->default_burst_size);
509 }
510 }
511
512 source->bytes_sent_at_update = source->format->sent_bytes;
513 source->bytes_read_since_update %= 1024;
514 source->listener_send_trigger = incoming_rate < 8000 ? 8000 : (8000 + (incoming_rate>>4));
515 if (incoming_rate)
516 source->incoming_adj = 2000000/incoming_rate;
517 else
518 source->incoming_adj = 20;
519 source->stats_interval = 5 + (global.sources >> 10);
520 }
521
522
/* Append the block to the tail of the stream queue of the source.
 *
 * The block is flagged as a queue block and the byte counters, the queue
 * size and the offset of the starting point for new listeners are all
 * increased by its length.  When the queue is empty, the intro file and
 * override settings of the mount are applied first and the block becomes
 * both the queue head and the starting point.  The block is also passed to
 * the dump file writer if one is active.
 */
void source_add_queue_buffer (source_t *source, refbuf_t *r)
{
    source->bytes_read_since_update += r->len;
    r->flags |= SOURCE_QUEUE_BLOCK;

    if (source->stream_data == NULL) {
        /* first block of a stream, pick up the mount settings */
        ice_config_t *config = config_get_config();
        mount_proxy *minfo = config_find_mount (config, source->mount);

        if (minfo) {
            source_set_intro (source, minfo->intro_filename);
            source_set_override (minfo, source, source->format->type);
        }
        config_release_config();

        source->stream_data = r;
        source->min_queue_point = r;
        source->min_queue_offset = 0;
    }

    /* link the block in at the tail */
    if (source->stream_data_tail)
        source->stream_data_tail->next = r;
    source->stream_data_tail = r;
    source->buffer_count++;

    source->queue_size += r->len;
    source->wakeup = 1;

    /* the new block is in front of the starting point for new listeners */
    source->min_queue_offset += r->len;

    /* refresh the cached incoming rate every 4th block */
    if ((source->buffer_count & 3) == 3)
        source->incoming_rate = (long)rate_avg (source->in_bitrate);

    /* copy the stream to the dump file */
    if (source->dumpfile && source->format->write_buf_to_file)
        source->format->write_buf_to_file (source, r);

    /* every 32nd block, unless a check is already pending, note the position
     * and set a time (600ms on) for working out the oldest buffer */
    if (source->shrink_time == 0 && (source->buffer_count & 31) == 31) {
        source->shrink_pos = source->client->queue_pos - source->min_queue_offset;
        source->shrink_time = source->client->worker->time_ms + 600;
    }
}
569
570
571 /* get some data from the source. The stream data is placed in a refbuf
572 * and sent back, however NULL is also valid as in the case of a short
573 * timeout and there's no data pending.
574 */
source_read(source_t * source)575 int source_read (source_t *source)
576 {
577 client_t *client = source->client;
578 refbuf_t *refbuf = NULL;
579 int skip = 1, loop = 1;
580 time_t current = client->worker->current_time.tv_sec;
581 unsigned long queue_size_target = 0;
582 int fds = 0;
583
584 if (global.running != ICE_RUNNING)
585 source->flags &= ~SOURCE_RUNNING;
586 do
587 {
588 source->wakeup = 0;
589 client->schedule_ms = client->worker->time_ms;
590 if (source->flags & SOURCE_LISTENERS_SYNC)
591 {
592 if (source->termination_count > 0)
593 {
594 if (client->timer_start + 1000 < client->worker->time_ms)
595 {
596 source->flags &= ~(SOURCE_RUNNING|SOURCE_LISTENERS_SYNC);
597 WARN1 ("stopping %s as sync mode lasted too long", source->mount);
598 }
599 client->schedule_ms += 30;
600 return 0;
601 }
602 if (source->fallback.mount)
603 {
604 DEBUG1 ("listeners have now moved to %s", source->fallback.mount);
605 free (source->fallback.mount);
606 source->fallback.mount = NULL;
607 }
608 source->flags &= ~SOURCE_LISTENERS_SYNC;
609 }
610 rate_add (source->out_bitrate, 0, client->worker->time_ms);
611 global_add_bitrates (global.out_bitrate, 0, client->worker->time_ms);
612
613 if (source->prev_listeners != source->listeners)
614 {
615 INFO2("listener count on %s now %lu", source->mount, source->listeners);
616 source->prev_listeners = source->listeners;
617 stats_lock (source->stats, source->mount);
618 stats_set_args (source->stats, "listeners", "%lu", source->listeners);
619 if (source->listeners > source->peak_listeners)
620 {
621 source->peak_listeners = source->listeners;
622 stats_set_args (source->stats, "listener_peak", "%lu", source->peak_listeners);
623 }
624 stats_release (source->stats);
625 }
626 if (current >= source->client_stats_update)
627 {
628 update_source_stats (source);
629 if (current - client->connection.con_time < source->stats_interval)
630 source->client_stats_update = current + 1;
631 else
632 source->client_stats_update = current + source->stats_interval;
633 if (source_change_worker (source, client))
634 return 1;
635 }
636
637 fds = util_timed_wait_for_fd (client->connection.sock, 0);
638 if (fds < 0)
639 {
640 if (! sock_recoverable (sock_error()))
641 {
642 WARN0 ("Error while waiting on socket, Disconnecting source");
643 source->flags &= ~SOURCE_RUNNING;
644 return 0;
645 }
646 break;
647 }
648 if (fds == 0)
649 {
650 if (source->last_read + (time_t)3 == current)
651 WARN1 ("Nothing received on %s for 3 seconds", source->mount);
652 if (source->last_read + (time_t)source->timeout < current)
653 {
654 DEBUG3 ("last %ld, timeout %d, now %ld", (long)source->last_read,
655 source->timeout, (long)current);
656 WARN1 ("Disconnecting %s due to socket timeout", source->mount);
657 source->flags &= ~SOURCE_RUNNING;
658 source->flags |= SOURCE_TIMEOUT;
659 return 0;
660 }
661 source->skip_duration = (int)((source->skip_duration + 12) * 1.1);
662 if (source->skip_duration > 400)
663 source->skip_duration = 400;
664 break;
665 }
666
667 source->last_read = current;
668 unsigned int prev_qsize = source->queue_size;
669 do
670 {
671 refbuf = source->format->get_buffer (source);
672 if (refbuf)
673 source_add_queue_buffer (source, refbuf);
674
675 skip = 0;
676
677 if (client->connection.error)
678 {
679 INFO1 ("End of Stream %s", source->mount);
680 source->flags &= ~SOURCE_RUNNING;
681 return 0;
682 }
683 loop--;
684 } while (loop);
685
686 if (source->queue_size != prev_qsize)
687 {
688 uint64_t sync_off = source->min_queue_offset, off = sync_off;
689 refbuf_t *sync_point = source->min_queue_point, *ref = sync_point;
690 while (off > source->min_queue_size)
691 {
692 refbuf_t *to_release = ref;
693 if (to_release && to_release->next)
694 {
695 if (to_release->flags & SOURCE_BLOCK_SYNC)
696 {
697 sync_off = off;
698 sync_point = ref;
699 }
700 off -= to_release->len;
701 ref = to_release->next;
702 continue;
703 }
704 break;
705 }
706 source->min_queue_offset = sync_off;
707 source->min_queue_point = sync_point;
708 source->skip_duration = (long)(source->skip_duration * 0.9);
709 }
710
711 if (source->shrink_time)
712 {
713 if (source->shrink_time > client->worker->time_ms)
714 break; // not time yet to consider the purging point
715 queue_size_target = (source->client->queue_pos - source->shrink_pos);
716 source->shrink_pos = 0;
717 source->shrink_time = 0;
718 }
719 /* lets see if we have too much/little data in the queue */
720 if ((queue_size_target < source->min_queue_size) || (queue_size_target > source->queue_size_limit))
721 queue_size_target = (source->listeners) ? source->queue_size_limit : source->min_queue_size;
722
723 loop = 48 + (source->incoming_rate >> 13); // scale max on high bitrates
724 queue_size_target += 8000; // lets not be too tight to the limit
725 while (source->queue_size > queue_size_target && loop)
726 {
727 refbuf_t *to_go = source->stream_data;
728 if (to_go == NULL || to_go->next == NULL) // always leave at least one on the queue
729 break;
730 source->stream_data = to_go->next;
731 source->queue_size -= to_go->len;
732 if (source->min_queue_point == to_go)
733 {
734 // adjust min queue in line with expectations
735 source->min_queue_offset -= to_go->len;
736 source->min_queue_point = to_go->next;
737 }
738 to_go->next = NULL;
739 if (source->format->detach_queue_block)
740 source->format->detach_queue_block (source, to_go);
741 refbuf_release (to_go);
742 loop--;
743 }
744 } while (0);
745
746 if (skip)
747 client->schedule_ms += source->skip_duration;
748 return 0;
749 }
750
751
/* Make every listener on the source due immediately by zeroing its
 * schedule time.  A listener that was scheduled more than 100ms after the
 * source client is noted in the debug log.
 */
void source_listeners_wakeup (source_t *source)
{
    client_t *src_client = source->client;
    avl_node *node;

    for (node = avl_get_first (source->clients); node; node = avl_get_next (node))
    {
        client_t *listener = (client_t *)node->key;

        if (src_client->schedule_ms + 100 < listener->schedule_ms) {
            DEBUG2 ("listener on %s was ahead by %ld", source->mount, (long)(listener->schedule_ms - src_client->schedule_ms));
        }
        listener->schedule_ms = 0;
    }
}
765
766
/* Processing handler for a source client.
 *
 * While the source is running this reads from the source (rate limiting if
 * configured).  Once it stops running, the source is shut down and the
 * handler waits for the listeners to be dealt with before either leaving
 * the mount reserved for wait_time seconds or returning -1.
 *
 * Returns 1 if source_read() returned a positive value, -1 if the source
 * client has no source or the source has finished, otherwise 0.  The source
 * lock is taken here, it is still held when -1 is returned for a finished
 * source and it is not unlocked here on the return of 1.
 */
static int source_client_read (client_t *client)
{
    source_t *source = client->shared_data;

    if (source == NULL) {
        INFO1 ("source client from %s hijacked", client->connection.ip);
        return -1;
    }

    thread_rwlock_wlock (&source->lock);

    if (client->connection.discon.time &&
            client->connection.discon.time <= client->worker->current_time.tv_sec) {
        source->flags &= ~SOURCE_RUNNING;
        INFO1 ("streaming duration expired on %s", source->mount);
    }

    if (source_running (source)) {
        if (source->limit_rate
                && source->limit_rate < (8 * source->incoming_rate)
                && global.running == ICE_RUNNING) {
            /* over the rate limit, skip reading for now */
            rate_add (source->in_bitrate, 0, client->worker->current_time.tv_sec);
            source->incoming_rate = (long)rate_avg (source->in_bitrate);
            thread_rwlock_unlock (&source->lock);
            client->schedule_ms += 310;
            return 0;
        }
        if (source_read (source) > 0)
            return 1;
        if (source_running (source)) {
            thread_rwlock_unlock (&source->lock);
            return 0;
        }
    }

    /* the source is no longer running from here on */
    if ((source->flags & SOURCE_TERMINATING) == 0) {
        source_shutdown (source, 1);

        if (source->wait_time == 0) {
            /* drop_source_from_tree wants the lock released, and returns
             * with it held again */
            thread_rwlock_unlock (&source->lock);
            drop_source_from_tree (source);
        }
    }

    if (source->termination_count && source->termination_count <= (long)source->listeners) {
        /* listeners are still being processed */
        if (client->timer_start + 1000 < client->worker->time_ms) {
            WARN2 ("%ld listeners still to process in terminating %s", source->termination_count, source->mount);
            if (source->listeners != source->clients->length) {
                WARN3 ("source %s has inconsistent listeners (%ld, %u)", source->mount, source->listeners, source->clients->length);
                source->listeners = source->clients->length;
            }
            source->flags &= ~SOURCE_TERMINATING;
        } else {
            DEBUG4 ("%p %s waiting (%lu, %lu)", source, source->mount, source->termination_count, source->listeners);
        }
        client->schedule_ms = client->worker->time_ms + 50;
        thread_rwlock_unlock (&source->lock);
        return 0;
    }

    if (source->listeners) {
        /* some listeners remain, go around again for those */
        INFO1 ("listeners on terminating source %s, rechecking", source->mount);
        source->termination_count = source->listeners;
        client->timer_start = client->worker->time_ms;
        source->flags &= ~SOURCE_PAUSE_LISTENERS;
        source->flags |= (SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC);
        source_listeners_wakeup (source);
        thread_rwlock_unlock (&source->lock);
        return 0;
    }

    /* no listeners left, switch the source client over to the halt ops */
    free (source->fallback.mount);
    source->fallback.mount = NULL;
    source->flags &= ~SOURCE_LISTENERS_SYNC;
    client->connection.discon.time = 0;
    client->ops = &source_client_halt_ops;

    global_lock();
    global.sources--;
    stats_event_args (NULL, "sources", "%d", global.sources);
    global_unlock();

    if (source->wait_time == 0 || global.running != ICE_RUNNING) {
        INFO1 ("no more listeners on %s", source->mount);
        return -1;  // don't unlock source as the release is called which requires it
    }

    /* set a wait time for leaving the source reserved */
    client->connection.discon.time = client->worker->current_time.tv_sec + source->wait_time;
    client->schedule_ms = client->worker->time_ms + (1000 * source->wait_time);
    INFO2 ("listeners gone, keeping %s reserved for %ld seconds", source->mount, (long)source->wait_time);

    thread_rwlock_unlock (&source->lock);
    return 0;
}
867
868
source_add_bytes_sent(struct rate_calc * out_bitrate,unsigned long written,uint64_t milli,uint64_t * sent_bytes)869 void source_add_bytes_sent (struct rate_calc *out_bitrate, unsigned long written, uint64_t milli, uint64_t *sent_bytes)
870 {
871 rate_add_sum (out_bitrate, written, milli, sent_bytes);
872 global_add_bitrates (global.out_bitrate, written, milli);
873 }
874
875
/* Send queued stream data to a listener.
 *
 * A listener without a block is first placed on the queue.  A listener
 * that is up to date is rescheduled with a small spread, one that has
 * fallen off the back of the queue is flagged in error, and one that is
 * lagging badly may be switched to http_source_introfile for a requeue.
 * Otherwise up to 49 blocks are written in one call.
 *
 * Always returns -1.
 */
static int source_queue_advance (client_t *client)
{
    static unsigned char offset = 0;
    source_t *source = client->shared_data;
    unsigned long sent = 0;
    uint64_t lag;
    int budget;

    if (client->refbuf == NULL && locate_start_on_queue (source, client) < 0)
        return -1;

    lag = source->client->queue_pos - client->queue_pos;

    if (client->flags & CLIENT_HAS_INTRO_CONTENT) abort(); // trap

    if (lag == 0) {
        // most listeners will be through here, so a minor spread should limit a wave of sends
        int spread = (offset & 31);

        offset++; // this can be a race as it helps for randomizing
        client->schedule_ms += 5 + ((source->incoming_adj>>1) + spread);
        client->wakeup = &source->wakeup; // allow for quick wakeup
        return -1;
    }
    client->wakeup = NULL;

    if (lag > source->queue_size || (lag == source->queue_size && client->pos)) {
        INFO4 ("Client %" PRIu64 " (%s) has fallen too far behind (%"PRIu64") on %s, removing",
                client->connection.id, client->connection.ip, client->queue_pos, source->mount);
        stats_lock (source->stats, source->mount);
        stats_set_inc (source->stats, "slow_listeners");
        stats_release (source->stats);
        client->refbuf = NULL;
        client->connection.error = 1;
        return -1;
    }

    // a listener that is really lagging but has received a decent amount of
    // data is allowed a requeue (unless "norequeue" is in the query), else
    // it carries on and may be dropped
    if ((lag+source->incoming_rate) > source->queue_size_limit
            && client->connection.error == 0
            && client->counter > (source->queue_size_limit << 1)
            && httpp_get_query_param (client->parser, "norequeue") == NULL) {
        if (client->pos < client->refbuf->len) {
            // part way through a block, take a private copy of it
            refbuf_t *copy = source->format->qblock_copy (client->refbuf);

            client->refbuf = copy;
            client->flags |= CLIENT_HAS_INTRO_CONTENT;
            DEBUG2 ("client %s requeued copy on %s", client->connection.ip, source->mount);
        } else {
            client->refbuf = NULL;
            client->pos = 0;
        }
        client->timer_start = 0;
        client->check_buffer = http_source_introfile;
        // do not be too eager to refill socket buffer
        client->schedule_ms += source->incoming_rate < 16000 ? source->incoming_rate/16 : 800;
        return -1;
    }

    for (budget = 49; budget > 0; budget--) {
        refbuf_t *block = client->refbuf;
        int ret = 0;

        // the listener must be on a sane queue block here
        if ((block->flags & SOURCE_QUEUE_BLOCK) == 0 || block->len > 66000) abort();

        if (client->pos < block->len)
            ret = source->format->write_buf_to_client (client);
        if (ret > 0)
            sent += ret;

        // stop on a partly sent block or at the end of the queue
        if (client->pos < block->len || block->next == NULL)
            break;
        client->refbuf = block->next;
        client->pos = 0;
    }

    if (sent)
        source_add_bytes_sent (source->out_bitrate, sent, client->worker->time_ms, &source->format->sent_bytes);
    return -1;
}
969
970
/* Find the block on the stream queue at which the listener should start.
 *
 * A listener that has already been sent more than min_queue_offset bytes
 * starts at the tail if that is a sync point.  Otherwise a burst size is
 * taken from the "initial-burst" header, the "burst" query parameter or the
 * source default, and the search starts that far back from the end of the
 * queue.  From there the first block flagged SOURCE_BLOCK_SYNC is used.
 *
 * Returns 0 with the listener attached to the queue, or -1 if the listener
 * is in error, the queue is empty or no sync point was found (the listener
 * is then rescheduled 150ms on).
 */
static int locate_start_on_queue (source_t *source, client_t *client)
{
    refbuf_t *refbuf;
    long lag = 0;

    /* we only want to attempt a burst at connection time, not midstream
     * however streams like theora may not have the most recent page marked as
     * a starting point, so look for one from the burst point */
    if (client->connection.error || source->stream_data_tail == NULL)
        return -1;
    refbuf = source->stream_data_tail;
    if (client->connection.sent_bytes > source->min_queue_offset && (refbuf->flags & SOURCE_BLOCK_SYNC))
    {
        lag = refbuf->len;
    }
    else
    {
        size_t size = source->min_queue_size;
        uint32_t v = -1;
        const char *param = httpp_getvar (client->parser, "initial-burst");

        /* the request header has priority over the query parameter */
        if (param == NULL)
            param = httpp_get_query_param (client->parser, "burst");
        if (param)
        {
            config_qsizing_conv_a2n (param, &v);
            v = source_convert_qvalue (source, v);
            DEBUG4 ("listener from %s (on %s) requested burst (%s, %u)", &client->connection.ip[0], source->mount, param, v);
        }
        else
            v = source->default_burst_size;

        if (v > client->connection.sent_bytes)
        {
            v -= client->connection.sent_bytes; /* have we sent data already */
            refbuf = source->min_queue_point;
            lag = source->min_queue_offset;
            while (size > v && refbuf && refbuf->next)
            {
                /* size is unsigned: a block larger than what is left must
                 * end the walk instead of wrapping size around to a huge
                 * value, which would run the search to the queue tail */
                size = (refbuf->len < size) ? size - refbuf->len : 0;
                lag -= refbuf->len;
                refbuf = refbuf->next;
            }
            if (lag < 0)
                ERROR1 ("Odd, lag is negative %ld", lag);
        }
        else
            lag = refbuf->len;
    }

    /* move forward to the first block a listener is able to start on */
    while (refbuf)
    {
        if (refbuf->flags & SOURCE_BLOCK_SYNC)
        {
            client_set_queue (client, NULL);
            client->refbuf = refbuf;
            client->intro_offset = -1;
            client->pos = 0;
            client->counter = 0;
            client->queue_pos = source->client->queue_pos - lag;
            client->flags &= ~CLIENT_HAS_INTRO_CONTENT;
            DEBUG4 ("%s Joining queue on %s (%"PRIu64 ", %"PRIu64 ")", &client->connection.ip[0], source->mount, source->client->queue_pos, client->queue_pos);
            return 0;
        }
        lag -= refbuf->len;
        refbuf = refbuf->next;
    }
    client->schedule_ms += 150;
    return -1;
}
1047
/* Write a pre-roll log entry for a listener that was served the intro file.
 *
 * Nothing is logged when the source has no intro filename, when the client
 * has a negative intro offset, or when the client carries
 * CLIENT_HAS_INTRO_CONTENT (content provided separately, auth or queue block
 * copy).  The source's own pre-roll log is used if it has one, otherwise the
 * global one from the configuration, provided that is open.
 */
static void source_preroll_logging (source_t *source, client_t *client)
{
    if (source->intro_filename == NULL)
        return;
    if (client->intro_offset < 0)
        return;
    if (client->flags & CLIENT_HAS_INTRO_CONTENT)
        return;

    if (source->preroll_log_id >= 0)
    {
        logging_preroll (source->preroll_log_id, source->intro_filename, client);
        return;
    }

    /* no per-source log, fall back to the global pre-roll log */
    ice_config_t *config = config_get_config();
    if (config->preroll_log.logid >= 0)
        logging_preroll (config->preroll_log.logid, source->intro_filename, client);
    config_release_config();
}
1062
1063
/* check_buffer handler used while a listener is being fed the intro file.
 *
 * When the intro file read fails the listener is either moved onto the
 * stream queue (if the source has queued data) or the intro is replayed.
 * Otherwise the send rate is paced against the source incoming rate: sending
 * is delayed when ahead of it, and a listener that stays well below it is
 * dropped as stalled.
 *
 * Returns whatever the format's write_buf_to_client returns, or -1 when
 * nothing was sent on this call.
 */
static int http_source_introfile (client_t *client)
{
    source_t *source = client->shared_data;
    long stream_rate = source->incoming_rate;
    long elapsed, send_rate;
    int rate_assumed = 0;

    if (format_file_read (client, source->format, source->intro_file) < 0)
    {
        source_preroll_logging (source, client);
        if (source->stream_data_tail == NULL)
        {
            /* nothing queued on the stream yet */
            client->schedule_ms += 100;
            client->intro_offset = source->intro_start; /* replay intro file */
            return -1;
        }
        if (source->intro_skip_replay)
            listener_skips_intro (source->intro_ipcache, client, source->intro_skip_replay);
        /* better find the right place in queue for this client */
        if (source->format->detach_queue_block)
            source->format->detach_queue_block (source, client->refbuf); // in case of private queue
        client_set_queue (client, NULL);
        client->check_buffer = source_queue_advance;
        client->intro_offset = -1;
        return source_queue_advance (client);
    }

    if (stream_rate == 0)
    {
        /* stream must have just started, make an assumption and re-evaluate next time */
        stream_rate = 32000;
        rate_assumed = 1;
    }
    if (client->timer_start == 0)
    {
        /* first pass: back-date the timer to account for the burst allowance */
        long to_send = 0;

        if (client->connection.sent_bytes < source->default_burst_size)
            to_send = source->default_burst_size - client->connection.sent_bytes;
        elapsed = (long)((float)to_send / stream_rate);
        client->aux_data = elapsed + 8;
        client->timer_start = client->worker->current_time.tv_sec - client->aux_data;
        client->counter = 8 * stream_rate;
    }

    elapsed = (client->worker->current_time.tv_sec - client->timer_start);
    if (elapsed)
        send_rate = (long)((float)client->counter / elapsed);
    else
        send_rate = stream_rate;

    if (rate_assumed)
        client->timer_start = 0; // force a reset for next time around, ignore rate checks this time

    if (send_rate > stream_rate)
    {
        /* running ahead of the stream, hold off for a bit */
        client->schedule_ms += 40;
        rate_add (source->in_bitrate, 0, client->worker->current_time.tv_sec);
        return -1;
    }

    if (source->format->sent_bytes > (stream_rate << 5) && // allow at least 30+ seconds on the stream
            (elapsed - client->aux_data) > 15 &&
            send_rate < (stream_rate>>1))
    {
        INFO3 ("Dropped listener %s (%" PRIu64 "), running too slow on %s", &client->connection.ip[0], client->connection.id, source->mount);
        source_preroll_logging (source, client);
        client->connection.error = 1;
        client_set_queue (client, NULL);
        return -1; // assume a slow/stalled connection so drop
    }

    int written = source->format->write_buf_to_client (client);
    if (client->connection.error)
        source_preroll_logging (source, client);
    return written;
}
1138
1139
/* check_buffer handler run once the response headers have been dealt with.
 *
 * A negative intro_offset means no intro is to be sent, so the listener goes
 * straight onto the stream queue.  Otherwise the first buffer held by the
 * client is released, any chained buffers are kept, and the intro file
 * handler takes over from the source's intro start offset.
 */
static int http_source_intro (client_t *client)
{
    if (client->intro_offset >= 0)
    {
        source_t *source = client->shared_data;
        refbuf_t *head = client->refbuf;
        refbuf_t *rest = NULL;

        if (head)
        {
            rest = head->next;
            if (rest)
                head->next = NULL;      /* unlink so only the head is released */
        }
        refbuf_release (head);
        client->refbuf = rest;
        client->pos = 0;
        client->intro_offset = source->intro_start;
        client->check_buffer = http_source_introfile;
        return http_source_introfile (client);
    }

    /* we only need to send the intro if nothing else has been sent */
    client_set_queue (client, NULL);
    client->check_buffer = source_queue_advance;
    return source_queue_advance (client);
}
1160
1161
/* check_buffer handler for a new listener: builds (once) and sends the
 * response headers held in client->refbuf.
 *
 * When there is no header buffer, or it has been sent in full, control moves
 * to http_source_intro.  Processing is postponed (-1 with schedule_ms pushed
 * back) while the source queue is empty or the source is not running.
 *
 * After the headers are completely written, an authenticated client has its
 * sent_bytes reset and moves on to the intro stage; any other client is
 * flagged with a connection error.
 */
static int http_source_listener (client_t *client)
{
    refbuf_t *hdr = client->refbuf;
    source_t *source = client->shared_data;

    if (hdr == NULL || client->pos == hdr->len)
    {
        client->check_buffer = http_source_intro;
        return http_source_intro (client);
    }
    if (source->queue_size == 0)
    {
        client->schedule_ms += 500;
        return -1;  /* postpone processing until data on queue */
    }

    if (client->respcode == 0)
    {
        int (*build_headers)(format_plugin_t *, client_t *);

        if (source_running (source) == 0)
        {
            client->schedule_ms += 200;
            return -1;
        }
        /* a format specific routine takes precedence over the general one */
        if (source->format->create_client_data)
            build_headers = source->format->create_client_data;
        else
            build_headers = format_general_headers;

        hdr->len = 0;
        if (build_headers (source->format, client) < 0)
        {
            /* NOTE(review): connection.error is not set on this path, so the
             * -1 alone does not make send_listener drop the client; confirm
             * whether build_headers flags the connection itself. */
            ERROR1 ("internal problem, dropping client %" PRIu64, client->connection.id);
            return -1;
        }
        stats_lock (source->stats, source->mount);
        stats_set_inc (source->stats, "listener_connections");
        stats_release (source->stats);
    }

    int ret = format_generic_write_to_client (client);
    if (client->pos != hdr->len)
    {
        /* headers only partly written, come back later */
        client->schedule_ms += 200;
        return ret;
    }
    if (client->flags & CLIENT_AUTHENTICATED)
    {
        client->check_buffer = http_source_intro;
        client->connection.sent_bytes = 0;
        return ret;
    }
    client->connection.error = 1;
    return -1;
}
1216
1217
1218 // detach client from the source, enter with lock (probably read) and exit with write lock.
source_listener_detach(source_t * source,client_t * client)1219 void source_listener_detach (source_t *source, client_t *client)
1220 {
1221 client->wakeup = NULL;
1222 if (client->check_buffer != http_source_listener) // not in http headers
1223 {
1224 refbuf_t *ref = client->refbuf;
1225
1226 if (ref)
1227 {
1228 if (ref->flags & REFBUF_SHARED) // on the queue
1229 {
1230 if (client->connection.error == 0 && client->pos < ref->len && source->fallback.mount)
1231 {
1232 /* make a private copy so that a write can complete later */
1233 refbuf_t *copy = source->format->qblock_copy (client->refbuf);
1234
1235 client->refbuf = copy;
1236 client->flags |= CLIENT_HAS_INTRO_CONTENT;
1237 }
1238 else
1239 client->refbuf = NULL;
1240 }
1241 }
1242 client->check_buffer = source->format->write_buf_to_client;
1243 }
1244 else
1245 client->check_buffer = NULL;
1246 thread_rwlock_unlock (&source->lock); // read lock in use!
1247 thread_rwlock_wlock (&source->lock);
1248 avl_delete (source->clients, client, NULL);
1249 }
1250
1251
1252 /* used to hold listeners in waiting over a relay restart. Handling of a failed relay also
1253 * needs to occur.
1254 */
wait_for_restart(client_t * client)1255 static int wait_for_restart (client_t *client)
1256 {
1257 source_t *source = client->shared_data;
1258
1259 if (client->timer_start && client->worker->current_time.tv_sec - client->timer_start > 15)
1260 {
1261 INFO1 ("Dropping listener, stuck in %s too long", source->mount);
1262 client->connection.error = 1; // in here too long, drop client
1263 }
1264
1265 if (source_running (source) || client->connection.error ||
1266 (source->flags & SOURCE_PAUSE_LISTENERS) == 0 ||
1267 (source->flags & (SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC)))
1268 {
1269 client->ops = &listener_client_ops;
1270 return 0;
1271 }
1272
1273 if (source->flags & SOURCE_LISTENERS_SYNC)
1274 client->schedule_ms = client->worker->time_ms + 100;
1275 else
1276 client->schedule_ms = client->worker->time_ms + 300;
1277 return 0;
1278 }
1279
1280
1281 /* used to hold listeners that have already been processed while other listeners
1282 * are still to be done
1283 */
wait_for_other_listeners(client_t * client)1284 static int wait_for_other_listeners (client_t *client)
1285 {
1286 source_t *source = client->shared_data;
1287
1288 if ((source->flags & (SOURCE_TERMINATING|SOURCE_LISTENERS_SYNC)) == SOURCE_LISTENERS_SYNC)
1289 {
1290 client->schedule_ms = client->worker->time_ms + 150;
1291 return 0;
1292 }
1293 client->ops = &listener_client_ops;
1294 return 0;
1295 }
1296
1297
1298 /* general send routine per listener.
1299 */
send_to_listener(client_t * client)1300 static int send_to_listener (client_t *client)
1301 {
1302 source_t *source = client->shared_data;
1303 int ret;
1304
1305 if (source == NULL)
1306 return -1;
1307 if (thread_rwlock_tryrlock (&source->lock) != 0)
1308 {
1309 client->schedule_ms = client->worker->time_ms + 4;
1310 return 0; // probably busy, check next client, come back to this
1311 }
1312 ret = send_listener (source, client);
1313 if (ret == 1)
1314 return 1; // client moved, and source unlocked
1315 if (ret < 0)
1316 ret = source_listener_release (source, client);
1317 thread_rwlock_unlock (&source->lock);
1318 return ret;
1319 }
1320
1321
/* Process one listener while the source is syncing its listeners.
 *
 * Entered with the source lock held for reading.  Depending on state the
 * listener is detached (connection error), moved to the fallback mount,
 * parked on the pause ops (terminating source that pauses its listeners
 * while the server is running), marked for release (terminating otherwise),
 * or parked on the wait ops.  Any read lock still held at the end is
 * swapped for the write lock, and termination_count is decremented once.
 *
 * Returns -1 when the listener is to be released, the result of
 * move_listener when that is <= 0, and 0 otherwise.
 */
int listener_waiting_on_source (source_t *source, client_t *client)
{
    int holding_rlock = 1, ret = 0;

    do
    {
        if (client->connection.error)
        {
            source_listener_detach (source, client); // return with write lock
            holding_rlock = 0;  // skip the possible reacquiring of the lock later.
            source->listeners--;
            client->shared_data = NULL;
            ret = -1;
            break;
        }
        if (source->fallback.mount)
        {
            source_listener_detach (source, client);
            source->listeners--;
            /* the move is done without the source lock */
            thread_rwlock_unlock (&source->lock);
            client->shared_data = NULL;
            ret = move_listener (client, &source->fallback);
            thread_rwlock_wlock (&source->lock);
            if (ret <= 0)
            {
                source->termination_count--;
                return ret;
            }
            /* listener stays with this source after all */
            holding_rlock = 0;
            source->listeners++;
            source_setup_listener (source, client);
            ret = 0;
        }
        if (source->flags & SOURCE_TERMINATING)
        {
            if ((source->flags & SOURCE_PAUSE_LISTENERS) == 0 || global.running != ICE_RUNNING)
            {
                ret = -1;
                break;
            }
            /* hold the listener over the restart */
            if (client->refbuf && (client->refbuf->flags & SOURCE_QUEUE_BLOCK))
                client->refbuf = NULL;
            client->ops = &listener_pause_ops;
            client->flags |= CLIENT_HAS_MOVED;
            client->schedule_ms = client->worker->time_ms + 60;
            client->timer_start = client->worker->current_time.tv_sec;
            break;
        }
        client->ops = &listener_wait_ops;
        client->schedule_ms = client->worker->time_ms + 100;
    } while (0);

    if (holding_rlock)  // acquire write lock if still with read lock
    {
        thread_rwlock_unlock (&source->lock);
        thread_rwlock_wlock (&source->lock);
    }
    source->termination_count--;
    return ret;
}
1381
1382
/* Send queued stream data to one listener.
 *
 * Hands over to listener_waiting_on_source while the source is syncing
 * listeners, enforces the listener's range/time limits, waits if the source
 * is not running, and may migrate the listener to another worker.  Sending
 * is throttled when a global max rate is configured, and is limited per call
 * by an iteration count and by the source's listener_send_trigger.
 *
 * Returns 1 if the listener changed worker, -1 if the listener should be
 * released, 0 otherwise (or the result of listener_waiting_on_source).
 */
static int send_listener (source_t *source, client_t *client)
{
    worker_t *worker = client->worker;
    time_t now = worker->current_time.tv_sec;
    long sent = 0, send_cap = source->listener_send_trigger;
    int passes = 40;    /* max number of iterations in one go */
    int ret = 0, lag;

    client->schedule_ms = worker->time_ms;

    if (source->flags & SOURCE_LISTENERS_SYNC)
        return listener_waiting_on_source (source, client);

    /* check for limited listener time */
    if (client->flags & CLIENT_RANGE_END)
    {
        if (client->connection.discon.offset <= client->connection.sent_bytes)
            return -1;
    }
    else if (client->connection.discon.time && now >= client->connection.discon.time)
    {
        INFO1 ("time limit reached for client #%" PRIu64, client->connection.id);
        client->connection.error = 1;
        return -1;
    }

    if (source_running (source) == 0)
    {
        DEBUG0 ("source not running, listener will wait");
        client->schedule_ms += 100;
        return 0;
    }

    // do we migrate this listener to the same handler as the source client
    if (listener_change_worker (client, source))
        return 1;

    lag = source->client->queue_pos - client->queue_pos;

    /* progressive slowdown if nearing max bandwidth. */
    if (global.max_rate)
    {
        if (throttle_sends > 2) /* exceeded limit, skip */
        {
            client->schedule_ms += 40 + (client->throttle * 3);
            return 0;
        }
        if (throttle_sends > 1) /* slow down any multiple sends */
        {
            passes = 3;
            client->schedule_ms += (client->throttle * 4);
        }
        /* make lagging listeners, lag further on high server bandwidth use */
        if (throttle_sends > 0 && lag > (source->incoming_rate*2))
            client->schedule_ms += 100 + (client->throttle * 3);
    }

    // set between 1 and 25
    if (source->incoming_adj > 25)
        client->throttle = 25;
    else if (source->incoming_adj > 0)
        client->throttle = source->incoming_adj;
    else
        client->throttle = 1;

    for (;;)
    {
        /* lets not send too much to one client in one go, but don't
           sleep for too long if more data can be sent */
        if (passes == 0 || sent > send_cap)
        {
            client->schedule_ms += 25;
            break;
        }

        int bytes = client->check_buffer (client);
        if (bytes < 0)
        {
            if (client->connection.error || (sent == 0 && connection_unreadable (&client->connection)))
                ret = -1;
            else
                client->schedule_ms += 15;  /* can't write any more */
            break;
        }

        sent += bytes;
        passes--;
    }

    if (sent)
    {
        rate_add_sum (source->out_bitrate, sent, worker->time_ms, &source->format->sent_bytes);
        global_add_bitrates (global.out_bitrate, sent, worker->time_ms);
    }

    if (source->shrink_time && client->connection.error == 0)
    {
        lag = source->client->queue_pos - client->queue_pos;
        if (lag > source->queue_size_limit)
            lag = source->queue_size_limit; // impose a higher lag value
        thread_spin_lock (&source->shrink_lock);
        if (client->queue_pos < source->shrink_pos)
            source->shrink_pos = source->client->queue_pos - lag;
        thread_spin_unlock (&source->shrink_lock);
    }
    return ret;
}
1486
1487
1488 /* Perform any initialisation before the stream data is processed, the header
1489 * info is processed by now and the format details are setup
1490 */
source_init(source_t * source)1491 void source_init (source_t *source)
1492 {
1493 mount_proxy *mountinfo;
1494
1495 if (source->dumpfilename != NULL)
1496 {
1497 unsigned int len;
1498 char buffer [4096];
1499
1500 len = sizeof buffer;
1501 if (util_expand_pattern (source->mount, source->dumpfilename, buffer, &len) == 0)
1502 {
1503 INFO2 ("dumpfile \"%s\" for %s", buffer, source->mount);
1504 source->dumpfile = fopen (buffer, "ab");
1505 if (source->dumpfile == NULL)
1506 {
1507 WARN2("Cannot open dump file \"%s\" for appending: %s, disabling.",
1508 buffer, strerror(errno));
1509 }
1510 }
1511 }
1512
1513 /* start off the statistics */
1514 stats_event_inc (NULL, "source_total_connections");
1515 source->stats = stats_lock (source->stats, source->mount);
1516 stats_set_flags (source->stats, "slow_listeners", "0", STATS_COUNTERS);
1517 stats_set (source->stats, "server_type", source->format->contenttype);
1518 stats_set_flags (source->stats, "listener_peak", "0", STATS_COUNTERS);
1519 stats_set_args (source->stats, "listener_peak", "%lu", source->peak_listeners);
1520 stats_set_flags (source->stats, "listener_connections", "0", STATS_COUNTERS);
1521 stats_set_time (source->stats, "stream_start", STATS_COUNTERS, source->client->worker->current_time.tv_sec);
1522 stats_set_flags (source->stats, "total_mbytes_sent", "0", STATS_COUNTERS);
1523 stats_set_flags (source->stats, "total_bytes_sent", "0", STATS_COUNTERS);
1524 stats_set_flags (source->stats, "total_bytes_read", "0", STATS_COUNTERS);
1525 stats_set_flags (source->stats, "outgoing_kbitrate", "0", STATS_COUNTERS);
1526 stats_set_flags (source->stats, "incoming_bitrate", "0", STATS_COUNTERS);
1527 stats_set_flags (source->stats, "queue_size", "0", STATS_COUNTERS);
1528 stats_set_flags (source->stats, "connected", "0", STATS_COUNTERS);
1529 stats_set_flags (source->stats, "source_ip", source->client->connection.ip, STATS_COUNTERS);
1530
1531 source->last_read = time(NULL);
1532 source->prev_listeners = -1;
1533 source->bytes_sent_at_update = 0;
1534 source->stats_interval = 5;
1535 /* so the first set of average stats after 4 seconds */
1536 source->client_stats_update = source->last_read + 4;
1537 source->skip_duration = 40;
1538 source->buffer_count = 0;
1539 source->queue_size_limit = 200000000; // initial sizing
1540 source->default_burst_size = 300000;
1541 source->min_queue_size = 600000;
1542
1543 util_dict_free (source->audio_info);
1544 source->audio_info = util_dict_new();
1545 if (source->client)
1546 {
1547 const char *str = httpp_getvar(source->client->parser, "ice-audio-info");
1548 if (str)
1549 {
1550 _parse_audio_info (source, str);
1551 stats_set_flags (source->stats, "audio_info", str, STATS_GENERAL);
1552 }
1553 source->client->queue_pos = 0;
1554 }
1555 stats_release (source->stats);
1556 rate_free (source->in_bitrate);
1557 source->in_bitrate = rate_setup (60, 1);
1558 rate_free (source->out_bitrate);
1559 source->out_bitrate = rate_setup (9000, 1000);
1560
1561 source->flags |= SOURCE_RUNNING;
1562
1563 mountinfo = config_find_mount (config_get_config(), source->mount);
1564 if (mountinfo)
1565 {
1566 if (mountinfo->max_stream_duration)
1567 source->client->connection.discon.time = source->client->worker->current_time.tv_sec + mountinfo->max_stream_duration;
1568 if (mountinfo->on_connect)
1569 source_run_script (mountinfo->on_connect, source->mount);
1570 auth_stream_start (mountinfo, source);
1571 }
1572 config_release_config();
1573
1574 INFO1 ("Source %s initialised", source->mount);
1575
1576 /* on demand relays should of already called this */
1577 if ((source->flags & SOURCE_ON_DEMAND) == 0)
1578 slave_update_mounts();
1579 source->flags &= ~SOURCE_ON_DEMAND;
1580 }
1581
1582
/* Arrange for listeners currently on a fallback of dest_source to be moved
 * back onto dest_source.
 *
 * Starting from dest_source's mount settings, the chain of fallback mounts
 * flagged with fallback_override is followed (at most 15 steps).  The first
 * running source found on that chain, if it is of the given format type, has
 * listeners and no fallback already set, gets dest_source's mount set as its
 * fallback and is put into listener sync.  If the chain ends without finding
 * a running source, the last mount name is passed to fserve_set_override.
 *
 * mountinfo    mount settings for dest_source, may be NULL
 * dest_source  the source that listeners should be moved to
 * type         format type that a candidate source must match
 *
 * Returns 1 if a running source was set up for the override, the result of
 * fserve_set_override if the chain ended on a non-source mount, else 0.
 *
 * The source tree read lock taken here is released on every exit path,
 * including when the iteration limit is reached without finding an end to
 * the chain (previously that case returned with the lock still held).
 */
static int source_set_override (mount_proxy *mountinfo, source_t *dest_source, format_type_t type)
{
    source_t *source;
    const char *dest = dest_source->mount;
    int ret = 0, loop = 15;
    int tree_locked = 0;
    ice_config_t *config = config_get_config_unlocked();
    unsigned int len;
    char *mount = dest_source->mount, buffer [4096];

    if (mountinfo == NULL || mountinfo->fallback_mount == NULL || mountinfo->fallback_override == 0)
    {
        INFO1 ("no override for %s set", dest_source->mount);
        return 0;
    }
    INFO2 ("for %s set to %s", dest_source->mount, mountinfo->fallback_mount);
    avl_tree_rlock (global.source_tree);
    tree_locked = 1;
    while (loop--)
    {
        len = sizeof buffer;
        /* NOTE(review): from the second iteration on, mount points at buffer,
         * which is also the output here; confirm util_expand_pattern copes
         * with its input and output overlapping. */
        if (util_expand_pattern (mount, mountinfo->fallback_mount, buffer, &len) < 0)
            break;
        mount = buffer;

        DEBUG2 ("checking for %s on %s", mount, dest);
        source = source_find_mount_raw (mount);
        if (source)
        {
            if (strcmp (source->mount, dest) == 0) // back where we started, drop out
                break;
            thread_rwlock_wlock (&source->lock);
            if (source_running (source))
            {
                /* candidate found, the tree lock is no longer needed */
                avl_tree_unlock (global.source_tree);
                tree_locked = 0;
                if (source->format->type == type)
                {
                    if (source->listeners && source->fallback.mount == NULL)
                    {
                        source->fallback.limit = 0;
                        source->fallback.mount = strdup (dest);
                        source->fallback.flags = FS_FALLBACK;
                        source->fallback.type = type;
                        source->termination_count = source->listeners;
                        source->client->timer_start = dest_source->client->worker->time_ms;
                        source->flags |= SOURCE_LISTENERS_SYNC;
                        source_listeners_wakeup (source);
                        ret = 1;
                    }
                }
                else
                    ERROR4("%s (%d) and %s(%d) are different formats", dest, type, mount, source->format->type);
                thread_rwlock_unlock (&source->lock);
                break;
            }
            thread_rwlock_unlock (&source->lock);
        }
        mountinfo = config_find_mount (config, mount);
        if (mountinfo == NULL || mountinfo->fallback_mount == NULL || mountinfo->fallback_override == 0)
        {
            /* end of the chain, release the tree before involving fserve */
            avl_tree_unlock (global.source_tree);
            tree_locked = 0;
            ret = fserve_set_override (mount, dest, type);
            break;
        }
    }
    /* covers the plain breaks above and running out of iterations */
    if (tree_locked)
        avl_tree_unlock (global.source_tree);
    return ret;
}
1655
1656
/* Record dest_mount as the fallback for the listeners of this source.
 *
 * Nothing is done when dest_mount is NULL, does not begin with '/', or the
 * source has no listeners.  The fallback rate limit is the average incoming
 * rate rounded to the nearest 1000, but only when the source client has been
 * connected for more than 40 seconds; if that gives 0 and the source has a
 * limit_rate, the limit_rate is used instead.
 *
 * source      the source whose listeners are to fall back
 * dest_mount  mount name to fall back to, may be NULL
 */
void source_set_fallback (source_t *source, const char *dest_mount)
{
    int rate = 0;
    client_t *client = source->client;
    time_t connected;

    if (dest_mount == NULL)
    {
        INFO1 ("No fallback on %s", source->mount);
        return;
    }
    if (dest_mount[0] != '/')
    {
        WARN2 ("invalid fallback on \"%s\", ignoring \"%s\"", source->mount, dest_mount);
        return;
    }
    if (source->listeners == 0)
    {
        INFO2 ("fallback on %s to %s, but no listeners", source->mount, dest_mount);
        return;
    }

    connected = client->worker->current_time.tv_sec - client->connection.con_time;
    if (connected > 40)
    {
        if (source->flags & SOURCE_TIMEOUT)
            rate = (int)rate_avg_shorten (source->in_bitrate, source->timeout);
        else
            rate = (int)rate_avg (source->in_bitrate);
        /* Round to the nearest 1000.  The earlier form divided as integers
         * before adding 0.5, so the fraction was already lost and the value
         * was always rounded down. */
        rate = ((rate + 500) / 1000) * 1000;
    }
    if (rate == 0 && source->limit_rate)
        rate = source->limit_rate;

    source->fallback.mount = strdup (dest_mount);
    source->fallback.fallback = source->mount;
    source->fallback.flags = FS_FALLBACK;
    source->fallback.limit = rate;
    source->fallback.type = source->format->type;
    INFO4 ("fallback set on %s to %s(%d) with %ld listeners", source->mount, dest_mount,
            source->fallback.limit, source->listeners);
}
1699
1700
/* Select the intro file for a source.
 *
 * file_pattern is expanded against the source mount name and looked up
 * below the configured webroot directory.  The file must open, pass the
 * format frame check and be of the same format type as the source.  On
 * success any previously set intro file is closed and replaced, and the
 * expanded name (relative to webroot) is stored as intro_filename.
 *
 * Returns 0 on success, -1 on any failure (the existing intro settings are
 * then left as they were).
 */
int source_set_intro (source_t *source, const char *file_pattern)
{
    if (file_pattern == NULL || source == NULL)
        return -1;

    ice_config_t *config = config_get_config_unlocked ();
    char buffer[4096];
    unsigned int len = sizeof buffer;
    int ret = snprintf (buffer, len, "%s" PATH_SEPARATOR, config->webroot_dir);

    do
    {
        /* snprintf reports the length it wanted to write, so a value of len
         * or more means the webroot prefix was truncated.  Without this test
         * the subtraction below would wrap and buffer + ret would point
         * outside the buffer. */
        if (ret < 0 || (unsigned int)ret >= len)
            break;
        len -= ret;
        if (util_expand_pattern (source->mount, file_pattern, buffer + ret, &len) < 0)
            break;

        icefile_handle intro_file;
        if (file_open (&intro_file, buffer) < 0)
        {
            WARN3 ("Cannot open intro for %s \"%s\": %s", source->mount, buffer, strerror(errno));
            break;
        }
        format_check_t intro;
        intro.fd = intro_file;
        intro.desc = buffer;
        if (format_check_frames (&intro) < 0 || intro.type == FORMAT_TYPE_UNDEFINED)
        {
            WARN2 ("Failed to read intro for %s (%s)", source->mount, buffer);
            file_close (&intro_file);
            break;
        }
        if (intro.type != source->format->type)
        {
            WARN2 ("intro file seems to be a different format to %s (%s)", source->mount, buffer);
            file_close (&intro_file);
            break;
        }
        // maybe a bitrate check later.
        INFO3 ("intro file for %s is %s (%s)", source->mount, file_pattern, buffer);
        /* swap the new file in for any previous intro */
        file_close (&source->intro_file);
        source->intro_file = intro_file;
        free (source->intro_filename);
        source->intro_filename = strdup (buffer + ret);
        return 0;
    } while (0);
    return -1;
}
1750
1751
/* Begin shutting a source down.
 *
 * The source is flagged as terminating with its listeners in sync mode, and
 * the listeners are woken.  If the source client had connected and has a
 * parser, the stats are updated and the mount's on-disconnect script and
 * stream-end authentication hook are run.  When with_fallback is set and
 * the server is still running, the mount's fallback is applied for the
 * listeners.
 */
void source_shutdown (source_t *source, int with_fallback)
{
    client_t *src_client = source->client;

    INFO1("Source \"%s\" exiting", source->mount);

    source->flags &= ~(SOURCE_ON_DEMAND);
    source->termination_count = source->listeners;
    src_client->timer_start = src_client->worker->time_ms;
    source->flags |= (SOURCE_TERMINATING | SOURCE_LISTENERS_SYNC);
    source_listeners_wakeup (source);

    ice_config_t *config = config_get_config();
    mount_proxy *mountinfo = config_find_mount (config, source->mount);

    /* only do these if source has been running */
    int has_run = src_client->connection.con_time && src_client->parser;
    if (has_run)
    {
        if (source->stats)
            update_source_stats (source);
        if (mountinfo)
        {
            if (mountinfo->on_disconnect)
                source_run_script (mountinfo->on_disconnect, source->mount);
            auth_stream_end (mountinfo, source);
        }
    }

    if (mountinfo && with_fallback && global.running == ICE_RUNNING)
        source_set_fallback (source, mountinfo->fallback_mount);
    source->flags &= ~(SOURCE_TIMEOUT);
    config_release_config();
}
1782
1783
/* Break up an "ice-audio-info" style string of ';' separated name=value
 * items.  Items whose name begins with "ice-" or "bitrate" have their value
 * URL-unescaped and are stored both in the source's audio_info dictionary
 * and as stats on the source; everything else is ignored.
 */
static void _parse_audio_info (source_t *source, const char *s)
{
    const char *item, *next;

    for (item = s; item != NULL && *item != '\0'; item = next)
    {
        unsigned int len;

        next = strchr (item, ';');
        if (next == NULL)
            len = strlen (item);        /* last item, nothing follows */
        else
        {
            len = (int)(next - item);
            next++;                     /* skip past the ';' */
        }
        if (len == 0)
            continue;

        char name[100], value[200];
        int n = sscanf (item, "%99[^=]=%199[^;\r\n]", name, value);

        if (n != 2)
            continue;
        if (strncmp (name, "ice-", 4) != 0 && strncmp (name, "bitrate=", 7) != 0)
            continue;

        char *esc = util_url_unescape (value);
        if (esc)
        {
            util_dict_set (source->audio_info, name, esc);
            stats_set_flags (source->stats, name, esc, STATS_COUNTERS);
        }
        free (esc);
    }
}
1817
1818
compare_intro_ipcache(void * arg,void * a,void * b)1819 static int compare_intro_ipcache (void *arg, void *a, void *b)
1820 {
1821 struct node_IP_time *this = (struct node_IP_time *)a;
1822 struct node_IP_time *that = (struct node_IP_time *)b;
1823 int ret = strcmp (&this->ip[0], &that->ip[0]);
1824
1825 if (ret && that->a.timeout)
1826 {
1827 cache_file_contents *c = arg;
1828 time_t threshold = c->file_recheck;
1829
1830 if (c->deletions_count < 9 && that->a.timeout < threshold)
1831 {
1832 c->deletions [c->deletions_count] = that;
1833 c->deletions_count++;
1834 }
1835 }
1836 return ret;
1837 }
1838
1839
/* Set up the pre-roll log for a source from its mount settings.
 *
 * The log name from the mount is expanded against the source mount name and
 * placed below the configured log directory.  The log is opened if the
 * source does not already have one, then its size trigger, reopen interval,
 * kept lines and archive setting are applied and it is reopened.
 *
 * Returns 0 when the log is in place.  On any failure, including no mount
 * settings or no pre-roll log name, the source's pre-roll log is closed, its
 * id reset to -1, and -1 is returned.
 */
int source_apply_preroll (mount_proxy *mountinfo, source_t *source)
{
    char path [4096];
    unsigned int room = 4096;
    int used;
    ice_config_t *config;
    struct error_log *preroll;

    if (mountinfo == NULL || mountinfo->preroll_log.name == NULL)
        goto disable;

    config = config_get_config_unlocked ();
    preroll = &mountinfo->preroll_log;

    used = snprintf (path, room, "%s" PATH_SEPARATOR, config->log_dir);
    if (used < 0 || used >= room)
        goto disable;
    room -= used;
    if (util_expand_pattern (source->mount, mountinfo->preroll_log.name, path + used, &room) < 0)
        goto disable;

    if (source->preroll_log_id < 0)
        source->preroll_log_id = log_open (path);
    if (source->preroll_log_id < 0)
        goto disable;

    INFO3 ("using pre-roll log file %s (%s) for %s", preroll->name, path, source->mount);

    /* a mount size of 10000 or less falls back to the global setting */
    long max_size = (preroll->size > 10000) ? preroll->size : config->preroll_log.size;
    log_set_trigger (source->preroll_log_id, max_size);
    log_set_reopen_after (source->preroll_log_id, preroll->duration);
    log_set_lines_kept (source->preroll_log_id, preroll->display);

    /* -1 on the mount means use the global archive setting */
    int archive = (preroll->archive == -1) ? config->preroll_log.archive : preroll->archive;
    log_set_archive_timestamp (source->preroll_log_id, archive);
    log_reopen (source->preroll_log_id);
    return 0;

disable:
    log_close (source->preroll_log_id);
    source->preroll_log_id = -1;
    return -1;
}
1880
1881
1882 /* Apply the mountinfo details to the source */
source_apply_mount(source_t * source,mount_proxy * mountinfo)1883 static void source_apply_mount (source_t *source, mount_proxy *mountinfo)
1884 {
1885 const char *str;
1886 int val;
1887 http_parser_t *parser = NULL;
1888
1889 if (mountinfo == NULL || strcmp (mountinfo->mountname, source->mount) == 0)
1890 INFO1 ("Applying mount information for \"%s\"", source->mount);
1891 else
1892 INFO2 ("Applying mount information for \"%s\" from \"%s\"",
1893 source->mount, mountinfo->mountname);
1894
1895 stats_set_args (source->stats, "listener_peak", "%lu", source->peak_listeners);
1896
1897 /* if a setting is available in the mount details then use it, else
1898 * check the parser details. */
1899
1900 if (source->client)
1901 parser = source->client->parser;
1902
1903 /* to be done before possible non-utf8 stats */
1904 if (source->format && source->format->apply_settings)
1905 source->format->apply_settings (source->format, mountinfo);
1906
1907 /* public */
1908 if (mountinfo && mountinfo->yp_public >= 0)
1909 val = mountinfo->yp_public;
1910 else
1911 {
1912 do {
1913 str = httpp_getvar (parser, "ice-public");
1914 if (str) break;
1915 str = httpp_getvar (parser, "icy-pub");
1916 if (str) break;
1917 str = httpp_getvar (parser, "x-audiocast-public");
1918 if (str) break;
1919 /* handle header from icecast v2 release */
1920 str = httpp_getvar (parser, "icy-public");
1921 if (str) break;
1922 str = source->yp_public > 0 ? "1" : "0";
1923 } while (0);
1924 val = atoi (str);
1925 }
1926 stats_set_args (source->stats, "public", "%d", val);
1927 if (source->yp_public != val)
1928 {
1929 DEBUG1 ("YP changed to %d", val);
1930 if (val)
1931 yp_add (source->mount);
1932 else
1933 yp_remove (source->mount);
1934 source->yp_public = val;
1935 }
1936
1937 /* stream name */
1938 if (mountinfo && mountinfo->stream_name)
1939 stats_set (source->stats, "server_name", mountinfo->stream_name);
1940 else
1941 {
1942 do {
1943 str = httpp_getvar (parser, "ice-name");
1944 if (str) break;
1945 str = httpp_getvar (parser, "icy-name");
1946 if (str) break;
1947 str = httpp_getvar (parser, "x-audiocast-name");
1948 if (str) break;
1949 str = "Unspecified name";
1950 } while (0);
1951 if (source->format)
1952 stats_set_conv (source->stats, "server_name", str, source->format->charset);
1953 }
1954
1955 /* stream description */
1956 if (mountinfo && mountinfo->stream_description)
1957 stats_set (source->stats, "server_description", mountinfo->stream_description);
1958 else
1959 {
1960 do {
1961 str = httpp_getvar (parser, "ice-description");
1962 if (str) break;
1963 str = httpp_getvar (parser, "icy-description");
1964 if (str) break;
1965 str = httpp_getvar (parser, "x-audiocast-description");
1966 if (str) break;
1967 } while (0);
1968 if (str && source->format)
1969 stats_set_conv (source->stats, "server_description", str, source->format->charset);
1970 }
1971
1972 /* stream URL */
1973 if (mountinfo && mountinfo->stream_url)
1974 stats_set (source->stats, "server_url", mountinfo->stream_url);
1975 else
1976 {
1977 do {
1978 str = httpp_getvar (parser, "ice-url");
1979 if (str) break;
1980 str = httpp_getvar (parser, "icy-url");
1981 if (str) break;
1982 str = httpp_getvar (parser, "x-audiocast-url");
1983 if (str) break;
1984 } while (0);
1985 if (str && source->format)
1986 stats_set_conv (source->stats, "server_url", str, source->format->charset);
1987 }
1988
1989 /* stream genre */
1990 if (mountinfo && mountinfo->stream_genre)
1991 stats_set (source->stats, "genre", mountinfo->stream_genre);
1992 else
1993 {
1994 do {
1995 str = httpp_getvar (parser, "ice-genre");
1996 if (str) break;
1997 str = httpp_getvar (parser, "icy-genre");
1998 if (str) break;
1999 str = httpp_getvar (parser, "x-audiocast-genre");
2000 if (str) break;
2001 str = "various";
2002 } while (0);
2003 if (source->format)
2004 stats_set_conv (source->stats, "genre", str, source->format->charset);
2005 }
2006
2007 /* stream bitrate */
2008 if (mountinfo && mountinfo->bitrate)
2009 {
2010 str = mountinfo->bitrate;
2011 stats_set (source->stats, "bitrate", str);
2012 }
2013 else
2014 {
2015 do {
2016 str = httpp_getvar (parser, "ice-bitrate");
2017 if (str) break;
2018 str = httpp_getvar (parser, "icy-br");
2019 if (str) break;
2020 str = httpp_getvar (parser, "x-audiocast-bitrate");
2021 } while (0);
2022 if (str)
2023 stats_set (source->stats, "bitrate", str);
2024 }
2025
2026 /* handle MIME-type */
2027 if (mountinfo && mountinfo->type)
2028 {
2029 if (source->format)
2030 {
2031 format_type_t type = format_get_type (mountinfo->type);
2032 if (type == FORMAT_TYPE_UNDEFINED)
2033 WARN2 ("type specified for %s is unrecognised (%s)", source->mount, mountinfo->type);
2034 else
2035 source->format->type = format_get_type (mountinfo->type);
2036 free (source->format->contenttype);
2037 source->format->contenttype = strdup (mountinfo->type);
2038 }
2039 stats_set (source->stats, "server_type", mountinfo->type);
2040 }
2041 else
2042 if (source->format && source->format->contenttype)
2043 stats_set (source->stats, "server_type", source->format->contenttype);
2044
2045 if (mountinfo && mountinfo->subtype)
2046 stats_set (source->stats, "subtype", mountinfo->subtype);
2047
2048 if (mountinfo && mountinfo->auth)
2049 stats_set (source->stats, "authenticator", mountinfo->auth->type);
2050 else
2051 stats_set (source->stats, "authenticator", NULL);
2052
2053 source->limit_rate = 0;
2054 if (mountinfo && mountinfo->limit_rate)
2055 source->limit_rate = mountinfo->limit_rate;
2056
2057 /* needs a better mechanism, probably via a client_t handle */
2058 free (source->dumpfilename);
2059 source->dumpfilename = NULL;
2060 if (mountinfo && mountinfo->dumpfile)
2061 {
2062 time_t now = time(NULL);
2063 struct tm local;
2064 char buffer[PATH_MAX];
2065
2066 localtime_r (&now, &local);
2067 if (strftime (buffer, sizeof (buffer), mountinfo->dumpfile, &local) == 0)
2068 {
2069 WARN3 ("had problem on %s expanding dumpfile %s (%s)", source->mount, mountinfo->dumpfile, strerror(errno));
2070 errno = 0;
2071 }
2072 else
2073 source->dumpfilename = strdup (buffer);
2074 }
2075 source_apply_preroll (mountinfo, source);
2076
2077 /* handle changes in intro file setting */
2078 file_close (&source->intro_file);
2079 free (source->intro_filename);
2080 source->intro_filename = NULL;
2081 cached_prune (source->intro_ipcache);
2082 free (source->intro_ipcache);
2083 source->intro_ipcache = NULL;
2084 source->intro_skip_replay = 0;
2085 if (mountinfo && mountinfo->intro_filename)
2086 {
2087 // only set here if there is data present, for type verification
2088 if (source->stream_data)
2089 source_set_intro (source, mountinfo->intro_filename);
2090
2091 if (mountinfo->intro_skip_replay)
2092 {
2093 cache_file_contents *c = calloc (1, sizeof (cache_file_contents));
2094
2095 source->intro_ipcache = c;
2096 c->contents = avl_tree_new (compare_intro_ipcache, c);
2097 source->intro_skip_replay = mountinfo->intro_skip_replay;
2098 }
2099 }
2100
2101 if (mountinfo && mountinfo->source_timeout)
2102 source->timeout = mountinfo->source_timeout;
2103
2104 if (mountinfo && mountinfo->queue_size_limit)
2105 source->queue_len_value = mountinfo->queue_size_limit;
2106
2107 if (mountinfo && mountinfo->burst_size)
2108 source->default_burst_value = (unsigned int)mountinfo->burst_size;
2109
2110 if (mountinfo && mountinfo->min_queue_size)
2111 source->min_queue_len_value = mountinfo->min_queue_size;
2112
2113 source->wait_time = 0;
2114 if (mountinfo && mountinfo->wait_time)
2115 source->wait_time = (time_t)mountinfo->wait_time;
2116 }
2117
2118
2119 /* update the specified source with details from the config or mount.
2120 * mountinfo can be NULL in which case default settings should be taken
2121 */
source_update_settings(ice_config_t * config,source_t * source,mount_proxy * mountinfo)2122 void source_update_settings (ice_config_t *config, source_t *source, mount_proxy *mountinfo)
2123 {
2124 char *listen_url;
2125 int len;
2126
2127 /* set global settings first */
2128 if (mountinfo == NULL)
2129 {
2130 source->queue_len_value = config->queue_size_limit;
2131 source->min_queue_len_value = config->min_queue_size;
2132 source->timeout = config->source_timeout;
2133 source->default_burst_value = config->burst_size;
2134 }
2135 stats_lock (source->stats, source->mount);
2136
2137 len = strlen (config->hostname) + strlen(source->mount) + 16;
2138 listen_url = alloca (len);
2139 snprintf (listen_url, len, "http://%s:%d%s", config->hostname, config->port, source->mount);
2140 stats_set_flags (source->stats, "listenurl", listen_url, STATS_COUNTERS);
2141
2142 source_apply_mount (source, mountinfo);
2143
2144 if (source->dumpfilename)
2145 DEBUG1 ("Dumping stream to %s", source->dumpfilename);
2146 if (source->flags & SOURCE_ON_DEMAND)
2147 {
2148 DEBUG0 ("on_demand set");
2149 stats_set (source->stats, "on_demand", "1");
2150 stats_set_args (source->stats, "listeners", "%ld", source->listeners);
2151 }
2152 else
2153 stats_set (source->stats, "on_demand", NULL);
2154
2155 if (mountinfo)
2156 {
2157 if (mountinfo->on_connect)
2158 DEBUG1 ("connect script \"%s\"", mountinfo->on_connect);
2159 if (mountinfo->on_disconnect)
2160 DEBUG1 ("disconnect script \"%s\"", mountinfo->on_disconnect);
2161 if (mountinfo->fallback_when_full)
2162 DEBUG1 ("fallback_when_full to %u", mountinfo->fallback_when_full);
2163 DEBUG1 ("max listeners to %d", mountinfo->max_listeners);
2164 stats_set_args (source->stats, "max_listeners", "%d", mountinfo->max_listeners);
2165 stats_set_flags (source->stats, "cluster_password", mountinfo->cluster_password, STATS_SLAVE|STATS_HIDDEN);
2166 if (mountinfo->hidden)
2167 {
2168 stats_set_flags (source->stats, NULL, NULL, STATS_HIDDEN);
2169 DEBUG0 ("hidden from public");
2170 }
2171 else
2172 stats_set_flags (source->stats, NULL, NULL, 0);
2173 }
2174 else
2175 {
2176 DEBUG0 ("max listeners is not specified");
2177 stats_set (source->stats, "max_listeners", "unlimited");
2178 stats_set_flags (source->stats, "cluster_password", NULL, STATS_SLAVE);
2179 stats_set_flags (source->stats, NULL, NULL, STATS_PUBLIC);
2180 }
2181 stats_release (source->stats);
2182 DEBUG1 ("public set to %d", source->yp_public);
2183 DEBUG1 ("source timeout to %u", source->timeout);
2184 }
2185
2186
/* Final step in accepting a source client: install the source client ops,
 * start the source unless it is already running, and publish the user agent
 * the source client announced.
 *
 * Entered with source->lock held (the callers in this file either take the
 * write lock just before the call or still hold it); the lock is released
 * here. Always returns 0.
 */
static int source_client_callback (client_t *client)
{
    source_t *source = client->shared_data;
    const char *user_agent;

    stats_event_inc(NULL, "source_client_connections");
    client->ops = &source_client_ops;

    if (source_running (source))
        stats_event_inc (NULL, "source_total_connections");
    else
        source_init (source);

    /* fetch the header before the lock is dropped */
    user_agent = httpp_getvar (source->client->parser, "user-agent");
    thread_rwlock_unlock (&source->lock);

    if (user_agent == NULL)
        return 0;

    stats_lock (source->stats, source->mount);
    stats_set_flags (source->stats, "user_agent", user_agent, STATS_COUNTERS);
    stats_release (source->stats);
    return 0;
}
2209
2210
#ifndef _WIN32
/* Run an external command for a mountpoint, detached from the server.
 *
 * command    - configured command line. The first space/tab separated word is
 *              the program and must be executable. When strsep() is available
 *              any further words are passed as arguments after pattern
 *              expansion against the mountpoint; otherwise arguments are not
 *              supported.
 * mountpoint - mount name, passed as the single argument when the command
 *              line has no arguments of its own.
 *
 * The command is started through a double fork so that it does not remain a
 * child of the server; the caller only waits for the short-lived
 * intermediate process.
 */
static void source_run_script (char *command, char *mountpoint)
{
    pid_t pid, external_pid;
    char *p, *comm;
    int wstatus;

    comm = p = strdup (command);
    if (comm == NULL)
    {
        ERROR1 ("Unable to run command on %s, out of memory", mountpoint);
        return;
    }
#ifdef HAVE_STRSEP
    strsep (&p, " \t");     // terminate the program name, p is left at the arguments (or NULL)
#else
    if (strchr (command, ' '))  // possible misconfiguration, but unlikely to occur.
        INFO1 ("arguments to command on %s not supported", mountpoint);
#endif
    if (access (comm, X_OK) != 0)
    {
        ERROR3 ("Unable to run command %s on %s (%s)", comm, mountpoint, strerror (errno));
        free (comm);
        return;
    }
    DEBUG2 ("Starting command %s on %s", comm, mountpoint);

    /* do a fork twice so that the command has init as parent */
    external_pid = fork();
    switch (external_pid)
    {
        case 0:     // child, don't log from here.
            switch (pid = fork ())
            {
                case -1:
                    break;
                case 0:  /* child */
#ifdef HAVE_STRSEP
#define MAX_SCRIPT_ARGS          20
                    {
                        int i = 1;
                        char *args [MAX_SCRIPT_ARGS+1], *tmp;

                        // default set unless overridden
                        args[0] = comm;
                        args[1] = mountpoint;
                        args[2] = NULL;
                        while (i < MAX_SCRIPT_ARGS && (tmp = strsep (&p, " \t")))
                        {
                            unsigned len = 4096;
                            char *str = malloc (len);

                            if (str && util_expand_pattern (mountpoint, tmp, str, &len) == 0)
                                args[i] = str;
                            else
                            {
                                // no expansion possible, hand over the word as written so
                                // that the slot is never left uninitialised
                                free (str);
                                args[i] = tmp;
                            }
                            i++;
                        }
                        // explicit arguments replace the default list, so the vector has
                        // to be terminated after the last one (i never exceeds MAX_SCRIPT_ARGS)
                        if (i > 1)
                            args[i] = NULL;
                        close (0);
                        close (1);
                        close (2);
                        execvp ((const char *)args[0], args);
                    }
#else
                    close (0);
                    close (1);
                    close (2);
                    execl (command, command, mountpoint, (char *)NULL);
#endif
                    exit(1);    // only reached if the exec failed
                default: /* parent */
                    break;
            }
            exit (0);
        case -1:    // ok, in parent context, no lock clash.
            ERROR1 ("Unable to fork %s", strerror (errno));
            break;
        default: /* parent */
            /* reap the intermediate child, retrying while it is merely stopped */
            do
            {
                if (waitpid (external_pid, &wstatus, 0) < 0)
                    break;
            } while (WIFEXITED(wstatus) == 0 && WIFSIGNALED(wstatus) == 0);
            break;
    }
    free (comm);
}
#endif
2291
2292
2293 /* rescan the mount list, so that xsl files are updated to show
2294 * unconnected but active fallback mountpoints
2295 */
source_recheck_mounts(int update_all)2296 void source_recheck_mounts (int update_all)
2297 {
2298 ice_config_t *config;
2299 time_t mark = time (NULL);
2300 long count = 0;
2301
2302 avl_tree_rlock (global.source_tree);
2303
2304 if (update_all)
2305 {
2306 avl_node *node = avl_get_first (global.source_tree);
2307 while (node)
2308 {
2309 source_t *source = (source_t*)node->key;
2310
2311 thread_rwlock_wlock (&source->lock);
2312 config = config_get_config();
2313 if (source_available (source))
2314 source_update_settings (config, source, config_find_mount (config, source->mount));
2315 config_release_config();
2316 thread_rwlock_unlock (&source->lock);
2317 node = avl_get_next (node);
2318 }
2319 }
2320
2321 config = config_get_config();
2322 avl_node *node = avl_get_first (config->mounts_tree);
2323 while (node)
2324 {
2325 source_t *source;
2326 mount_proxy *mount = (mount_proxy*)node->key;
2327
2328 node = avl_get_next (node);
2329
2330 ++count;
2331 if ((count & 63) == 0) // lets give others access to this every so often
2332 {
2333 avl_tree_unlock (global.source_tree);
2334 avl_tree_rlock (global.source_tree);
2335 }
2336
2337 source = source_find_mount_raw (mount->mountname);
2338 if ((source == NULL || source_available (source) == 0) && mount->fallback_mount)
2339 {
2340 int count = -1;
2341 unsigned int len;
2342 char buffer [4096];
2343
2344 len = sizeof buffer;
2345 if (util_expand_pattern (mount->mountname, mount->fallback_mount, buffer, &len) == 0)
2346 count = fallback_count (config, buffer);
2347
2348 DEBUG2 ("fallback checking %s (fallback has %d)", mount->mountname, count);
2349 if (count >= 0)
2350 {
2351 stats_handle_t stats = stats_handle (mount->mountname);
2352 if (source == NULL) // mark for purge if there is no source at all
2353 stats_set_expire (stats, mark);
2354 stats_set_flags (stats, NULL, NULL, mount->hidden?STATS_HIDDEN:0);
2355 stats_set_args (stats, "listenurl", "http://%s:%d%s",
2356 config->hostname, config->port, mount->mountname);
2357 stats_set (stats, "listeners", "0");
2358 if (mount->max_listeners < 0)
2359 stats_set (stats, "max_listeners", "unlimited");
2360 else
2361 stats_set_args (stats, "max_listeners", "%d", mount->max_listeners);
2362 stats_release (stats);
2363 }
2364 }
2365 }
2366 stats_purge (mark);
2367 avl_tree_unlock (global.source_tree);
2368 config_release_config();
2369 }
2370
2371
2372 /* Check whether this listener is on this source. This is only called when
2373 * there is auth. This may flag an existing listener to terminate.
2374 * return 1 if ok to add or 0 to prevent
2375 */
check_duplicate_logins(const char * mount,avl_tree * tree,client_t * client,auth_t * auth)2376 int check_duplicate_logins (const char *mount, avl_tree *tree, client_t *client, auth_t *auth)
2377 {
2378 avl_node *node;
2379
2380 if (auth == NULL || (auth->flags & AUTH_ALLOW_LISTENER_DUP))
2381 return 1;
2382
2383 /* allow multiple authenticated relays */
2384 if (client->username == NULL || client->flags & CLIENT_IS_SLAVE)
2385 return 1;
2386
2387 node = avl_get_first (tree);
2388 while (node)
2389 {
2390 client_t *existing_client = (client_t *)node->key;
2391 if (existing_client->username &&
2392 strcmp (existing_client->username, client->username) == 0)
2393 {
2394 if (auth->flags & AUTH_DEL_EXISTING_LISTENER)
2395 {
2396 INFO2 ("Found %s on %s, dropping previous account", existing_client->username, mount);
2397 existing_client->connection.error = 1;
2398 return 1;
2399 }
2400 else
2401 return 0;
2402 }
2403 node = avl_get_next (node);
2404 }
2405 return 1;
2406 }
2407
2408
2409 /* source required to stay around for a short while
2410 */
source_client_shutdown(client_t * client)2411 static int source_client_shutdown (client_t *client)
2412 {
2413 if (global.running == ICE_RUNNING && client->connection.discon.time)
2414 {
2415 if (client->connection.discon.time >= client->worker->current_time.tv_sec)
2416 {
2417 client->schedule_ms = client->worker->time_ms + 50;
2418 return 0;
2419 }
2420 }
2421 drop_source_from_tree ((source_t*)client->shared_data);
2422 // source locked but exit function will want it locked
2423 return -1;
2424 }
2425
2426
2427 /* clean up what is left from the source. */
source_client_release(client_t * client)2428 void source_client_release (client_t *client)
2429 {
2430 source_t *source = client->shared_data;
2431
2432 source->flags &= ~(SOURCE_RUNNING|SOURCE_ON_DEMAND);
2433 client->flags &= ~CLIENT_AUTHENTICATED;
2434 /* log bytes read in access log */
2435 if (source->format)
2436 client->connection.sent_bytes = source->format->read_bytes;
2437
2438 _free_source (source);
2439 slave_update_mounts();
2440 client_destroy (client);
2441 global_reduce_bitrate_sampling (global.out_bitrate);
2442 }
2443
2444
/* Detach a departing listener from a source and account for it leaving.
 *
 * If the client is still attached to this source, any leading blocks that
 * are not flagged REFBUF_SHARED are released, the client is detached and the
 * source listener count reduced. The global listener count is reduced in all
 * cases, the per-mount access log written when one is configured, and the
 * result of auth_release_listener() returned.
 */
static int source_listener_release (source_t *source, client_t *client)
{
    ice_config_t *config;
    mount_proxy *mountinfo;
    int ret;

    if (client->shared_data == source) // still attached to source?
    {
        refbuf_t *r;

        /* walk the head of the chain until a shared block (or the end) is reached */
        for (r = client->refbuf; r != NULL && (r->flags & REFBUF_SHARED) == 0; r = client->refbuf)
        {
            client->refbuf = r->next;
            r->next = NULL;
            if (source->format->detach_queue_block)
                source->format->detach_queue_block (source, r);
            refbuf_release (r);
        }

        source_listener_detach (source, client);
        source->listeners--;
        client->shared_data = NULL;
        client_set_queue (client, NULL);
        if (source->listeners == 0)
            rate_reduce (source->out_bitrate, 1000);
    }

    /* change of listener numbers, so reduce scope of global sampling */
    global_reduce_bitrate_sampling (global.out_bitrate);
    DEBUG2 ("Listener %" PRIu64 " leaving %s", client->connection.id, source->mount);

    // reduce from global count
    global_lock();
    global.listeners--;
    global_unlock();

    config = config_get_config ();
    mountinfo = config_find_mount (config, source->mount);

    if (mountinfo && mountinfo->access_log.name)
        logging_access_id (&mountinfo->access_log, client);

    ret = auth_release_listener (client, source->mount, mountinfo);
    config_release_config();
    return ret;
}
2491
2492
/* Attach a listener to the source for `mount`, following configured fallback
 * mounts when the mount has no available source or is full.
 *
 * mount     - requested mountpoint name
 * mountinfo - mount settings for the requested mountpoint; may be NULL, in
 *             which case no per-mount limits are applied
 * client    - the listener being added
 *
 * Returns 0 when the listener was attached (or handed over to a rate limited
 * fallback through move_listener), -2 when no source is available and there
 * is neither a fallback mount nor a rate to use, -1 when move_listener fails,
 * and otherwise whatever the invoked handler returned (client_send_403*, the
 * format header builder on failure, fserve_setup_client or the client's
 * process op). At most 10 fallback hops are followed before a 403 is sent.
 *
 * The source tree read lock and the source write lock are taken in here; the
 * source lock is released on every return path after a source was found.
 */
int source_add_listener (const char *mount, mount_proxy *mountinfo, client_t *client)
{
    int loop = 10, rate = 0, do_process = 0;
    int within_limits;
    source_t *source;
    mount_proxy *minfo = mountinfo;
    const char *passed_mount = mount;   /* kept for the redirect responses */
    ice_config_t *config = config_get_config_unlocked();
    unsigned int len;
    char buffer[4096];

    /* outer loop: one pass per candidate mount whose limits get checked */
    do
    {
        int64_t stream_bitrate = 0;
        int flags = 0;

        /* inner loop: walk the fallback chain until an available source is
         * found; leaves via break with the tree read-locked and the source
         * write-locked */
        do
        {
            if (loop == 0)
            {
                WARN0 ("preventing a fallback loop");
                return client_send_403 (client, "Fallback through too many mountpoints");
            }
            avl_tree_rlock (global.source_tree);
            source = source_find_mount_raw (mount);
            if (source)
            {
                thread_rwlock_wlock (&source->lock);
                if (source_available (source))
                    break;
                thread_rwlock_unlock (&source->lock);
            }
            avl_tree_unlock (global.source_tree);
            if (minfo && minfo->limit_rate)
                rate = minfo->limit_rate/8;
            if (minfo == NULL || minfo->fallback_mount == NULL)
            {
                /* end of the chain with no source, try a rate limited fallback */
                int ret = -2;
                /* no configured rate: look for a bracketed number in the mount
                 * name ("...[N]"), presumably kbit/s as it is scaled by 1000/8 */
                if (rate == 0)
                    if (sscanf (mount, "%*[^[][%d]", &rate) == 1)
                        rate = rate * 1000 / 8;
                if (rate)
                {
                    fbinfo f;
                    f.flags = flags;
                    f.mount = (char *)mount;
                    f.fallback = NULL;
                    f.limit = rate;
                    f.type = FORMAT_TYPE_UNDEFINED;
                    if (move_listener (client, &f) == 0)
                    {
                        /* source dead but fallback to file found */
                        stats_event_inc (NULL, "listener_connections");
                        return 0;
                    }
                    ret = -1;
                }
                return ret;
            }
            /* move on to the fallback; if the pattern cannot be expanded the
             * configured fallback name is used as it is.
             * NOTE(review): after the first hop `mount` may already point at
             * `buffer`, which is also the output here - confirm that
             * util_expand_pattern copes with that */
            len = sizeof buffer;
            if (util_expand_pattern (mount, minfo->fallback_mount, buffer, &len) < 0)
                mount = minfo->fallback_mount;
            else
                mount = buffer;
            minfo = config_find_mount (config_get_config_unlocked(), mount);
            flags = FS_FALLBACK;
            loop--;
        } while (1);

        /* ok, we found a source and it is locked */
        avl_tree_unlock (global.source_tree);

        if (client->flags & CLIENT_IS_SLAVE)
        {
            INFO1 ("client %" PRIu64 " is from a slave, bypassing limits", client->connection.id);
            break;
        }
        if (source->format)
        {
            stream_bitrate  = 8 * rate_avg (source->in_bitrate);

            if (config->max_bandwidth)
            {
                int64_t global_rate = (int64_t)8 * global_getrate_avg (global.out_bitrate);

                DEBUG1 ("server outgoing bitrate is %" PRId64, global_rate);
                if (global_rate + stream_bitrate > config->max_bandwidth)
                {
                    thread_rwlock_unlock (&source->lock);
                    INFO0 ("server-wide outgoing bandwidth limit reached");
                    return client_send_403redirect (client, passed_mount, "server bandwidth reached");
                }
            }
            /* build the response headers now for unauthenticated clients and
             * for range requests */
            if ((client->flags & CLIENT_AUTHENTICATED) == 0 || httpp_getvar (client->parser, "range"))
            {
                int ret;
                int (*build_headers)(format_plugin_t *, client_t *) = format_general_headers;

                /* a format specific builder takes precedence over the general one */
                if (source->format->create_client_data)
                    build_headers = source->format->create_client_data;

                client->refbuf->len = 0;
                ret = build_headers (source->format, client);

                if (ret < 0)
                {
                    thread_rwlock_unlock (&source->lock);
                    return ret;
                }
                /* these two cases leave the outer loop without the per-mount checks below */
                if (client->parser->req_type == httpp_req_head)
                    break;
                if ((client->flags & CLIENT_HAS_INTRO_CONTENT) == 0 && client->refbuf->next)
                    break;
                if ((source->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) == SOURCE_ON_DEMAND)
                {
                    // inactive ondemand relay to kick off, reset client, try headers later
                    client->respcode = 0;
                    client->pos = 0;
                }
                stats_lock (source->stats, source->mount);
                stats_set_inc (source->stats, "listener_connections");
                stats_release (source->stats);
            }
        }

        if (mountinfo == NULL)
            break; /* allow adding listeners, no mount limits imposed */

        if (mountinfo->intro_skip_replay)
            listener_check_intro (source->intro_ipcache, client, mountinfo->intro_skip_replay);

        if (check_duplicate_logins (source->mount, source->clients, client, mountinfo->auth) == 0)
        {
            thread_rwlock_unlock (&source->lock);
            return client_send_403 (client, "Account already in use");
        }

        /* set a per-mount disconnect time if auth hasn't set one already */
        if (mountinfo->max_listener_duration && client->connection.discon.time == 0)
            client->connection.discon.time = time(NULL) + mountinfo->max_listener_duration;

        INFO3 ("max on %s is %d (cur %lu)", source->mount,
                mountinfo->max_listeners, source->listeners);
        within_limits = 1;
        /* bandwidth check: would one more listener at the incoming stream
         * rate exceed the mount's limit? */
        if (mountinfo->max_bandwidth > -1 && stream_bitrate)
        {
            DEBUG3 ("checking bandwidth limits for %s (%" PRId64 ", %" PRId64 ")",
                    mountinfo->mountname, stream_bitrate, mountinfo->max_bandwidth);
            if ((source->listeners+1) * stream_bitrate > mountinfo->max_bandwidth)
            {
                INFO1 ("bandwidth limit reached on %s", source->mount);
                within_limits = 0;
            }
        }
        if (within_limits)
        {
            /* -1 means no listener limit on this mount */
            if (mountinfo->max_listeners == -1)
                break;

            if (source->listeners < (unsigned long)mountinfo->max_listeners)
                break;
            INFO1 ("max listener count reached on %s", source->mount);
        }
        /* minfo starts off as mountinfo but cascades through fallbacks.
         * NOTE(review): unlike the unavailable-source path above, minfo is
         * not advanced to the new mount here, and the limits above are always
         * taken from mountinfo - confirm this is intended */
        if (minfo && minfo->fallback_when_full && minfo->fallback_mount)
        {
            thread_rwlock_unlock (&source->lock);
            len = sizeof buffer;
            if (util_expand_pattern (mount, minfo->fallback_mount, buffer, &len) < 0)
                mount = minfo->fallback_mount;
            else
                mount = buffer;
            INFO1 ("stream full trying %s", mount);
            loop--;
            continue;
        }

        /* now we fail the client */
        thread_rwlock_unlock (&source->lock);
        return client_send_403redirect (client, passed_mount, "max listeners reached");

    } while (1);

    /* a source has been selected and its lock is still held */
    client->connection.sent_bytes = 0;

    if ((client->flags & CLIENT_AUTHENTICATED) == 0)
    {
        /* unauthenticated clients are passed to fserve_setup_client */
        thread_rwlock_unlock (&source->lock);
        return fserve_setup_client (client);
    }

    if (client->respcode == 0)
    {
        /* no response prepared yet, start from a zeroed buffer */
        client->refbuf->len = PER_CLIENT_REFBUF_SIZE;
        memset (client->refbuf->data, 0, PER_CLIENT_REFBUF_SIZE);
    }

    /* server-wide listener limit, checked and counted under the global lock */
    global_lock();
    if (config->max_listeners)
    {
        if (config->max_listeners <= global.listeners)
        {
            global_unlock();
            thread_rwlock_unlock (&source->lock);
            return client_send_403redirect (client, passed_mount, "max listeners reached");
        }
    }
    global.listeners++;
    global_unlock();

    httpp_deletevar (client->parser, "range");
    if (client->flags & CLIENT_RANGE_END)
    {
        // range given on a stream, impose a length limit
        if ((off_t)client->connection.discon.offset > client->intro_offset)
        {
            client->connection.discon.offset -= client->intro_offset;
            client->intro_offset = 0;
        }
        else
        {
            client->flags &= ~CLIENT_RANGE_END;
        }
    }
    source_setup_listener (source, client);
    source->listeners++;

    /* an already active client on a running source is processed straight
     * away after unlocking; otherwise mark it active and wake its worker */
    if ((client->flags & CLIENT_ACTIVE) && (source->flags & SOURCE_RUNNING))
        do_process = 1;
    else
    {
        client->flags |= CLIENT_ACTIVE; // from an auth thread context
        worker_wakeup (client->worker);
    }
    thread_rwlock_unlock (&source->lock);
    global_reduce_bitrate_sampling (global.out_bitrate);

    stats_event_inc (NULL, "listener_connections");

    if (do_process) // send something back quickly
        return client->ops->process (client);
    return 0;
}
2736
2737
2738 /* call with the source lock held, but expect the lock released on exit
2739 * as the listener may of changed threads and therefore lock needed to be
2740 * released
2741 */
source_setup_listener(source_t * source,client_t * client)2742 void source_setup_listener (source_t *source, client_t *client)
2743 {
2744 if (source->flags & SOURCE_LISTENERS_SYNC)
2745 client->ops = &listener_wait_ops;
2746 else if ((source->flags & (SOURCE_RUNNING|SOURCE_ON_DEMAND)) == SOURCE_ON_DEMAND)
2747 client->ops = &listener_pause_ops;
2748 else
2749 client->ops = &listener_client_ops;
2750 client->shared_data = source;
2751 client->queue_pos = 0;
2752 client->mount = source->mount;
2753 client->flags &= ~CLIENT_IN_FSERVE;
2754
2755 if (client->connection.sent_bytes > 0)
2756 client->check_buffer = http_source_introfile; // may need incomplete data sending
2757 else
2758 client->check_buffer = http_source_listener; // special case for headers
2759 // add client to the source
2760 avl_insert (source->clients, client);
2761 if (source->flags & SOURCE_ON_DEMAND)
2762 source->client->connection.discon.time = 0; // a run-over with on-demand relays needs resetting
2763
2764 if ((source->flags & (SOURCE_ON_DEMAND|SOURCE_RUNNING)) == SOURCE_ON_DEMAND)
2765 {
2766 source->client->schedule_ms = 0;
2767 client->schedule_ms += 300;
2768 worker_wakeup (source->client->worker);
2769 DEBUG1 ("woke up relay on %s", source->mount);
2770 }
2771 }
2772
2773
/* Send the pending HTTP response to a source client.
 *
 * Returns 0 on a short write so that the send is retried later, and -1 if
 * the connection is in error, in which case the global source count is
 * reduced and the source dropped from the tree. Once the response is fully
 * sent, the buffer associated with it becomes the client's buffer again and
 * control passes to source_client_callback() with the source write-locked.
 */
static int source_client_http_send (client_t *client)
{
    source_t *source = client->shared_data;
    refbuf_t *pending;

    while (client->pos < client->refbuf->len)
    {
        if (client->connection.error == 0 && format_generic_write_to_client (client) >= 0)
            continue;   // data went out, check whether more remains

        client->schedule_ms = client->worker->time_ms + 40;
        if (client->connection.error == 0)
            return 0;   /* trap for short writes */

        global_lock();
        global.sources--;
        stats_event_args (NULL, "sources", "%d", global.sources);
        global_unlock();
        drop_source_from_tree (source);
        WARN1 ("failed to send OK response to source client for %s", source->mount);
        return -1;
    }

    /* response done, switch back to the data held aside while it was sent */
    pending = client->refbuf->associated;
    client->refbuf->associated = NULL;
    refbuf_release (client->refbuf);
    client->refbuf = pending;
    client->pos = client->intro_offset;
    client->intro_offset = 0;

    thread_rwlock_wlock (&source->lock);
    return source_client_callback (client);
}
2804
2805
/* Prepare the format handler of a source for its source client.
 *
 * When the format has no mount set yet, an undefined format type is derived
 * from the client's content-type header (mpeg is assumed if the header is
 * missing, is "application/octet-stream" or is not recognised) and the
 * format plugin is set up. The client is then applied to the format.
 *
 * Returns 0 on success, also when the type is undefined and the client has
 * no parser (nothing further is done then), and -1 if the plugin setup
 * fails.
 */
int source_format_init (source_t *source)
{
    format_plugin_t *format = source->format;
    client_t *client = source->client;

    if (format->mount != NULL)
    {
        /* plugin already set up, only the client needs applying */
        format_apply_client (format, client);
        return 0;
    }

    if (format->type == FORMAT_TYPE_UNDEFINED)
    {
        const char *contenttype;
        format_type_t detected = FORMAT_TYPE_MPEG;

        DEBUG2 ("%sparser found for %s", client->parser ? "":"no ", source->mount);
        if (client->parser == NULL)
            return 0;

        contenttype = httpp_getvar (client->parser, "content-type");
        if (contenttype == NULL)
        {
            WARN1("No content-type for %s, Assuming content is mpeg.", source->mount);
        }
        else
        {
            if (strcmp (contenttype, "application/octet-stream") != 0)
                detected = format_get_type (contenttype);
            if (detected == FORMAT_TYPE_UNDEFINED)
            {
                WARN1("Content-type \"%s\" not supported, assuming mpeg", contenttype);
                detected = FORMAT_TYPE_MPEG;
            }
        }
        format->type = detected;
    }

    format->mount = source->mount;
    if (format_get_plugin (format) < 0)
    {
        WARN1 ("plugin format failed for \"%s\"", source->mount);
        return -1;
    }

    format_apply_client (format, client);
    return 0;
}
2846
2847
/* Replace the source client of a source with a new client.
 *
 * The new client takes over the queue position of the previous one. The
 * previous client is detached from the source, loses its authenticated flag,
 * has the bytes read so far recorded on its connection, and its worker is
 * woken up. The format's byte counter is reset and its parser pointed at the
 * new client's parser.
 */
static void source_swap_client (source_t *source, client_t *client)
{
    client_t *previous = source->client;
    format_plugin_t *format = source->format;

    INFO2 ("source %s hijacked by another client, terminating previous (at %"PRIu64")", source->mount, previous->queue_pos);

    /* attach the newcomer */
    client->shared_data = source;
    source->client = client;

    /* detach the previous client and schedule it at the current worker time */
    previous->schedule_ms = client->worker->time_ms;
    previous->shared_data = NULL;
    previous->flags &= ~CLIENT_AUTHENTICATED;
    previous->connection.sent_bytes = format->read_bytes;
    client->queue_pos = previous->queue_pos;

    format->read_bytes = 0;
    format->parser = client->parser;
    if (format->swap_client)
        format->swap_client (client, previous);

    worker_wakeup (previous->worker);
}
2869
2870
/* Attach a connecting source client to the mountpoint `uri`.
 *
 * client - the source client
 * uri    - mountpoint requested
 *
 * A client flagged CLIENT_HIJACKER takes over a running source. Otherwise the
 * client becomes the source client of the reserved source, subject to the
 * configured source limit and to the format being usable. On success the
 * mount settings are applied and either source_client_callback() is run
 * directly (shoutcast compatible connections) or a "200 OK" response is
 * queued in front of any data already read.
 *
 * Returns 0 when the source was accepted, otherwise the result of
 * client_send_403() (mountpoint in use, too many sources, or unsupported
 * content type).
 *
 * Every path below that gets a source back from source_reserve() ends up
 * unlocking source->lock, either directly or through
 * source_client_callback().
 */
int source_startup (client_t *client, const char *uri)
{
    source_t *source;
    ice_config_t *config = config_get_config();
    mount_proxy *mountinfo;
    int source_limit = config->source_limit;

    config_release_config();

    source = source_reserve (uri, (client->flags & CLIENT_HIJACKER) ? 1 : 0);
    if (source == NULL)
    {
        WARN1 ("Mountpoint %s in use", uri);
        return client_send_403 (client, "Mountpoint in use");
    }

    if ((client->flags & CLIENT_HIJACKER) && source_running (source))
    {
        source_swap_client (source, client);
    }
    else
    {
        global_lock();
        source->client = client;    /* needed by source_format_init */
        if (global.sources >= source_limit)
        {
            WARN1 ("Request to add source when maximum source limit reached %d", global.sources);
            global_unlock();
            /* the client is still used for the 403 below, so the source must
             * not keep a reference to it while being freed (same as the
             * format failure path) */
            source->client = NULL;
            thread_rwlock_unlock (&source->lock);
            source_free_source (source);
            return client_send_403 (client, "too many streams connected");
        }
        if (source_format_init (source) < 0)
        {
            global_unlock();
            source->client = NULL;
            thread_rwlock_unlock (&source->lock);
            source_free_source (source);
            return client_send_403 (client, "content type not supported");
        }
        ++global.sources;
        source->stats = stats_lock (source->stats, source->mount);
        stats_release (source->stats);
        INFO1 ("sources count is now %d", global.sources);
        stats_event_args (NULL, "sources", "%d", global.sources);
        global_unlock();
    }
    client->respcode = 200;
    client->shared_data = source;

    config = config_get_config();
    mountinfo = config_find_mount (config, source->mount);
    source_update_settings (config, source, mountinfo);
    INFO1 ("source %s is ready to start", source->mount);
    config_release_config();

    if (client->server_conn && client->server_conn->shoutcast_compat)
    {
        source->flags |= SOURCE_SHOUTCAST_COMPAT;
        source_client_callback (client);    /* releases source->lock */
    }
    else
    {
        refbuf_t *ok = refbuf_new (PER_CLIENT_REFBUF_SIZE);
        snprintf (ok->data, PER_CLIENT_REFBUF_SIZE,
                "HTTP/1.0 200 OK\r\n\r\n");
        ok->len = strlen (ok->data);
        /* we may have unprocessed data read in, so don't overwrite it */
        ok->associated = client->refbuf;
        client->refbuf = ok;
        client->intro_offset = client->pos;
        client->pos = 0;
        client->ops = &source_client_http_ops;
        thread_rwlock_unlock (&source->lock);
    }
    if ((client->flags & CLIENT_ACTIVE) == 0)
    {
        client->flags |= CLIENT_ACTIVE;
        worker_wakeup (client->worker);
    }
    return 0;
}
2952
2953
2954 /* check to see if the source client can be moved to a less busy worker thread.
2955 * we only move the source client, not the listeners, they will move later
2956 */
source_change_worker(source_t * source,client_t * client)2957 static int source_change_worker (source_t *source, client_t *client)
2958 {
2959 worker_t *this_worker = client->worker, *worker;
2960 int ret = 0;
2961
2962 thread_rwlock_rlock (&workers_lock);
2963 if (this_worker->move_allocations)
2964 {
2965 int bypass = is_worker_incoming (this_worker) ? 1 : 0;
2966
2967 worker = worker_selected ();
2968 if (worker && worker != client->worker && (bypass || worker->count > 100))
2969 {
2970 long diff = bypass ? 2000000 : this_worker->count - worker->count;
2971 if (diff - (long)source->listeners < 10)
2972 diff = 0; // lets not move the source in this case.
2973 int base = (client->connection.id & 7) << 5;
2974 if ((diff > 2000 && worker->count > 200) || (diff > (source->listeners>>4) + base))
2975 {
2976 char *mount = strdup (source->mount);
2977 thread_rwlock_unlock (&source->lock);
2978
2979 thread_spin_lock (&this_worker->lock);
2980 if (this_worker->move_allocations < 1000000)
2981 this_worker->move_allocations--;
2982 thread_spin_unlock (&this_worker->lock);
2983
2984 ret = client_change_worker (client, worker);
2985 thread_rwlock_unlock (&workers_lock);
2986 if (ret)
2987 DEBUG3 ("moving source %s from %p to %p", mount, this_worker, worker);
2988 else
2989 thread_rwlock_wlock (&source->lock);
2990 free (mount);
2991 return ret;
2992 }
2993 }
2994 }
2995 thread_rwlock_unlock (&workers_lock);
2996 return 0;
2997 }
2998
2999
3000 /* move listener client to worker theread that the source is on. This will
3001 * help cache but prevent overloading a single worker with many listeners.
3002 */
listener_change_worker(client_t * client,source_t * source)3003 int listener_change_worker (client_t *client, source_t *source)
3004 {
3005 worker_t *this_worker = client->worker, *dest_worker = source->client->worker;
3006 int ret = 0, spin = 0, locked = 0;
3007 long diff = 0;
3008
3009 do
3010 {
3011 if (is_worker_incoming (this_worker) == 0 && (this_worker->time_ms & 0x14))
3012 break; // routine called frequently, but we do not need to reassess so frequently for normal workers
3013 if (thread_rwlock_tryrlock (&workers_lock) != 0)
3014 break;
3015 locked = 1;
3016 if (this_worker == dest_worker)
3017 dest_worker = worker_selected ();
3018
3019 if (dest_worker && this_worker != dest_worker)
3020 {
3021 int move = 0;
3022 int adj = ((client->connection.id & 7) << 6) + 100;
3023
3024 thread_spin_lock (&dest_worker->lock);
3025 int dest_count = dest_worker->count;
3026 thread_spin_unlock (&dest_worker->lock);
3027
3028 thread_spin_lock (&this_worker->lock);
3029 spin = 1;
3030 if (this_worker->move_allocations == 0)
3031 break; // already moved many, skip for now
3032 int this_alloc = this_worker->move_allocations;
3033
3034 if (is_worker_incoming (this_worker) == 0)
3035 {
3036 this_worker->move_allocations--;
3037 diff = this_worker->count - dest_count;
3038 if (diff < adj)
3039 break; // ignore the move this time
3040 }
3041 move = 1;
3042 thread_spin_unlock (&this_worker->lock);
3043 spin = 0;
3044 DEBUG3 ("dest count is %d, %d, move %d", dest_count, this_alloc, move);
3045
3046 if (move)
3047 {
3048 thread_rwlock_unlock (&source->lock);
3049 uint64_t id = client->connection.id;
3050 ret = client_change_worker (client, dest_worker);
3051 if (ret)
3052 DEBUG4 ("moving listener %" PRIu64 " on %s from %p to %p", id, source->mount, this_worker, dest_worker);
3053 else
3054 thread_rwlock_rlock (&source->lock);
3055 }
3056 }
3057 } while (0);
3058
3059 if (spin)
3060 thread_spin_unlock (&this_worker->lock);
3061 if (locked)
3062 thread_rwlock_unlock (&workers_lock);
3063 return ret;
3064 }
3065
3066