/* Icecast
 *
 * This program is distributed under the GNU General Public License, version 2.
 * A copy of this license is included with this source.
 *
 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org>,
 *                      Michael Smith <msmith@xiph.org>,
 *                      oddsock <oddsock@xiph.org>,
 *                      Karl Heyes <karl@xiph.org>
 *                      and others (see AUTHORS for details).
 * Copyright 2012-2014, Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>,
 */
13
14 #ifdef HAVE_CONFIG_H
15 #include <config.h>
16 #endif
17
18 #include <stdio.h>
19 #include <string.h>
20 #include <stdlib.h>
21 #include <stdarg.h>
22
23 #include <libxml/xmlmemory.h>
24 #include <libxml/parser.h>
25 #include <libxml/tree.h>
26
27 #include "thread/thread.h"
28 #include "avl/avl.h"
29 #include "httpp/httpp.h"
30 #include "net/sock.h"
31
32 #include "connection.h"
33
34 #include "source.h"
35 #include "global.h"
36 #include "refbuf.h"
37 #include "client.h"
38 #include "stats.h"
39 #include "xslt.h"
40 #include "util.h"
41 #define CATMODULE "stats"
42 #include "logging.h"
43
/* On Windows builds these names are mapped onto the underscore-prefixed
 * variants. */
#ifdef _WIN32
#  define atoll      _atoi64
#  define vsnprintf  _vsnprintf
#  define snprintf   _snprintf
#endif

/* Values stored in the action field of a queued stats event. */
#define STATS_EVENT_SET     0   /* replace the stat with the event value */
#define STATS_EVENT_INC     1   /* add one to the stat */
#define STATS_EVENT_DEC     2   /* subtract one from the stat */
#define STATS_EVENT_ADD     3   /* add the event value to the stat */
#define STATS_EVENT_SUB     4   /* subtract the event value from the stat */
#define STATS_EVENT_REMOVE  5   /* delete the stat (or a whole source) */
#define STATS_EVENT_HIDDEN  6   /* change the hidden flag */
57
58 typedef struct _event_queue_tag
59 {
60 volatile stats_event_t *head;
61 volatile stats_event_t **tail;
62 } event_queue_t;
63
64 #define event_queue_init(qp) { (qp)->head = NULL; (qp)->tail = &(qp)->head; }
65
66 typedef struct _event_listener_tag
67 {
68 event_queue_t queue;
69 mutex_t mutex;
70
71 struct _event_listener_tag *next;
72 } event_listener_t;
73
74 static volatile int _stats_running = 0;
75 static thread_type *_stats_thread_id;
76 static volatile int _stats_threads = 0;
77
78 static stats_t _stats;
79 static mutex_t _stats_mutex;
80
81 static event_queue_t _global_event_queue;
82 mutex_t _global_event_mutex;
83
84 static volatile event_listener_t *_event_listeners;
85
86
87 static void *_stats_thread(void *arg);
88 static int _compare_stats(void *a, void *b, void *arg);
89 static int _compare_source_stats(void *a, void *b, void *arg);
90 static int _free_stats(void *key);
91 static int _free_source_stats(void *key);
92 static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue);
93 static stats_node_t *_find_node(avl_tree *tree, const char *name);
94 static stats_source_t *_find_source(avl_tree *tree, const char *source);
95 static void _free_event(stats_event_t *event);
96 static stats_event_t *_get_event_from_queue (event_queue_t *queue);
97
98
99 /* simple helper function for creating an event */
build_event(const char * source,const char * name,const char * value)100 static stats_event_t *build_event (const char *source, const char *name, const char *value)
101 {
102 stats_event_t *event;
103
104 event = (stats_event_t *)calloc(1, sizeof(stats_event_t));
105 if (event)
106 {
107 if (source)
108 event->source = (char *)strdup(source);
109 if (name)
110 event->name = (char *)strdup(name);
111 if (value)
112 event->value = (char *)strdup(value);
113 else
114 event->action = STATS_EVENT_REMOVE;
115 }
116 return event;
117 }
118
/* Append an event to the global queue while holding the queue mutex. */
static void queue_global_event (stats_event_t *event)
{
    event_queue_t *pending = &_global_event_queue;

    thread_mutex_lock (&_global_event_mutex);
    _add_event_to_queue (event, pending);
    thread_mutex_unlock (&_global_event_mutex);
}
125
stats_initialize(void)126 void stats_initialize(void)
127 {
128 _event_listeners = NULL;
129
130 /* set up global struct */
131 _stats.global_tree = avl_tree_new(_compare_stats, NULL);
132 _stats.source_tree = avl_tree_new(_compare_source_stats, NULL);
133
134 /* set up global mutex */
135 thread_mutex_create(&_stats_mutex);
136
137 /* set up stats queues */
138 event_queue_init (&_global_event_queue);
139 thread_mutex_create(&_global_event_mutex);
140
141 /* fire off the stats thread */
142 _stats_running = 1;
143 _stats_thread_id = thread_create("Stats Thread", _stats_thread, NULL, THREAD_ATTACHED);
144 }
145
stats_shutdown(void)146 void stats_shutdown(void)
147 {
148 int n;
149
150 if(!_stats_running) /* We can't shutdown if we're not running. */
151 return;
152
153 /* wait for thread to exit */
154 _stats_running = 0;
155 thread_join(_stats_thread_id);
156
157 /* wait for other threads to shut down */
158 do {
159 thread_sleep(300000);
160 thread_mutex_lock(&_stats_mutex);
161 n = _stats_threads;
162 thread_mutex_unlock(&_stats_mutex);
163 } while (n > 0);
164 ICECAST_LOG_INFO("stats thread finished");
165
166 /* free the queues */
167
168 /* destroy the queue mutexes */
169 thread_mutex_destroy(&_global_event_mutex);
170
171 thread_mutex_destroy(&_stats_mutex);
172 avl_tree_free(_stats.source_tree, _free_source_stats);
173 avl_tree_free(_stats.global_tree, _free_stats);
174
175 while (1)
176 {
177 stats_event_t *event = _get_event_from_queue (&_global_event_queue);
178 if (event == NULL) break;
179 if(event->source)
180 free(event->source);
181 if(event->value)
182 free(event->value);
183 if(event->name)
184 free(event->name);
185 free(event);
186 }
187 }
188
stats_get_stats(void)189 stats_t *stats_get_stats(void)
190 {
191 /* lock global stats
192
193 copy stats
194
195 unlock global stats
196
197 return copied stats */
198
199 return NULL;
200 }
201
202 /* simple name=tag stat create/update */
stats_event(const char * source,const char * name,const char * value)203 void stats_event(const char *source, const char *name, const char *value)
204 {
205 stats_event_t *event;
206
207 if (value && xmlCheckUTF8 ((unsigned char *)value) == 0)
208 {
209 ICECAST_LOG_WARN("seen non-UTF8 data, probably incorrect metadata (%s, %s)", name, value);
210 return;
211 }
212 event = build_event (source, name, value);
213 if (event)
214 queue_global_event (event);
215 }
216
217
218 /* wrapper for stats_event, this takes a charset to convert from */
stats_event_conv(const char * mount,const char * name,const char * value,const char * charset)219 void stats_event_conv(const char *mount, const char *name, const char *value, const char *charset)
220 {
221 const char *metadata = value;
222 xmlBufferPtr conv = xmlBufferCreate ();
223
224 if (charset)
225 {
226 xmlCharEncodingHandlerPtr handle = xmlFindCharEncodingHandler (charset);
227
228 if (handle)
229 {
230 xmlBufferPtr raw = xmlBufferCreate ();
231 xmlBufferAdd (raw, (const xmlChar *)value, strlen (value));
232 if (xmlCharEncInFunc (handle, conv, raw) > 0)
233 metadata = (char *)xmlBufferContent (conv);
234 xmlBufferFree (raw);
235 xmlCharEncCloseFunc (handle);
236 }
237 else
238 ICECAST_LOG_WARN("No charset found for \"%s\"", charset);
239 }
240
241 stats_event (mount, name, metadata);
242 xmlBufferFree (conv);
243 }
244
245 /* make stat hidden (non-zero). name can be NULL if it applies to a whole
246 * source stats tree. */
stats_event_hidden(const char * source,const char * name,int hidden)247 void stats_event_hidden (const char *source, const char *name, int hidden)
248 {
249 stats_event_t *event;
250 const char *str = NULL;
251
252 if (hidden)
253 str = "";
254 event = build_event (source, name, str);
255 if (event)
256 {
257 event->action = STATS_EVENT_HIDDEN;
258 queue_global_event (event);
259 }
260 }
261
/* printf style formatting for stat create/update.  Nothing happens when
 * name is NULL; a value that fails to format or does not fit in the 1024
 * byte buffer is logged and not queued. */
void stats_event_args(const char *source, char *name, char *format, ...)
{
    char text[1024];
    va_list args;
    int written;

    if (!name)
        return;

    va_start(args, format);
    written = vsnprintf(text, sizeof(text), format, args);
    va_end(args);

    if (written >= 0 && (unsigned int)written < sizeof (text))
    {
        stats_event(source, name, text);
        return;
    }

    ICECAST_LOG_WARN("problem with formatting %s stat %s",
            source==NULL ? "global" : source, name);
}
283
_get_stats(const char * source,const char * name)284 static char *_get_stats(const char *source, const char *name)
285 {
286 stats_node_t *stats = NULL;
287 stats_source_t *src = NULL;
288 char *value = NULL;
289
290 thread_mutex_lock(&_stats_mutex);
291
292 if (source == NULL) {
293 stats = _find_node(_stats.global_tree, name);
294 } else {
295 src = _find_source(_stats.source_tree, source);
296 if (src) {
297 stats = _find_node(src->stats_tree, name);
298 }
299 }
300
301 if (stats) value = (char *)strdup(stats->value);
302
303 thread_mutex_unlock(&_stats_mutex);
304
305 return value;
306 }
307
/* Public accessor: returns whatever _get_stats() returns for the given
 * source and name. */
char *stats_get_value(const char *source, const char *name)
{
    char *value = _get_stats(source, name);

    return value;
}
312
313 /* increase the value in the provided stat by 1 */
stats_event_inc(const char * source,const char * name)314 void stats_event_inc(const char *source, const char *name)
315 {
316 stats_event_t *event = build_event (source, name, NULL);
317 /* ICECAST_LOG_DEBUG("%s on %s", name, source==NULL?"global":source); */
318 if (event)
319 {
320 event->action = STATS_EVENT_INC;
321 queue_global_event (event);
322 }
323 }
324
stats_event_add(const char * source,const char * name,unsigned long value)325 void stats_event_add(const char *source, const char *name, unsigned long value)
326 {
327 stats_event_t *event = build_event (source, name, NULL);
328 /* ICECAST_LOG_DEBUG("%s on %s", name, source==NULL?"global":source); */
329 if (event)
330 {
331 event->value = malloc (16);
332 snprintf (event->value, 16, "%ld", value);
333 event->action = STATS_EVENT_ADD;
334 queue_global_event (event);
335 }
336 }
337
stats_event_sub(const char * source,const char * name,unsigned long value)338 void stats_event_sub(const char *source, const char *name, unsigned long value)
339 {
340 stats_event_t *event = build_event (source, name, NULL);
341 if (event)
342 {
343 event->value = malloc (16);
344 snprintf (event->value, 16, "%ld", value);
345 event->action = STATS_EVENT_SUB;
346 queue_global_event (event);
347 }
348 }
349
350 /* decrease the value in the provided stat by 1 */
stats_event_dec(const char * source,const char * name)351 void stats_event_dec(const char *source, const char *name)
352 {
353 /* ICECAST_LOG_DEBUG("%s on %s", name, source==NULL?"global":source); */
354 stats_event_t *event = build_event (source, name, NULL);
355 if (event)
356 {
357 event->action = STATS_EVENT_DEC;
358 queue_global_event (event);
359 }
360 }
361
362 /* note: you must call this function only when you have exclusive access
363 ** to the avl_tree
364 */
_find_node(avl_tree * stats_tree,const char * name)365 static stats_node_t *_find_node(avl_tree *stats_tree, const char *name)
366 {
367 stats_node_t *stats;
368 avl_node *node;
369 int cmp;
370
371 /* get the root node */
372 node = stats_tree->root->right;
373
374 while (node) {
375 stats = (stats_node_t *)node->key;
376 cmp = strcmp(name, stats->name);
377 if (cmp < 0)
378 node = node->left;
379 else if (cmp > 0)
380 node = node->right;
381 else
382 return stats;
383 }
384
385 /* didn't find it */
386 return NULL;
387 }
388
389 /* note: you must call this function only when you have exclusive access
390 ** to the avl_tree
391 */
_find_source(avl_tree * source_tree,const char * source)392 static stats_source_t *_find_source(avl_tree *source_tree, const char *source)
393 {
394 stats_source_t *stats;
395 avl_node *node;
396 int cmp;
397
398 /* get the root node */
399 node = source_tree->root->right;
400 while (node) {
401 stats = (stats_source_t *)node->key;
402 cmp = strcmp(source, stats->source);
403 if (cmp < 0)
404 node = node->left;
405 else if (cmp > 0)
406 node = node->right;
407 else
408 return stats;
409 }
410
411 /* didn't find it */
412 return NULL;
413 }
414
/* Duplicate an event, including its strings, hidden flag and action.
 * The copy is not linked to any queue (next is NULL).
 *
 * Returns the copy (owned by the caller) or NULL when any allocation
 * fails.
 */
static stats_event_t *_copy_event(stats_event_t *event)
{
    stats_event_t *copy = calloc(1, sizeof(*copy));

    if (copy == NULL)
        return NULL;

    if (event->source)
        copy->source = (char *)strdup(event->source);
    if (event->name)
        copy->name = (char *)strdup(event->name);
    if (event->value)
        copy->value = (char *)strdup(event->value);

    /* do not hand out a copy that lost one of its strings */
    if ((event->source && copy->source == NULL) ||
        (event->name && copy->name == NULL) ||
        (event->value && copy->value == NULL))
    {
        free(copy->source);
        free(copy->name);
        free(copy->value);
        free(copy);
        return NULL;
    }

    copy->hidden = event->hidden;
    copy->action = event->action;
    copy->next = NULL;

    return copy;
}
433
434
435 /* helper to apply specialised changes to a stats node */
modify_node_event(stats_node_t * node,stats_event_t * event)436 static void modify_node_event (stats_node_t *node, stats_event_t *event)
437 {
438 char *str;
439
440 if (event->action == STATS_EVENT_HIDDEN)
441 {
442 if (event->value)
443 node->hidden = 1;
444 else
445 node->hidden = 0;
446 return;
447 }
448 if (event->action != STATS_EVENT_SET)
449 {
450 int64_t value = 0;
451
452 switch (event->action)
453 {
454 case STATS_EVENT_INC:
455 value = atoi (node->value)+1;
456 break;
457 case STATS_EVENT_DEC:
458 value = atoi (node->value)-1;
459 break;
460 case STATS_EVENT_ADD:
461 value = atoi (node->value)+atoi (event->value);
462 break;
463 case STATS_EVENT_SUB:
464 value = atoll (node->value) - atoll (event->value);
465 break;
466 default:
467 ICECAST_LOG_WARN("unhandled event (%d) for %s", event->action, event->source);
468 break;
469 }
470 str = malloc (16);
471 snprintf (str, 16, "%" PRId64, value);
472 if (event->value == NULL)
473 event->value = strdup (str);
474 }
475 else
476 str = (char *)strdup (event->value);
477 free (node->value);
478 node->value = str;
479 if (event->source)
480 ICECAST_LOG_DEBUG("update \"%s\" %s (%s)", event->source, node->name, node->value);
481 else
482 ICECAST_LOG_DEBUG("update global %s (%s)", node->name, node->value);
483 }
484
485
/* Apply an event to the global stats tree.
 *
 * REMOVE deletes the named node if it exists.  Any other action modifies
 * the existing node, or creates it when the event carries a value.  Events
 * without a name are ignored, and so are events that would have to create a
 * node but carry no value (for example an increment of an unknown stat).
 * The caller is expected to hold _stats_mutex.
 */
static void process_global_event (stats_event_t *event)
{
    stats_node_t *node;

    /* a global stat cannot be looked up without a name */
    if (event->name == NULL)
        return;

    node = _find_node(_stats.global_tree, event->name);

    if (event->action == STATS_EVENT_REMOVE)
    {
        /* we're deleting */
        if (node != NULL)
            avl_delete(_stats.global_tree, (void *)node, _free_stats);
        return;
    }
    if (node)
    {
        modify_node_event (node, event);
        return;
    }

    /* add node, but only when there is something to store in it */
    if (event->value == NULL)
        return;
    node = calloc(1, sizeof(*node));
    if (node == NULL)
        return;
    node->name = (char *)strdup(event->name);
    node->value = (char *)strdup(event->value);
    if (node->name == NULL || node->value == NULL)
    {
        _free_stats (node);
        return;
    }

    avl_insert(_stats.global_tree, (void *)node);
}
514
515
/* Apply an event to the per-source stats.
 *
 * The source entry is created on demand, except for REMOVE events.  With a
 * name, the event adds, modifies or removes that one stat of the source.
 * Without a name, HIDDEN updates the flag of the source and of all its
 * stats, and REMOVE deletes the whole source entry.  A new source starts
 * hidden only when the event is a request to hide (HIDDEN with a value).
 * Allocation failures leave the stats unchanged.
 * The caller is expected to hold _stats_mutex.
 */
static void process_source_event (stats_event_t *event)
{
    stats_source_t *snode = _find_source(_stats.source_tree, event->source);

    if (snode == NULL)
    {
        if (event->action == STATS_EVENT_REMOVE)
            return;
        snode = calloc(1, sizeof(*snode));
        if (snode == NULL)
            return;
        ICECAST_LOG_DEBUG("new source stat %s", event->source);
        snode->source = (char *)strdup(event->source);
        snode->stats_tree = avl_tree_new(_compare_stats, NULL);
        if (snode->source == NULL || snode->stats_tree == NULL)
        {
            /* a half-built entry must not reach the tree */
            if (snode->stats_tree)
                avl_tree_free(snode->stats_tree, _free_stats);
            free(snode->source);
            free(snode);
            return;
        }
        if (event->action == STATS_EVENT_HIDDEN && event->value)
            snode->hidden = 1;
        else
            snode->hidden = 0;

        avl_insert(_stats.source_tree, (void *)snode);
    }
    if (event->name)
    {
        stats_node_t *node = _find_node(snode->stats_tree, event->name);
        if (node == NULL)
        {
            /* nothing to remove, and nothing to add without a value */
            if (event->action == STATS_EVENT_REMOVE || event->value == NULL)
                return;
            /* adding node */
            ICECAST_LOG_DEBUG("new node %s (%s)", event->name, event->value);
            node = calloc(1, sizeof(*node));
            if (node == NULL)
                return;
            node->name = (char *)strdup(event->name);
            node->value = (char *)strdup(event->value);
            if (node->name == NULL || node->value == NULL)
            {
                _free_stats (node);
                return;
            }
            node->hidden = snode->hidden;

            avl_insert(snode->stats_tree, (void *)node);
            return;
        }
        if (event->action == STATS_EVENT_REMOVE)
        {
            ICECAST_LOG_DEBUG("delete node %s", event->name);
            avl_delete(snode->stats_tree, (void *)node, _free_stats);
            return;
        }
        modify_node_event (node, event);
        return;
    }
    if (event->action == STATS_EVENT_HIDDEN)
    {
        avl_node *node = avl_get_first (snode->stats_tree);

        snode->hidden = event->value ? 1 : 0;
        /* every stat of the source follows the source's flag */
        while (node)
        {
            stats_node_t *stats = (stats_node_t*)node->key;
            stats->hidden = snode->hidden;
            node = avl_get_next (node);
        }
        return;
    }
    if (event->action == STATS_EVENT_REMOVE)
    {
        ICECAST_LOG_DEBUG("delete source node %s", event->source);
        avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
    }
}
587
/* Format the current local time with the strftime() pattern `format` and
 * append the numeric UTC offset (+hhmm / -hhmm) to it.
 * NOTE: implicit %z is added to format string.
 *
 * buffer/len describe the destination; the result is truncated to fit.
 * When strftime() cannot produce a result, the affected part is treated as
 * an empty string instead of being read from an indeterminate buffer.
 *
 * On Windows the offset is computed by comparing local time with UTC.  The
 * day difference is only trusted within +/-1.  Anything else is a year
 * boundary, where local time is one day ahead of UTC (large negative
 * difference, +24h) or one day behind it (large positive difference, -24h).
 */
static inline void __format_time(char * buffer, size_t len, const char * format) {
    time_t now = time(NULL);
    struct tm local;
    char tzbuffer[32];
    char timebuffer[128];
#ifdef _WIN32
    struct tm *thetime;
    int time_days, time_hours, time_tz;
    char sign;
#endif

    localtime_r (&now, &local);
#ifndef _WIN32
    if (strftime (tzbuffer, sizeof(tzbuffer), "%z", &local) == 0)
        tzbuffer[0] = '\0';
#else
    thetime = gmtime (&now);
    time_days = local.tm_yday - thetime->tm_yday;

    if (time_days < -1)
        time_hours = 24;
    else if (time_days > 1)
        time_hours = -24;
    else
        time_hours = time_days * 24;

    time_hours += local.tm_hour - thetime->tm_hour;
    time_tz = time_hours * 60 + local.tm_min - thetime->tm_min;

    if (time_tz < 0) {
        sign = '-';
        time_tz = -time_tz;
    } else {
        sign = '+';
    }

    snprintf(tzbuffer, sizeof(tzbuffer), "%c%.2d%.2d", sign, time_tz / 60, time_tz % 60);
#endif
    if (strftime (timebuffer, sizeof(timebuffer), format, &local) == 0)
        timebuffer[0] = '\0';

    snprintf(buffer, len, "%s%s", timebuffer, tzbuffer);
}
636
/* Set stat `name` of `mount` to the current local time, formatted like
 * "Mon, 01 Jan 2024 12:00:00 " followed by the UTC offset. */
void stats_event_time (const char *mount, const char *name)
{
    char stamp[100];

    __format_time(stamp, sizeof(stamp), "%a, %d %b %Y %H:%M:%S ");
    stats_event (mount, name, stamp);
}
644
645
/* Set stat `name` of `mount` to the current local time, formatted like
 * "2024-01-01T12:00:00" followed by the UTC offset. */
void stats_event_time_iso8601 (const char *mount, const char *name)
{
    char stamp[100];

    __format_time(stamp, sizeof(stamp), "%Y-%m-%dT%H:%M:%S");
    stats_event (mount, name, stamp);
}
653
654
/* Publish server_id, host, location and admin from the configuration as
 * global stats, in that order. */
void stats_global (ice_config_t *config)
{
    const char *names[] = { "server_id", "host", "location", "admin" };
    const char *values[] = {
        config->server_id,
        config->hostname,
        config->location,
        config->admin
    };
    size_t i;

    for (i = 0; i < sizeof (names) / sizeof (names[0]); i++)
        stats_event (NULL, names[i], values[i]);
}
662
663
_stats_thread(void * arg)664 static void *_stats_thread(void *arg)
665 {
666 stats_event_t *event;
667 stats_event_t *copy;
668 event_listener_t *listener;
669
670 stats_event_time (NULL, "server_start");
671 stats_event_time_iso8601 (NULL, "server_start_iso8601");
672
673 /* global currently active stats */
674 stats_event (NULL, "clients", "0");
675 stats_event (NULL, "connections", "0");
676 stats_event (NULL, "sources", "0");
677 stats_event (NULL, "stats", "0");
678 stats_event (NULL, "listeners", "0");
679
680 /* global accumulating stats */
681 stats_event (NULL, "client_connections", "0");
682 stats_event (NULL, "source_client_connections", "0");
683 stats_event (NULL, "source_relay_connections", "0");
684 stats_event (NULL, "source_total_connections", "0");
685 stats_event (NULL, "stats_connections", "0");
686 stats_event (NULL, "listener_connections", "0");
687
688 ICECAST_LOG_INFO("stats thread started");
689 while (_stats_running) {
690 thread_mutex_lock(&_global_event_mutex);
691 if (_global_event_queue.head != NULL) {
692 /* grab the next event from the queue */
693 event = _get_event_from_queue (&_global_event_queue);
694 thread_mutex_unlock(&_global_event_mutex);
695
696 if (event == NULL)
697 continue;
698 event->next = NULL;
699
700 thread_mutex_lock(&_stats_mutex);
701
702 /* check if we are dealing with a global or source event */
703 if (event->source == NULL)
704 process_global_event (event);
705 else
706 process_source_event (event);
707
708 /* now we have an event that's been processed into the running stats */
709 /* this event should get copied to event listeners' queues */
710 listener = (event_listener_t *)_event_listeners;
711 while (listener) {
712 copy = _copy_event(event);
713 thread_mutex_lock (&listener->mutex);
714 _add_event_to_queue (copy, &listener->queue);
715 thread_mutex_unlock (&listener->mutex);
716
717 listener = listener->next;
718 }
719
720 /* now we need to destroy the event */
721 _free_event(event);
722
723 thread_mutex_unlock(&_stats_mutex);
724 continue;
725 }
726 else
727 {
728 thread_mutex_unlock(&_global_event_mutex);
729 }
730
731 thread_sleep(300000);
732 }
733
734 return NULL;
735 }
736
737 /* you must have the _stats_mutex locked here */
_unregister_listener(event_listener_t * listener)738 static void _unregister_listener(event_listener_t *listener)
739 {
740 event_listener_t **prev = (event_listener_t **)&_event_listeners,
741 *current = *prev;
742 while (current)
743 {
744 if (current == listener)
745 {
746 *prev = current->next;
747 break;
748 }
749 prev = ¤t->next;
750 current = *prev;
751 }
752 }
753
754
_make_event_from_node(stats_node_t * node,char * source)755 static stats_event_t *_make_event_from_node(stats_node_t *node, char *source)
756 {
757 stats_event_t *event = (stats_event_t *)malloc(sizeof(stats_event_t));
758
759 if (source != NULL)
760 event->source = (char *)strdup(source);
761 else
762 event->source = NULL;
763 event->name = (char *)strdup(node->name);
764 event->value = (char *)strdup(node->value);
765 event->hidden = node->hidden;
766 event->action = STATS_EVENT_SET;
767 event->next = NULL;
768
769 return event;
770 }
771
772
/* Append an event to a queue.  The event is stored in the link that the
 * queue's tail refers to, and the tail then moves on to the event's own
 * next member.  event->next itself is left untouched.  No locking is done
 * here. */
static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue)
{
    volatile stats_event_t **slot = queue->tail;

    *slot = event;
    queue->tail = (volatile stats_event_t **)&event->next;
}
778
779
_get_event_from_queue(event_queue_t * queue)780 static stats_event_t *_get_event_from_queue (event_queue_t *queue)
781 {
782 stats_event_t *event = NULL;
783
784 if (queue && queue->head)
785 {
786 event = (stats_event_t *)queue->head;
787 queue->head = event->next;
788 if (queue->head == NULL)
789 queue->tail = &queue->head;
790 }
791
792 return event;
793 }
794
/* Send one event to a stats client as the line
 * "EVENT <source> <name> <value>\n".  "global" stands in for a missing
 * source, "null" for a missing name or value.
 *
 * Lines that fit in 200 bytes are formatted on the stack.  Longer ones use
 * a temporary heap buffer, so long values are no longer silently dropped.
 * If that buffer cannot be allocated the event is skipped.
 *
 * Returns -1 when the client connection reports an error after sending,
 * 0 otherwise.
 */
static int _send_event_to_client(stats_event_t *event, client_t *client)
{
    const char *source = event->source ? event->source : "global";
    const char *name = event->name ? event->name : "null";
    const char *value = event->value ? event->value : "null";
    char stackbuf [200];
    char *buf = stackbuf;
    /* the sizeof covers the fixed text, both separators, the newline
     * and the terminating NUL */
    size_t size = strlen (source) + strlen (name) + strlen (value)
                  + sizeof ("EVENT   \n");
    int result = 0;
    int len;

    if (size > sizeof (stackbuf))
    {
        buf = malloc (size);
        if (buf == NULL)
            return 0;
    }

    /* send data to the client!!!! */
    len = snprintf (buf, size, "EVENT %s %s %s\n", source, name, value);
    if (len > 0 && (size_t)len < size)
    {
        client_send_bytes (client, buf, len);
        if (client->con->error)
            result = -1;
    }

    if (buf != stackbuf)
        free (buf);
    return result;
}
813
814
/* Add the current stats as children of `root`, under _stats_mutex.
 *
 * Global stats come first, each one only if its hidden level does not
 * exceed `hidden`.  Then every source whose hidden level does not exceed
 * `hidden`, and whose mount equals `show_mount` when that is given, gets a
 * <source mount="..."> element containing all of its stats.
 *
 * Returns the first <source> element created, or NULL if there was none.
 */
static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, int hidden)
{
    xmlNodePtr first_source = NULL;
    avl_node *entry;

    thread_mutex_lock(&_stats_mutex);

    /* general stats first */
    for (entry = avl_get_first(_stats.global_tree); entry; entry = avl_get_next (entry))
    {
        stats_node_t *stat = entry->key;

        if (stat->hidden <= hidden)
            xmlNewTextChild (root, NULL, XMLSTR(stat->name), XMLSTR(stat->value));
    }

    /* now per mount stats */
    for (entry = avl_get_first(_stats.source_tree); entry; entry = avl_get_next (entry))
    {
        stats_source_t *source = (stats_source_t *)entry->key;
        xmlNodePtr mountnode;
        avl_node *sub;

        if (source->hidden > hidden)
            continue;
        if (show_mount != NULL && strcmp (show_mount, source->source) != 0)
            continue;

        mountnode = xmlNewTextChild (root, NULL, XMLSTR("source"), NULL);
        xmlSetProp (mountnode, XMLSTR("mount"), XMLSTR(source->source));
        if (first_source == NULL)
            first_source = mountnode;

        for (sub = avl_get_first (source->stats_tree); sub; sub = avl_get_next (sub))
        {
            stats_node_t *stat = sub->key;

            xmlNewTextChild (mountnode, NULL, XMLSTR(stat->name), XMLSTR(stat->value));
        }
    }

    thread_mutex_unlock(&_stats_mutex);
    return first_source;
}
856
857
858 /* factoring out code for stats loops
859 ** this function copies all stats to queue, and registers
860 ** the queue for all new events atomically.
861 ** note: mutex must already be created!
862 */
_register_listener(event_listener_t * listener)863 static void _register_listener (event_listener_t *listener)
864 {
865 avl_node *node;
866 avl_node *node2;
867 stats_event_t *event;
868 stats_source_t *source;
869
870 thread_mutex_lock(&_stats_mutex);
871
872 /* first we fill our queue with the current stats */
873
874 /* start with the global stats */
875 node = avl_get_first(_stats.global_tree);
876 while (node) {
877 event = _make_event_from_node((stats_node_t *)node->key, NULL);
878 _add_event_to_queue (event, &listener->queue);
879
880 node = avl_get_next(node);
881 }
882
883 /* now the stats for each source */
884 node = avl_get_first(_stats.source_tree);
885 while (node) {
886 source = (stats_source_t *)node->key;
887 node2 = avl_get_first(source->stats_tree);
888 while (node2) {
889 event = _make_event_from_node((stats_node_t *)node2->key, source->source);
890 _add_event_to_queue (event, &listener->queue);
891
892 node2 = avl_get_next(node2);
893 }
894
895 node = avl_get_next(node);
896 }
897
898 /* now we register to receive future event notices */
899 listener->next = (event_listener_t *)_event_listeners;
900 _event_listeners = listener;
901
902 thread_mutex_unlock(&_stats_mutex);
903 }
904
stats_connection(void * arg)905 void *stats_connection(void *arg)
906 {
907 client_t *client = (client_t *)arg;
908 stats_event_t *event;
909 event_listener_t listener;
910
911 ICECAST_LOG_INFO("stats client starting");
912
913 event_queue_init (&listener.queue);
914 /* increment the thread count */
915 thread_mutex_lock(&_stats_mutex);
916 _stats_threads++;
917 stats_event_args (NULL, "stats", "%d", _stats_threads);
918 thread_mutex_unlock(&_stats_mutex);
919
920 thread_mutex_create (&(listener.mutex));
921
922 _register_listener (&listener);
923
924 while (_stats_running) {
925 thread_mutex_lock (&listener.mutex);
926 event = _get_event_from_queue (&listener.queue);
927 thread_mutex_unlock (&listener.mutex);
928 if (event != NULL) {
929 if (_send_event_to_client(event, client) < 0) {
930 _free_event(event);
931 break;
932 }
933 _free_event(event);
934 continue;
935 }
936 thread_sleep (500000);
937 }
938
939 thread_mutex_lock(&_stats_mutex);
940 _unregister_listener (&listener);
941 _stats_threads--;
942 stats_event_args (NULL, "stats", "%d", _stats_threads);
943 thread_mutex_unlock(&_stats_mutex);
944
945 thread_mutex_destroy (&listener.mutex);
946 client_destroy (client);
947 ICECAST_LOG_INFO("stats client finished");
948
949 return NULL;
950 }
951
952
/* Hand a client over to its own detached stats_connection() thread.  A
 * client whose connection already reports an error is destroyed instead.
 * The second argument is not used. */
void stats_callback (client_t *client, void *notused)
{
    if (!client->con->error)
    {
        client_set_queue (client, NULL);
        thread_create("Stats Connection", stats_connection, (void *)client, THREAD_DETACHED);
        return;
    }

    client_destroy (client);
}
963
964
965 typedef struct _source_xml_tag {
966 char *mount;
967 xmlNodePtr node;
968
969 struct _source_xml_tag *next;
970 } source_xml_t;
971
972
/* Build the stats XML (hidden level 0, optionally limited to the mount
 * named by the "mount" query parameter) and run it through the stylesheet
 * derived from `uri`, passing the client to xslt_transform(). */
void stats_transform_xslt(client_t *client, const char *uri)
{
    const char *mount = httpp_get_query_param (client->parser, "mount");
    char *stylesheet = util_get_path_from_normalised_uri (uri);
    xmlDocPtr stats_doc = stats_get_xml (0, mount);

    xslt_transform(stats_doc, stylesheet, client);

    xmlFreeDoc(stats_doc);
    free (stylesheet);
}
986
stats_get_xml(int show_hidden,const char * show_mount)987 xmlDocPtr stats_get_xml(int show_hidden, const char *show_mount)
988 {
989 xmlDocPtr doc;
990 xmlNodePtr node;
991
992 doc = xmlNewDoc (XMLSTR("1.0"));
993 node = xmlNewDocNode (doc, NULL, XMLSTR("icestats"), NULL);
994 xmlDocSetRootElement(doc, node);
995
996 node = _dump_stats_to_doc (node, show_mount, show_hidden);
997
998 return doc;
999 }
1000
1001
_compare_stats(void * arg,void * a,void * b)1002 static int _compare_stats(void *arg, void *a, void *b)
1003 {
1004 stats_node_t *nodea = (stats_node_t *)a;
1005 stats_node_t *nodeb = (stats_node_t *)b;
1006
1007 return strcmp(nodea->name, nodeb->name);
1008 }
1009
_compare_source_stats(void * arg,void * a,void * b)1010 static int _compare_source_stats(void *arg, void *a, void *b)
1011 {
1012 stats_source_t *nodea = (stats_source_t *)a;
1013 stats_source_t *nodeb = (stats_source_t *)b;
1014
1015 return strcmp(nodea->source, nodeb->source);
1016 }
1017
_free_stats(void * key)1018 static int _free_stats(void *key)
1019 {
1020 stats_node_t *node = (stats_node_t *)key;
1021 free(node->value);
1022 free(node->name);
1023 free(node);
1024
1025 return 1;
1026 }
1027
_free_source_stats(void * key)1028 static int _free_source_stats(void *key)
1029 {
1030 stats_source_t *node = (stats_source_t *)key;
1031 avl_tree_free(node->stats_tree, _free_stats);
1032 free(node->source);
1033 free(node);
1034
1035 return 1;
1036 }
1037
/* Release an event together with its source, name and value strings.
 * Members that are NULL need no special handling because free(NULL) does
 * nothing. */
static void _free_event(stats_event_t *event)
{
    free(event->source);
    free(event->name);
    free(event->value);
    free(event);
}
1045
1046
stats_get_streams(void)1047 refbuf_t *stats_get_streams (void)
1048 {
1049 #define STREAMLIST_BLKSIZE 4096
1050 avl_node *node;
1051 unsigned int remaining = STREAMLIST_BLKSIZE;
1052 refbuf_t *start = refbuf_new (remaining), *cur = start;
1053 char *buffer = cur->data;
1054
1055 /* now the stats for each source */
1056 thread_mutex_lock (&_stats_mutex);
1057 node = avl_get_first(_stats.source_tree);
1058 while (node)
1059 {
1060 int ret;
1061 stats_source_t *source = (stats_source_t *)node->key;
1062
1063 if (source->hidden == 0)
1064 {
1065 if (remaining <= strlen (source->source) + 3)
1066 {
1067 cur->len = STREAMLIST_BLKSIZE - remaining;
1068 cur->next = refbuf_new (STREAMLIST_BLKSIZE);
1069 remaining = STREAMLIST_BLKSIZE;
1070 cur = cur->next;
1071 buffer = cur->data;
1072 }
1073 ret = snprintf (buffer, remaining, "%s\r\n", source->source);
1074 if (ret > 0)
1075 {
1076 buffer += ret;
1077 remaining -= ret;
1078 }
1079 }
1080 node = avl_get_next(node);
1081 }
1082 thread_mutex_unlock (&_stats_mutex);
1083 cur->len = STREAMLIST_BLKSIZE - remaining;
1084 return start;
1085 }
1086
1087
1088
1089 /* This removes any source stats from virtual mountpoints, ie mountpoints
1090 * where no source_t exists. This function requires the global sources lock
1091 * to be held before calling.
1092 */
stats_clear_virtual_mounts(void)1093 void stats_clear_virtual_mounts (void)
1094 {
1095 avl_node *snode;
1096
1097 thread_mutex_lock (&_stats_mutex);
1098 snode = avl_get_first(_stats.source_tree);
1099 while (snode)
1100 {
1101 stats_source_t *src = (stats_source_t *)snode->key;
1102 source_t *source = source_find_mount_raw (src->source);
1103
1104 if (source == NULL)
1105 {
1106 /* no source_t is reserved so remove them now */
1107 snode = avl_get_next (snode);
1108 ICECAST_LOG_DEBUG("releasing %s stats", src->source);
1109 avl_delete (_stats.source_tree, src, _free_source_stats);
1110 continue;
1111 }
1112
1113 snode = avl_get_next (snode);
1114 }
1115 thread_mutex_unlock (&_stats_mutex);
1116 }
1117
1118