1 /* Icecast
2  *
3  * This program is distributed under the GNU General Public License, version 2.
4  * A copy of this license is included with this source.
5  *
6  * Copyright 2000-2004, Jack Moffitt <jack@xiph.org,
7  *                      Michael Smith <msmith@xiph.org>,
8  *                      oddsock <oddsock@xiph.org>,
9  *                      Karl Heyes <karl@xiph.org>
10  *                      and others (see AUTHORS for details).
11  * Copyright 2012-2014, Philipp "ph3-der-loewe" Schafft <lion@lion.leolix.org>,
12  */
13 
14 #ifdef HAVE_CONFIG_H
15 #include <config.h>
16 #endif
17 
18 #include <stdio.h>
19 #include <string.h>
20 #include <stdlib.h>
21 #include <stdarg.h>
22 
23 #include <libxml/xmlmemory.h>
24 #include <libxml/parser.h>
25 #include <libxml/tree.h>
26 
27 #include "thread/thread.h"
28 #include "avl/avl.h"
29 #include "httpp/httpp.h"
30 #include "net/sock.h"
31 
32 #include "connection.h"
33 
34 #include "source.h"
35 #include "global.h"
36 #include "refbuf.h"
37 #include "client.h"
38 #include "stats.h"
39 #include "xslt.h"
40 #include "util.h"
41 #define CATMODULE "stats"
42 #include "logging.h"
43 
44 #ifdef _WIN32
45 #define atoll _atoi64
46 #define vsnprintf _vsnprintf
47 #define snprintf _snprintf
48 #endif
49 
50 #define STATS_EVENT_SET     0
51 #define STATS_EVENT_INC     1
52 #define STATS_EVENT_DEC     2
53 #define STATS_EVENT_ADD     3
54 #define STATS_EVENT_SUB     4
55 #define STATS_EVENT_REMOVE  5
56 #define STATS_EVENT_HIDDEN  6
57 
58 typedef struct _event_queue_tag
59 {
60     volatile stats_event_t *head;
61     volatile stats_event_t **tail;
62 } event_queue_t;
63 
64 #define event_queue_init(qp)    { (qp)->head = NULL; (qp)->tail = &(qp)->head; }
65 
66 typedef struct _event_listener_tag
67 {
68     event_queue_t queue;
69     mutex_t mutex;
70 
71     struct _event_listener_tag *next;
72 } event_listener_t;
73 
74 static volatile int _stats_running = 0;
75 static thread_type *_stats_thread_id;
76 static volatile int _stats_threads = 0;
77 
78 static stats_t _stats;
79 static mutex_t _stats_mutex;
80 
81 static event_queue_t _global_event_queue;
82 mutex_t _global_event_mutex;
83 
84 static volatile event_listener_t *_event_listeners;
85 
86 
87 static void *_stats_thread(void *arg);
88 static int _compare_stats(void *a, void *b, void *arg);
89 static int _compare_source_stats(void *a, void *b, void *arg);
90 static int _free_stats(void *key);
91 static int _free_source_stats(void *key);
92 static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue);
93 static stats_node_t *_find_node(avl_tree *tree, const char *name);
94 static stats_source_t *_find_source(avl_tree *tree, const char *source);
95 static void _free_event(stats_event_t *event);
96 static stats_event_t *_get_event_from_queue (event_queue_t *queue);
97 
98 
99 /* simple helper function for creating an event */
build_event(const char * source,const char * name,const char * value)100 static stats_event_t *build_event (const char *source, const char *name, const char *value)
101 {
102     stats_event_t *event;
103 
104     event = (stats_event_t *)calloc(1, sizeof(stats_event_t));
105     if (event)
106     {
107         if (source)
108             event->source = (char *)strdup(source);
109         if (name)
110             event->name = (char *)strdup(name);
111         if (value)
112             event->value = (char *)strdup(value);
113         else
114             event->action = STATS_EVENT_REMOVE;
115     }
116     return event;
117 }
118 
queue_global_event(stats_event_t * event)119 static void queue_global_event (stats_event_t *event)
120 {
121     thread_mutex_lock(&_global_event_mutex);
122     _add_event_to_queue (event, &_global_event_queue);
123     thread_mutex_unlock(&_global_event_mutex);
124 }
125 
stats_initialize(void)126 void stats_initialize(void)
127 {
128     _event_listeners = NULL;
129 
130     /* set up global struct */
131     _stats.global_tree = avl_tree_new(_compare_stats, NULL);
132     _stats.source_tree = avl_tree_new(_compare_source_stats, NULL);
133 
134     /* set up global mutex */
135     thread_mutex_create(&_stats_mutex);
136 
137     /* set up stats queues */
138     event_queue_init (&_global_event_queue);
139     thread_mutex_create(&_global_event_mutex);
140 
141     /* fire off the stats thread */
142     _stats_running = 1;
143     _stats_thread_id = thread_create("Stats Thread", _stats_thread, NULL, THREAD_ATTACHED);
144 }
145 
stats_shutdown(void)146 void stats_shutdown(void)
147 {
148     int n;
149 
150     if(!_stats_running) /* We can't shutdown if we're not running. */
151         return;
152 
153     /* wait for thread to exit */
154     _stats_running = 0;
155     thread_join(_stats_thread_id);
156 
157     /* wait for other threads to shut down */
158     do {
159         thread_sleep(300000);
160         thread_mutex_lock(&_stats_mutex);
161         n = _stats_threads;
162         thread_mutex_unlock(&_stats_mutex);
163     } while (n > 0);
164     ICECAST_LOG_INFO("stats thread finished");
165 
166     /* free the queues */
167 
168     /* destroy the queue mutexes */
169     thread_mutex_destroy(&_global_event_mutex);
170 
171     thread_mutex_destroy(&_stats_mutex);
172     avl_tree_free(_stats.source_tree, _free_source_stats);
173     avl_tree_free(_stats.global_tree, _free_stats);
174 
175     while (1)
176     {
177         stats_event_t *event = _get_event_from_queue (&_global_event_queue);
178         if (event == NULL) break;
179         if(event->source)
180             free(event->source);
181         if(event->value)
182             free(event->value);
183         if(event->name)
184             free(event->name);
185         free(event);
186     }
187 }
188 
stats_get_stats(void)189 stats_t *stats_get_stats(void)
190 {
191     /* lock global stats
192 
193      copy stats
194 
195      unlock global stats
196 
197      return copied stats */
198 
199     return NULL;
200 }
201 
202 /* simple name=tag stat create/update */
stats_event(const char * source,const char * name,const char * value)203 void stats_event(const char *source, const char *name, const char *value)
204 {
205     stats_event_t *event;
206 
207     if (value && xmlCheckUTF8 ((unsigned char *)value) == 0)
208     {
209         ICECAST_LOG_WARN("seen non-UTF8 data, probably incorrect metadata (%s, %s)", name, value);
210         return;
211     }
212     event = build_event (source, name, value);
213     if (event)
214         queue_global_event (event);
215 }
216 
217 
218 /* wrapper for stats_event, this takes a charset to convert from */
stats_event_conv(const char * mount,const char * name,const char * value,const char * charset)219 void stats_event_conv(const char *mount, const char *name, const char *value, const char *charset)
220 {
221     const char *metadata = value;
222     xmlBufferPtr conv = xmlBufferCreate ();
223 
224     if (charset)
225     {
226         xmlCharEncodingHandlerPtr handle = xmlFindCharEncodingHandler (charset);
227 
228         if (handle)
229         {
230             xmlBufferPtr raw = xmlBufferCreate ();
231             xmlBufferAdd (raw, (const xmlChar *)value, strlen (value));
232             if (xmlCharEncInFunc (handle, conv, raw) > 0)
233                 metadata = (char *)xmlBufferContent (conv);
234             xmlBufferFree (raw);
235             xmlCharEncCloseFunc (handle);
236         }
237         else
238             ICECAST_LOG_WARN("No charset found for \"%s\"", charset);
239     }
240 
241     stats_event (mount, name, metadata);
242     xmlBufferFree (conv);
243 }
244 
245 /* make stat hidden (non-zero). name can be NULL if it applies to a whole
246  * source stats tree. */
stats_event_hidden(const char * source,const char * name,int hidden)247 void stats_event_hidden (const char *source, const char *name, int hidden)
248 {
249     stats_event_t *event;
250     const char *str = NULL;
251 
252     if (hidden)
253         str = "";
254     event = build_event (source, name, str);
255     if (event)
256     {
257         event->action = STATS_EVENT_HIDDEN;
258         queue_global_event (event);
259     }
260 }
261 
262 /* printf style formatting for stat create/update */
stats_event_args(const char * source,char * name,char * format,...)263 void stats_event_args(const char *source, char *name, char *format, ...)
264 {
265     char buf[1024];
266     va_list val;
267     int ret;
268 
269     if (name == NULL)
270         return;
271     va_start(val, format);
272     ret = vsnprintf(buf, sizeof(buf), format, val);
273     va_end(val);
274 
275     if (ret < 0 || (unsigned int)ret >= sizeof (buf))
276     {
277         ICECAST_LOG_WARN("problem with formatting %s stat %s",
278                 source==NULL ? "global" : source, name);
279         return;
280     }
281     stats_event(source, name, buf);
282 }
283 
_get_stats(const char * source,const char * name)284 static char *_get_stats(const char *source, const char *name)
285 {
286     stats_node_t *stats = NULL;
287     stats_source_t *src = NULL;
288     char *value = NULL;
289 
290     thread_mutex_lock(&_stats_mutex);
291 
292     if (source == NULL) {
293         stats = _find_node(_stats.global_tree, name);
294     } else {
295         src = _find_source(_stats.source_tree, source);
296         if (src) {
297             stats = _find_node(src->stats_tree, name);
298         }
299     }
300 
301     if (stats) value = (char *)strdup(stats->value);
302 
303     thread_mutex_unlock(&_stats_mutex);
304 
305     return value;
306 }
307 
stats_get_value(const char * source,const char * name)308 char *stats_get_value(const char *source, const char *name)
309 {
310     return(_get_stats(source, name));
311 }
312 
313 /* increase the value in the provided stat by 1 */
stats_event_inc(const char * source,const char * name)314 void stats_event_inc(const char *source, const char *name)
315 {
316     stats_event_t *event = build_event (source, name, NULL);
317     /* ICECAST_LOG_DEBUG("%s on %s", name, source==NULL?"global":source); */
318     if (event)
319     {
320         event->action = STATS_EVENT_INC;
321         queue_global_event (event);
322     }
323 }
324 
stats_event_add(const char * source,const char * name,unsigned long value)325 void stats_event_add(const char *source, const char *name, unsigned long value)
326 {
327     stats_event_t *event = build_event (source, name, NULL);
328     /* ICECAST_LOG_DEBUG("%s on %s", name, source==NULL?"global":source); */
329     if (event)
330     {
331         event->value = malloc (16);
332         snprintf (event->value, 16, "%ld", value);
333         event->action = STATS_EVENT_ADD;
334         queue_global_event (event);
335     }
336 }
337 
stats_event_sub(const char * source,const char * name,unsigned long value)338 void stats_event_sub(const char *source, const char *name, unsigned long value)
339 {
340     stats_event_t *event = build_event (source, name, NULL);
341     if (event)
342     {
343         event->value = malloc (16);
344         snprintf (event->value, 16, "%ld", value);
345         event->action = STATS_EVENT_SUB;
346         queue_global_event (event);
347     }
348 }
349 
350 /* decrease the value in the provided stat by 1 */
stats_event_dec(const char * source,const char * name)351 void stats_event_dec(const char *source, const char *name)
352 {
353     /* ICECAST_LOG_DEBUG("%s on %s", name, source==NULL?"global":source); */
354     stats_event_t *event = build_event (source, name, NULL);
355     if (event)
356     {
357         event->action = STATS_EVENT_DEC;
358         queue_global_event (event);
359     }
360 }
361 
362 /* note: you must call this function only when you have exclusive access
363 ** to the avl_tree
364 */
_find_node(avl_tree * stats_tree,const char * name)365 static stats_node_t *_find_node(avl_tree *stats_tree, const char *name)
366 {
367     stats_node_t *stats;
368     avl_node *node;
369     int cmp;
370 
371     /* get the root node */
372     node = stats_tree->root->right;
373 
374     while (node) {
375         stats = (stats_node_t *)node->key;
376         cmp = strcmp(name, stats->name);
377         if (cmp < 0)
378             node = node->left;
379         else if (cmp > 0)
380             node = node->right;
381         else
382             return stats;
383     }
384 
385     /* didn't find it */
386     return NULL;
387 }
388 
389 /* note: you must call this function only when you have exclusive access
390 ** to the avl_tree
391 */
_find_source(avl_tree * source_tree,const char * source)392 static stats_source_t *_find_source(avl_tree *source_tree, const char *source)
393 {
394     stats_source_t *stats;
395     avl_node *node;
396     int cmp;
397 
398     /* get the root node */
399     node = source_tree->root->right;
400     while (node) {
401         stats = (stats_source_t *)node->key;
402         cmp = strcmp(source, stats->source);
403         if (cmp < 0)
404             node = node->left;
405         else if (cmp > 0)
406             node = node->right;
407         else
408             return stats;
409     }
410 
411     /* didn't find it */
412     return NULL;
413 }
414 
_copy_event(stats_event_t * event)415 static stats_event_t *_copy_event(stats_event_t *event)
416 {
417     stats_event_t *copy = (stats_event_t *)calloc(1, sizeof(stats_event_t));
418     if (event->source)
419         copy->source = (char *)strdup(event->source);
420     else
421         copy->source = NULL;
422     if (event->name)
423         copy->name = (char *)strdup(event->name);
424     if (event->value)
425         copy->value = (char *)strdup(event->value);
426     else
427         copy->value = NULL;
428     copy->hidden = event->hidden;
429     copy->next = NULL;
430 
431     return copy;
432 }
433 
434 
435 /* helper to apply specialised changes to a stats node */
modify_node_event(stats_node_t * node,stats_event_t * event)436 static void modify_node_event (stats_node_t *node, stats_event_t *event)
437 {
438     char *str;
439 
440     if (event->action == STATS_EVENT_HIDDEN)
441     {
442         if (event->value)
443             node->hidden = 1;
444         else
445             node->hidden = 0;
446         return;
447     }
448     if (event->action != STATS_EVENT_SET)
449     {
450         int64_t value = 0;
451 
452         switch (event->action)
453         {
454             case STATS_EVENT_INC:
455                 value = atoi (node->value)+1;
456                 break;
457             case STATS_EVENT_DEC:
458                 value = atoi (node->value)-1;
459                 break;
460             case STATS_EVENT_ADD:
461                 value = atoi (node->value)+atoi (event->value);
462                 break;
463             case STATS_EVENT_SUB:
464                 value = atoll (node->value) - atoll (event->value);
465                 break;
466             default:
467                 ICECAST_LOG_WARN("unhandled event (%d) for %s", event->action, event->source);
468                 break;
469         }
470         str = malloc (16);
471         snprintf (str, 16, "%" PRId64, value);
472         if (event->value == NULL)
473             event->value = strdup (str);
474     }
475     else
476         str = (char *)strdup (event->value);
477     free (node->value);
478     node->value = str;
479     if (event->source)
480         ICECAST_LOG_DEBUG("update \"%s\" %s (%s)", event->source, node->name, node->value);
481     else
482         ICECAST_LOG_DEBUG("update global %s (%s)", node->name, node->value);
483 }
484 
485 
process_global_event(stats_event_t * event)486 static void process_global_event (stats_event_t *event)
487 {
488     stats_node_t *node;
489 
490     /* ICECAST_LOG_DEBUG("global event %s %s %d", event->name, event->value, event->action); */
491     if (event->action == STATS_EVENT_REMOVE)
492     {
493         /* we're deleting */
494         node = _find_node(_stats.global_tree, event->name);
495         if (node != NULL)
496             avl_delete(_stats.global_tree, (void *)node, _free_stats);
497         return;
498     }
499     node = _find_node(_stats.global_tree, event->name);
500     if (node)
501     {
502         modify_node_event (node, event);
503     }
504     else
505     {
506         /* add node */
507         node = (stats_node_t *)calloc(1, sizeof(stats_node_t));
508         node->name = (char *)strdup(event->name);
509         node->value = (char *)strdup(event->value);
510 
511         avl_insert(_stats.global_tree, (void *)node);
512     }
513 }
514 
515 
process_source_event(stats_event_t * event)516 static void process_source_event (stats_event_t *event)
517 {
518     stats_source_t *snode = _find_source(_stats.source_tree, event->source);
519     if (snode == NULL)
520     {
521         if (event->action == STATS_EVENT_REMOVE)
522             return;
523         snode = (stats_source_t *)calloc(1,sizeof(stats_source_t));
524         if (snode == NULL)
525             return;
526         ICECAST_LOG_DEBUG("new source stat %s", event->source);
527         snode->source = (char *)strdup(event->source);
528         snode->stats_tree = avl_tree_new(_compare_stats, NULL);
529         if (event->action == STATS_EVENT_HIDDEN)
530             snode->hidden = 1;
531         else
532             snode->hidden = 0;
533 
534         avl_insert(_stats.source_tree, (void *)snode);
535     }
536     if (event->name)
537     {
538         stats_node_t *node = _find_node(snode->stats_tree, event->name);
539         if (node == NULL)
540         {
541             if (event->action == STATS_EVENT_REMOVE)
542                 return;
543             /* adding node */
544             if (event->value)
545             {
546                 ICECAST_LOG_DEBUG("new node %s (%s)", event->name, event->value);
547                 node = (stats_node_t *)calloc(1,sizeof(stats_node_t));
548                 node->name = (char *)strdup(event->name);
549                 node->value = (char *)strdup(event->value);
550                 node->hidden = snode->hidden;
551 
552                 avl_insert(snode->stats_tree, (void *)node);
553             }
554             return;
555         }
556         if (event->action == STATS_EVENT_REMOVE)
557         {
558             ICECAST_LOG_DEBUG("delete node %s", event->name);
559             avl_delete(snode->stats_tree, (void *)node, _free_stats);
560             return;
561         }
562         modify_node_event (node, event);
563         return;
564     }
565     if (event->action == STATS_EVENT_HIDDEN)
566     {
567         avl_node *node = avl_get_first (snode->stats_tree);
568 
569         if (event->value)
570             snode->hidden = 1;
571         else
572             snode->hidden = 0;
573         while (node)
574         {
575             stats_node_t *stats = (stats_node_t*)node->key;
576             stats->hidden = snode->hidden;
577             node = avl_get_next (node);
578         }
579         return;
580     }
581     if (event->action == STATS_EVENT_REMOVE)
582     {
583         ICECAST_LOG_DEBUG("delete source node %s", event->source);
584         avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
585     }
586 }
587 
588 /* NOTE: implicit %z is added to format string. */
__format_time(char * buffer,size_t len,const char * format)589 static inline void __format_time(char * buffer, size_t len, const char * format) {
590     time_t now = time(NULL);
591     struct tm local;
592     char tzbuffer[32];
593     char timebuffer[128];
594 #ifdef _WIN32
595     struct tm *thetime;
596     int time_days, time_hours, time_tz;
597     int tempnum1, tempnum2;
598     char sign;
599 #endif
600 
601     localtime_r (&now, &local);
602 #ifndef _WIN32
603     strftime (tzbuffer, sizeof(tzbuffer), "%z", &local);
604 #else
605     thetime = gmtime (&now);
606     time_days = local.tm_yday - thetime->tm_yday;
607 
608     if (time_days < -1) {
609         tempnum1 = 24;
610     } else {
611         tempnum1 = 1;
612     }
613 
614     if (tempnum1 < time_days) {
615         tempnum2 = -24;
616     } else {
617         tempnum2 = time_days*24;
618     }
619 
620     time_hours = (tempnum2 + local.tm_hour - thetime->tm_hour);
621     time_tz = time_hours * 60 + local.tm_min - thetime->tm_min;
622 
623     if (time_tz < 0) {
624         sign = '-';
625         time_tz = -time_tz;
626     } else {
627         sign = '+';
628     }
629 
630     snprintf(tzbuffer, sizeof(tzbuffer), "%c%.2d%.2d", sign, time_tz / 60, time_tz % 60);
631 #endif
632     strftime (timebuffer, sizeof(timebuffer), format, &local);
633 
634     snprintf(buffer, len, "%s%s", timebuffer, tzbuffer);
635 }
636 
stats_event_time(const char * mount,const char * name)637 void stats_event_time (const char *mount, const char *name)
638 {
639     char buffer[100];
640 
641     __format_time(buffer, sizeof(buffer), "%a, %d %b %Y %H:%M:%S ");
642     stats_event (mount, name, buffer);
643 }
644 
645 
stats_event_time_iso8601(const char * mount,const char * name)646 void stats_event_time_iso8601 (const char *mount, const char *name)
647 {
648     char buffer[100];
649 
650     __format_time(buffer, sizeof(buffer), "%Y-%m-%dT%H:%M:%S");
651     stats_event (mount, name, buffer);
652 }
653 
654 
stats_global(ice_config_t * config)655 void stats_global (ice_config_t *config)
656 {
657     stats_event (NULL, "server_id", config->server_id);
658     stats_event (NULL, "host", config->hostname);
659     stats_event (NULL, "location", config->location);
660     stats_event (NULL, "admin", config->admin);
661 }
662 
663 
_stats_thread(void * arg)664 static void *_stats_thread(void *arg)
665 {
666     stats_event_t *event;
667     stats_event_t *copy;
668     event_listener_t *listener;
669 
670     stats_event_time (NULL, "server_start");
671     stats_event_time_iso8601 (NULL, "server_start_iso8601");
672 
673     /* global currently active stats */
674     stats_event (NULL, "clients", "0");
675     stats_event (NULL, "connections", "0");
676     stats_event (NULL, "sources", "0");
677     stats_event (NULL, "stats", "0");
678     stats_event (NULL, "listeners", "0");
679 
680     /* global accumulating stats */
681     stats_event (NULL, "client_connections", "0");
682     stats_event (NULL, "source_client_connections", "0");
683     stats_event (NULL, "source_relay_connections", "0");
684     stats_event (NULL, "source_total_connections", "0");
685     stats_event (NULL, "stats_connections", "0");
686     stats_event (NULL, "listener_connections", "0");
687 
688     ICECAST_LOG_INFO("stats thread started");
689     while (_stats_running) {
690         thread_mutex_lock(&_global_event_mutex);
691         if (_global_event_queue.head != NULL) {
692             /* grab the next event from the queue */
693             event = _get_event_from_queue (&_global_event_queue);
694             thread_mutex_unlock(&_global_event_mutex);
695 
696             if (event == NULL)
697                 continue;
698             event->next = NULL;
699 
700             thread_mutex_lock(&_stats_mutex);
701 
702             /* check if we are dealing with a global or source event */
703             if (event->source == NULL)
704                 process_global_event (event);
705             else
706                 process_source_event (event);
707 
708             /* now we have an event that's been processed into the running stats */
709             /* this event should get copied to event listeners' queues */
710             listener = (event_listener_t *)_event_listeners;
711             while (listener) {
712                 copy = _copy_event(event);
713                 thread_mutex_lock (&listener->mutex);
714                 _add_event_to_queue (copy, &listener->queue);
715                 thread_mutex_unlock (&listener->mutex);
716 
717                 listener = listener->next;
718             }
719 
720             /* now we need to destroy the event */
721             _free_event(event);
722 
723             thread_mutex_unlock(&_stats_mutex);
724             continue;
725         }
726         else
727         {
728             thread_mutex_unlock(&_global_event_mutex);
729         }
730 
731         thread_sleep(300000);
732     }
733 
734     return NULL;
735 }
736 
737 /* you must have the _stats_mutex locked here */
_unregister_listener(event_listener_t * listener)738 static void _unregister_listener(event_listener_t *listener)
739 {
740     event_listener_t **prev = (event_listener_t **)&_event_listeners,
741                      *current = *prev;
742     while (current)
743     {
744         if (current == listener)
745         {
746             *prev = current->next;
747             break;
748         }
749         prev = &current->next;
750         current = *prev;
751     }
752 }
753 
754 
_make_event_from_node(stats_node_t * node,char * source)755 static stats_event_t *_make_event_from_node(stats_node_t *node, char *source)
756 {
757     stats_event_t *event = (stats_event_t *)malloc(sizeof(stats_event_t));
758 
759     if (source != NULL)
760         event->source = (char *)strdup(source);
761     else
762         event->source = NULL;
763     event->name = (char *)strdup(node->name);
764     event->value = (char *)strdup(node->value);
765     event->hidden = node->hidden;
766     event->action = STATS_EVENT_SET;
767     event->next = NULL;
768 
769     return event;
770 }
771 
772 
_add_event_to_queue(stats_event_t * event,event_queue_t * queue)773 static void _add_event_to_queue(stats_event_t *event, event_queue_t *queue)
774 {
775     *queue->tail = event;
776     queue->tail = (volatile stats_event_t **)&event->next;
777 }
778 
779 
_get_event_from_queue(event_queue_t * queue)780 static stats_event_t *_get_event_from_queue (event_queue_t *queue)
781 {
782     stats_event_t *event = NULL;
783 
784     if (queue && queue->head)
785     {
786         event = (stats_event_t *)queue->head;
787         queue->head = event->next;
788         if (queue->head == NULL)
789             queue->tail = &queue->head;
790     }
791 
792     return event;
793 }
794 
_send_event_to_client(stats_event_t * event,client_t * client)795 static int _send_event_to_client(stats_event_t *event, client_t *client)
796 {
797     int len;
798     char buf [200];
799 
800     /* send data to the client!!!! */
801     len = snprintf (buf, sizeof (buf), "EVENT %s %s %s\n",
802             (event->source != NULL) ? event->source : "global",
803             event->name ? event->name : "null",
804             event->value ? event->value : "null");
805     if (len > 0 && len < (int)sizeof (buf))
806     {
807         client_send_bytes (client, buf, len);
808         if (client->con->error)
809             return -1;
810     }
811     return 0;
812 }
813 
814 
_dump_stats_to_doc(xmlNodePtr root,const char * show_mount,int hidden)815 static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, int hidden)
816 {
817     avl_node *avlnode;
818     xmlNodePtr ret = NULL;
819 
820     thread_mutex_lock(&_stats_mutex);
821     /* general stats first */
822     avlnode = avl_get_first(_stats.global_tree);
823     while (avlnode)
824     {
825         stats_node_t *stat = avlnode->key;
826         if (stat->hidden <=  hidden)
827             xmlNewTextChild (root, NULL, XMLSTR(stat->name), XMLSTR(stat->value));
828         avlnode = avl_get_next (avlnode);
829     }
830     /* now per mount stats */
831     avlnode = avl_get_first(_stats.source_tree);
832     while (avlnode)
833     {
834         stats_source_t *source = (stats_source_t *)avlnode->key;
835         if (source->hidden <= hidden &&
836                 (show_mount == NULL || strcmp (show_mount, source->source) == 0))
837         {
838             avl_node *avlnode2 = avl_get_first (source->stats_tree);
839             xmlNodePtr xmlnode = xmlNewTextChild (root, NULL, XMLSTR("source"), NULL);
840 
841             xmlSetProp (xmlnode, XMLSTR("mount"), XMLSTR(source->source));
842             if (ret == NULL)
843                 ret = xmlnode;
844             while (avlnode2)
845             {
846                 stats_node_t *stat = avlnode2->key;
847                 xmlNewTextChild (xmlnode, NULL, XMLSTR(stat->name), XMLSTR(stat->value));
848                 avlnode2 = avl_get_next (avlnode2);
849             }
850         }
851         avlnode = avl_get_next (avlnode);
852     }
853     thread_mutex_unlock(&_stats_mutex);
854     return ret;
855 }
856 
857 
858 /* factoring out code for stats loops
859 ** this function copies all stats to queue, and registers
860 ** the queue for all new events atomically.
861 ** note: mutex must already be created!
862 */
_register_listener(event_listener_t * listener)863 static void _register_listener (event_listener_t *listener)
864 {
865     avl_node *node;
866     avl_node *node2;
867     stats_event_t *event;
868     stats_source_t *source;
869 
870     thread_mutex_lock(&_stats_mutex);
871 
872     /* first we fill our queue with the current stats */
873 
874     /* start with the global stats */
875     node = avl_get_first(_stats.global_tree);
876     while (node) {
877         event = _make_event_from_node((stats_node_t *)node->key, NULL);
878         _add_event_to_queue (event, &listener->queue);
879 
880         node = avl_get_next(node);
881     }
882 
883     /* now the stats for each source */
884     node = avl_get_first(_stats.source_tree);
885     while (node) {
886         source = (stats_source_t *)node->key;
887         node2 = avl_get_first(source->stats_tree);
888         while (node2) {
889             event = _make_event_from_node((stats_node_t *)node2->key, source->source);
890             _add_event_to_queue (event, &listener->queue);
891 
892             node2 = avl_get_next(node2);
893         }
894 
895         node = avl_get_next(node);
896     }
897 
898     /* now we register to receive future event notices */
899     listener->next = (event_listener_t *)_event_listeners;
900     _event_listeners = listener;
901 
902     thread_mutex_unlock(&_stats_mutex);
903 }
904 
stats_connection(void * arg)905 void *stats_connection(void *arg)
906 {
907     client_t *client = (client_t *)arg;
908     stats_event_t *event;
909     event_listener_t listener;
910 
911     ICECAST_LOG_INFO("stats client starting");
912 
913     event_queue_init (&listener.queue);
914     /* increment the thread count */
915     thread_mutex_lock(&_stats_mutex);
916     _stats_threads++;
917     stats_event_args (NULL, "stats", "%d", _stats_threads);
918     thread_mutex_unlock(&_stats_mutex);
919 
920     thread_mutex_create (&(listener.mutex));
921 
922     _register_listener (&listener);
923 
924     while (_stats_running) {
925         thread_mutex_lock (&listener.mutex);
926         event = _get_event_from_queue (&listener.queue);
927         thread_mutex_unlock (&listener.mutex);
928         if (event != NULL) {
929             if (_send_event_to_client(event, client) < 0) {
930                 _free_event(event);
931                 break;
932             }
933             _free_event(event);
934             continue;
935         }
936         thread_sleep (500000);
937     }
938 
939     thread_mutex_lock(&_stats_mutex);
940     _unregister_listener (&listener);
941     _stats_threads--;
942     stats_event_args (NULL, "stats", "%d", _stats_threads);
943     thread_mutex_unlock(&_stats_mutex);
944 
945     thread_mutex_destroy (&listener.mutex);
946     client_destroy (client);
947     ICECAST_LOG_INFO("stats client finished");
948 
949     return NULL;
950 }
951 
952 
stats_callback(client_t * client,void * notused)953 void stats_callback (client_t *client, void *notused)
954 {
955     if (client->con->error)
956     {
957         client_destroy (client);
958         return;
959     }
960     client_set_queue (client, NULL);
961     thread_create("Stats Connection", stats_connection, (void *)client, THREAD_DETACHED);
962 }
963 
964 
965 typedef struct _source_xml_tag {
966     char *mount;
967     xmlNodePtr node;
968 
969     struct _source_xml_tag *next;
970 } source_xml_t;
971 
972 
stats_transform_xslt(client_t * client,const char * uri)973 void stats_transform_xslt(client_t *client, const char *uri)
974 {
975     xmlDocPtr doc;
976     char *xslpath = util_get_path_from_normalised_uri (uri);
977     const char *mount = httpp_get_query_param (client->parser, "mount");
978 
979     doc = stats_get_xml (0, mount);
980 
981     xslt_transform(doc, xslpath, client);
982 
983     xmlFreeDoc(doc);
984     free (xslpath);
985 }
986 
stats_get_xml(int show_hidden,const char * show_mount)987 xmlDocPtr stats_get_xml(int show_hidden, const char *show_mount)
988 {
989     xmlDocPtr doc;
990     xmlNodePtr node;
991 
992     doc = xmlNewDoc (XMLSTR("1.0"));
993     node = xmlNewDocNode (doc, NULL, XMLSTR("icestats"), NULL);
994     xmlDocSetRootElement(doc, node);
995 
996     node = _dump_stats_to_doc (node, show_mount, show_hidden);
997 
998     return doc;
999 }
1000 
1001 
_compare_stats(void * arg,void * a,void * b)1002 static int _compare_stats(void *arg, void *a, void *b)
1003 {
1004     stats_node_t *nodea = (stats_node_t *)a;
1005     stats_node_t *nodeb = (stats_node_t *)b;
1006 
1007     return strcmp(nodea->name, nodeb->name);
1008 }
1009 
_compare_source_stats(void * arg,void * a,void * b)1010 static int _compare_source_stats(void *arg, void *a, void *b)
1011 {
1012     stats_source_t *nodea = (stats_source_t *)a;
1013     stats_source_t *nodeb = (stats_source_t *)b;
1014 
1015     return strcmp(nodea->source, nodeb->source);
1016 }
1017 
_free_stats(void * key)1018 static int _free_stats(void *key)
1019 {
1020     stats_node_t *node = (stats_node_t *)key;
1021     free(node->value);
1022     free(node->name);
1023     free(node);
1024 
1025     return 1;
1026 }
1027 
_free_source_stats(void * key)1028 static int _free_source_stats(void *key)
1029 {
1030     stats_source_t *node = (stats_source_t *)key;
1031     avl_tree_free(node->stats_tree, _free_stats);
1032     free(node->source);
1033     free(node);
1034 
1035     return 1;
1036 }
1037 
_free_event(stats_event_t * event)1038 static void _free_event(stats_event_t *event)
1039 {
1040     if (event->source) free(event->source);
1041     if (event->name) free(event->name);
1042     if (event->value) free(event->value);
1043     free(event);
1044 }
1045 
1046 
stats_get_streams(void)1047 refbuf_t *stats_get_streams (void)
1048 {
1049 #define STREAMLIST_BLKSIZE  4096
1050     avl_node *node;
1051     unsigned int remaining = STREAMLIST_BLKSIZE;
1052     refbuf_t *start = refbuf_new (remaining), *cur = start;
1053     char *buffer = cur->data;
1054 
1055     /* now the stats for each source */
1056     thread_mutex_lock (&_stats_mutex);
1057     node = avl_get_first(_stats.source_tree);
1058     while (node)
1059     {
1060         int ret;
1061         stats_source_t *source = (stats_source_t *)node->key;
1062 
1063         if (source->hidden == 0)
1064         {
1065             if (remaining <= strlen (source->source) + 3)
1066             {
1067                 cur->len = STREAMLIST_BLKSIZE - remaining;
1068                 cur->next = refbuf_new (STREAMLIST_BLKSIZE);
1069                 remaining = STREAMLIST_BLKSIZE;
1070                 cur = cur->next;
1071                 buffer = cur->data;
1072             }
1073             ret = snprintf (buffer, remaining, "%s\r\n", source->source);
1074             if (ret > 0)
1075             {
1076                 buffer += ret;
1077                 remaining -= ret;
1078             }
1079         }
1080         node = avl_get_next(node);
1081     }
1082     thread_mutex_unlock (&_stats_mutex);
1083     cur->len = STREAMLIST_BLKSIZE - remaining;
1084     return start;
1085 }
1086 
1087 
1088 
1089 /* This removes any source stats from virtual mountpoints, ie mountpoints
1090  * where no source_t exists. This function requires the global sources lock
1091  * to be held before calling.
1092  */
stats_clear_virtual_mounts(void)1093 void stats_clear_virtual_mounts (void)
1094 {
1095     avl_node *snode;
1096 
1097     thread_mutex_lock (&_stats_mutex);
1098     snode = avl_get_first(_stats.source_tree);
1099     while (snode)
1100     {
1101         stats_source_t *src = (stats_source_t *)snode->key;
1102         source_t *source = source_find_mount_raw (src->source);
1103 
1104         if (source == NULL)
1105         {
1106             /* no source_t is reserved so remove them now */
1107             snode = avl_get_next (snode);
1108             ICECAST_LOG_DEBUG("releasing %s stats", src->source);
1109             avl_delete (_stats.source_tree, src, _free_source_stats);
1110             continue;
1111         }
1112 
1113         snode = avl_get_next (snode);
1114     }
1115     thread_mutex_unlock (&_stats_mutex);
1116 }
1117 
1118