1 /* Icecast
2  *
3  * This program is distributed under the GNU General Public License, version 2.
4  * A copy of this license is included with this source.
5  *
6  * Copyright 2000-2004, Jack Moffitt <jack@xiph.org,
7  *                      Michael Smith <msmith@xiph.org>,
8  *                      oddsock <oddsock@xiph.org>,
9  *                      Karl Heyes <karl@xiph.org>
10  *                      and others (see AUTHORS for details).
11  */
12 
13 #ifdef HAVE_CONFIG_H
14 #include <config.h>
15 #endif
16 
17 #include <stdio.h>
18 #include <string.h>
19 #include <stdlib.h>
20 #include <stdarg.h>
21 #include <limits.h>
22 
23 #include <libxml/xmlmemory.h>
24 #include <libxml/parser.h>
25 #include <libxml/parserInternals.h>
26 #include <libxml/tree.h>
27 
28 #include <git_hash.h>
29 #include "thread/thread.h"
30 #include "avl/avl.h"
31 #include "httpp/httpp.h"
32 #include "net/sock.h"
33 
34 #include "connection.h"
35 
36 #include "source.h"
37 #include "admin.h"
38 #include "global.h"
39 #include "refbuf.h"
40 #include "client.h"
41 #include "stats.h"
42 #include "xslt.h"
43 #include "util.h"
44 #include "fserve.h"
45 #define CATMODULE "stats"
46 #include "logging.h"
47 
48 #if !defined HAVE_ATOLL && defined HAVE_STRTOLL
49 #define atoll(nptr) strtoll(nptr, (char **)NULL, 10)
50 #endif
51 
52 #define VAL_BUFSIZE 30
53 #define STATS_BLOCK_CONNECTION  01
54 
55 #define STATS_EVENT_SET     0
56 #define STATS_EVENT_INC     1
57 #define STATS_EVENT_DEC     2
58 #define STATS_EVENT_ADD     3
59 #define STATS_EVENT_SUB     4
60 #define STATS_EVENT_REMOVE  5
61 #define STATS_EVENT_HIDDEN  0x80
62 
63 typedef struct _stats_node_tag
64 {
65     char *name;
66     char *value;
67     time_t  last_reported;
68     int  flags;
69 } stats_node_t;
70 
71 typedef struct _stats_event_tag
72 {
73     char *source;
74     char *name;
75     char *value;
76     int  flags;
77     int  action;
78 
79     struct _stats_event_tag *next;
80 } stats_event_t;
81 
82 typedef struct _stats_source_tag
83 {
84     char *source;
85     int  flags;
86     time_t updated;
87     avl_tree *stats_tree;
88 } stats_source_t;
89 
90 typedef struct _event_listener_tag
91 {
92     int mask;
93     unsigned int content_len;
94     char *source;
95 
96     /* queue for unwritten stats to stats clients */
97     refbuf_t *recent_block;
98     client_t *client;
99 
100     struct _event_listener_tag *next;
101 } event_listener_t;
102 
103 
104 typedef struct _stats_tag
105 {
106     avl_tree *global_tree;
107     avl_tree *source_tree;
108 
109     /* list of listeners for stats */
110     event_listener_t *event_listeners;
111     mutex_t listeners_lock;
112 
113 } stats_t;
114 
115 static volatile int _stats_running = 0;
116 
117 static stats_t _stats;
118 
119 
120 static int _compare_stats(void *a, void *b, void *arg);
121 static int _compare_source_stats(void *a, void *b, void *arg);
122 static int _free_stats(void *key);
123 static int _free_source_stats(void *key);
124 static int _free_source_stats_wrapper (void *key);
125 static stats_node_t *_find_node(const avl_tree *tree, const char *name);
126 static stats_source_t *_find_source(avl_tree *tree, const char *source);
127 static void process_event (stats_event_t *event);
128 static void _add_stats_to_stats_client (client_t *client, const char *fmt, va_list ap);
129 static void stats_listener_send (int flags, const char *fmt, ...);
130 
131 unsigned int throttle_sends;
132 
133 /* simple helper function for creating an event */
build_event(stats_event_t * event,const char * source,const char * name,const char * value)134 static void build_event (stats_event_t *event, const char *source, const char *name, const char *value)
135 {
136     event->source = (char *)source;
137     event->name = (char *)name;
138     event->value = (char *)value;
139     event->flags = STATS_PUBLIC;
140     if (source) event->flags |= STATS_SLAVE;
141     if (value)
142         event->action = STATS_EVENT_SET;
143     else
144         event->action = STATS_EVENT_REMOVE;
145 }
146 
147 
stats_initialize(void)148 void stats_initialize(void)
149 {
150     if (_stats_running)
151         return;
152 
153     /* set up global struct */
154     _stats.global_tree = avl_tree_new(_compare_stats, NULL);
155     _stats.source_tree = avl_tree_new(_compare_source_stats, NULL);
156 
157     _stats.event_listeners = NULL;
158     thread_mutex_create (&_stats.listeners_lock);
159 
160     _stats_running = 1;
161 
162     stats_event_time (NULL, "server_start", STATS_GENERAL);
163 
164     /* global currently active stats */
165     stats_event_flags (NULL, "clients", "0", STATS_COUNTERS|STATS_REGULAR);
166     stats_event_flags (NULL, "listeners", "0", STATS_COUNTERS|STATS_REGULAR);
167     stats_event_flags (NULL, "connections", "0", STATS_COUNTERS|STATS_REGULAR);
168     stats_event_flags (NULL, "sources", "0", STATS_COUNTERS|STATS_REGULAR);
169     stats_event_flags (NULL, "stats", "0", STATS_COUNTERS|STATS_REGULAR);
170     stats_event_flags (NULL, "banned_IPs", "0", STATS_COUNTERS|STATS_REGULAR);
171 #ifdef GIT_VERSION
172     stats_event (NULL, "build", GIT_VERSION);
173 #endif
174 
175     /* global accumulating stats */
176     stats_event_flags (NULL, "client_connections", "0", STATS_COUNTERS|STATS_REGULAR);
177     stats_event_flags (NULL, "source_client_connections", "0", STATS_COUNTERS|STATS_REGULAR);
178     stats_event_flags (NULL, "source_relay_connections", "0", STATS_COUNTERS|STATS_REGULAR);
179     stats_event_flags (NULL, "source_total_connections", "0", STATS_COUNTERS|STATS_REGULAR);
180     stats_event_flags (NULL, "stats_connections", "0", STATS_COUNTERS|STATS_REGULAR);
181     stats_event_flags (NULL, "listener_connections", "0", STATS_COUNTERS|STATS_REGULAR);
182     stats_event_flags (NULL, "outgoing_kbitrate", "0", STATS_COUNTERS|STATS_REGULAR);
183     stats_event_flags (NULL, "stream_kbytes_sent", "0", STATS_COUNTERS|STATS_REGULAR);
184     stats_event_flags (NULL, "stream_kbytes_read", "0", STATS_COUNTERS|STATS_REGULAR);
185 }
186 
stats_shutdown(void)187 void stats_shutdown(void)
188 {
189     if(!_stats_running) /* We can't shutdown if we're not running. */
190         return;
191 
192     _stats_running = 0;
193 
194     avl_tree_free(_stats.source_tree, _free_source_stats_wrapper);
195     avl_tree_free(_stats.global_tree, _free_stats);
196     thread_mutex_destroy (&_stats.listeners_lock);
197 }
198 
199 
stats_clients_wakeup(void)200 void stats_clients_wakeup (void)
201 {
202     event_listener_t *listener;
203 
204     thread_mutex_lock (&_stats.listeners_lock);
205     listener = _stats.event_listeners;
206     while (listener)
207     {
208         client_t *client = listener->client;
209         if (client)
210             client->schedule_ms = 0;
211         listener = listener->next;
212     }
213     thread_mutex_unlock (&_stats.listeners_lock);
214 }
215 
216 
217 /* simple name=tag stat create/update */
stats_event(const char * source,const char * name,const char * value)218 void stats_event(const char *source, const char *name, const char *value)
219 {
220     stats_event_t event;
221 
222     if (value && xmlCheckUTF8 ((unsigned char *)value) == 0)
223     {
224         WARN3 ("seen non-UTF8 data (%s), probably incorrect metadata (%s, %s)",
225                 source?source:"global", name, value);
226         return;
227     }
228     build_event (&event, source, name, (char *)value);
229     process_event (&event);
230 }
231 
232 
233 /* wrapper for stats_event, this takes a charset to convert from */
stats_event_conv(const char * mount,const char * name,const char * value,const char * charset)234 void stats_event_conv(const char *mount, const char *name, const char *value, const char *charset)
235 {
236     const char *metadata = value;
237     xmlBufferPtr conv = xmlBufferCreate ();
238 
239     if (charset && value)
240     {
241         xmlCharEncodingHandlerPtr handle = xmlFindCharEncodingHandler (charset);
242 
243         if (handle)
244         {
245             xmlBufferPtr raw = xmlBufferCreate ();
246             xmlBufferAdd (raw, (const xmlChar *)value, strlen (value));
247             if (xmlCharEncInFunc (handle, conv, raw) > 0)
248                 metadata = (char *)xmlBufferContent (conv);
249             xmlBufferFree (raw);
250             xmlCharEncCloseFunc (handle);
251         }
252         else
253             WARN1 ("No charset found for \"%s\"", charset);
254     }
255 
256     stats_event (mount, name, metadata);
257     xmlBufferFree (conv);
258 }
259 
260 /* set stat with flags, name can be NULL if it applies to a whole
261  * source stats tree. */
stats_event_flags(const char * source,const char * name,const char * value,int flags)262 void stats_event_flags (const char *source, const char *name, const char *value, int flags)
263 {
264     stats_event_t event;
265 
266     build_event (&event, source, name, value);
267     event.flags = flags;
268     if (value)
269         event.action |= STATS_EVENT_HIDDEN;
270     else
271         event.action = STATS_EVENT_HIDDEN;
272     process_event (&event);
273 }
274 
275 /* printf style formatting for stat create/update */
stats_event_args(const char * source,char * name,char * format,...)276 void stats_event_args(const char *source, char *name, char *format, ...)
277 {
278     va_list val;
279     int ret;
280     char buf[1024];
281 
282     if (name == NULL)
283         return;
284     va_start(val, format);
285     ret = vsnprintf(buf, sizeof (buf), format, val);
286     va_end(val);
287 
288     if (ret < 0 || (unsigned int)ret >= sizeof (buf))
289     {
290         WARN2 ("problem with formatting %s stat %s",
291                 source==NULL ? "global" : source, name);
292         return;
293     }
294     stats_event(source, name, buf);
295 }
296 
_get_stats(const char * source,const char * name)297 static char *_get_stats(const char *source, const char *name)
298 {
299     stats_node_t *stats = NULL;
300     stats_source_t *src = NULL;
301     char *value = NULL;
302 
303     if (source == NULL) {
304         avl_tree_rlock (_stats.global_tree);
305         stats = _find_node(_stats.global_tree, name);
306         if (stats) value = (char *)strdup(stats->value);
307         avl_tree_unlock (_stats.global_tree);
308     } else {
309         avl_tree_rlock (_stats.source_tree);
310         src = _find_source(_stats.source_tree, source);
311         if (src)
312         {
313             avl_tree_rlock (src->stats_tree);
314             avl_tree_unlock (_stats.source_tree);
315             stats = _find_node(src->stats_tree, name);
316             if (stats) value = (char *)strdup(stats->value);
317             avl_tree_unlock (src->stats_tree);
318         }
319         else
320             avl_tree_unlock (_stats.source_tree);
321     }
322 
323     return value;
324 }
325 
stats_get_value(const char * source,const char * name)326 char *stats_get_value(const char *source, const char *name)
327 {
328     return(_get_stats(source, name));
329 }
330 
331 
stats_retrieve(stats_handle_t handle,const char * name)332 char *stats_retrieve (stats_handle_t handle, const char *name)
333 {
334     char *v = NULL;
335     stats_source_t *src_stats = (stats_source_t *)handle;
336     stats_node_t *stats = _find_node (src_stats->stats_tree, name);
337 
338     if (stats) v =  strdup (stats->value);
339     return v;
340 }
341 
342 
343 /* increase the value in the provided stat by 1 */
stats_event_inc(const char * source,const char * name)344 void stats_event_inc(const char *source, const char *name)
345 {
346     stats_event_t event;
347     char buffer[VAL_BUFSIZE] = "1";
348     build_event (&event, source, name, buffer);
349     /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
350     event.action = STATS_EVENT_INC;
351     process_event (&event);
352 }
353 
stats_event_add(const char * source,const char * name,unsigned long value)354 void stats_event_add(const char *source, const char *name, unsigned long value)
355 {
356     stats_event_t event;
357     char buffer [VAL_BUFSIZE];
358 
359     if (value == 0)
360         return;
361     build_event (&event, source, name, buffer);
362     snprintf (buffer, VAL_BUFSIZE, "%ld", value);
363     event.action = STATS_EVENT_ADD;
364     /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
365     process_event (&event);
366 }
367 
stats_event_sub(const char * source,const char * name,unsigned long value)368 void stats_event_sub(const char *source, const char *name, unsigned long value)
369 {
370     stats_event_t event;
371     char buffer[VAL_BUFSIZE];
372 
373     if (value == 0)
374         return;
375     build_event (&event, source, name, buffer);
376     /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
377     snprintf (buffer, VAL_BUFSIZE, "%ld", value);
378     event.action = STATS_EVENT_SUB;
379     process_event (&event);
380 }
381 
382 /* decrease the value in the provided stat by 1 */
stats_event_dec(const char * source,const char * name)383 void stats_event_dec(const char *source, const char *name)
384 {
385     stats_event_t event;
386     char buffer[VAL_BUFSIZE] = "0";
387     /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
388     build_event (&event, source, name, buffer);
389     event.action = STATS_EVENT_DEC;
390     process_event (&event);
391 }
392 
393 /* note: you must call this function only when you have exclusive access
394 ** to the avl_tree
395 */
_find_node(const avl_tree * stats_tree,const char * name)396 static stats_node_t *_find_node(const avl_tree *stats_tree, const char *name)
397 {
398     stats_node_t *stats;
399     avl_node *node;
400     int cmp;
401 
402     /* get the root node */
403     node = stats_tree->root->right;
404 
405     while (node) {
406         stats = (stats_node_t *)node->key;
407         cmp = strcmp(name, stats->name);
408         if (cmp < 0)
409             node = node->left;
410         else if (cmp > 0)
411             node = node->right;
412         else
413             return stats;
414     }
415 
416     /* didn't find it */
417     return NULL;
418 }
419 
420 /* note: you must call this function only when you have exclusive access
421 ** to the avl_tree
422 */
_find_source(avl_tree * source_tree,const char * source)423 static stats_source_t *_find_source(avl_tree *source_tree, const char *source)
424 {
425     stats_source_t *stats;
426     avl_node *node;
427     int cmp;
428 
429     /* get the root node */
430     node = source_tree->root->right;
431     while (node) {
432         stats = (stats_source_t *)node->key;
433         cmp = strcmp(source, stats->source);
434         if (cmp < 0)
435             node = node->left;
436         else if (cmp > 0)
437             node = node->right;
438         else
439             return stats;
440     }
441 
442     /* didn't find it */
443     return NULL;
444 }
445 
446 
447 /* helper to apply specialised changes to a stats node */
modify_node_event(stats_node_t * node,stats_event_t * event)448 static void modify_node_event (stats_node_t *node, stats_event_t *event)
449 {
450     if (node == NULL || event == NULL)
451         return;
452     if (event->action & STATS_EVENT_HIDDEN)
453     {
454         node->flags = event->flags;
455         event->action &= ~STATS_EVENT_HIDDEN;
456         if (event->value == NULL)
457             return;
458     }
459     if (event->action == STATS_EVENT_SET)
460     {
461         if (node->flags & STATS_REGULAR)
462         {
463             if (node->value && strcmp (node->value, event->value) == 0)
464                 return;  // no change, lets get out
465         }
466     }
467     else
468     {
469         int64_t value = 0;
470 
471         switch (event->action)
472         {
473             case STATS_EVENT_INC:
474                 value = atoll (node->value)+1;
475                 break;
476             case STATS_EVENT_DEC:
477                 value = atoll (node->value)-1;
478                 break;
479             case STATS_EVENT_ADD:
480                 value = atoll (node->value) + atoll (event->value);
481                 break;
482             case STATS_EVENT_SUB:
483                 value = atoll (node->value) - atoll (event->value);
484                 break;
485             default:
486                 break;
487         }
488         snprintf (event->value, VAL_BUFSIZE, "%" PRId64, value);
489     }
490     if (node->value)
491         free (node->value);
492     node->value = strdup (event->value);
493 
494     if (node->flags & STATS_REGULAR)
495         node->last_reported = 0;
496     else
497         DEBUG3 ("update \"%s\" %s (%s)", event->source?event->source:"global", node->name, node->value);
498 }
499 
500 
process_global_event(stats_event_t * event)501 static void process_global_event (stats_event_t *event)
502 {
503     stats_node_t *node = NULL;
504 
505     avl_tree_wlock (_stats.global_tree);
506     /* DEBUG3("global event %s %s %d", event->name, event->value, event->action); */
507     if (event->action == STATS_EVENT_REMOVE)
508     {
509         /* we're deleting */
510         node = _find_node(_stats.global_tree, event->name);
511         if (node != NULL)
512         {
513             stats_listener_send (node->flags, "DELETE global %s\n", event->name);
514             avl_delete(_stats.global_tree, (void *)node, _free_stats);
515         }
516         avl_tree_unlock (_stats.global_tree);
517         return;
518     }
519     node = _find_node(_stats.global_tree, event->name);
520     if (node)
521     {
522         modify_node_event (node, event);
523     }
524     else
525     {
526         /* add node */
527         node = (stats_node_t *)calloc(1, sizeof(stats_node_t));
528         node->name = (char *)strdup(event->name);
529         node->value = (char *)strdup(event->value);
530         node->flags = event->flags;
531 
532         avl_insert(_stats.global_tree, (void *)node);
533     }
534     if ((node->flags & STATS_REGULAR) == 0)
535         stats_listener_send (node->flags, "EVENT global %s %s\n", node->name, node->value);
536     avl_tree_unlock (_stats.global_tree);
537 }
538 
539 
process_source_stat(stats_source_t * src_stats,stats_event_t * event)540 static void process_source_stat (stats_source_t *src_stats, stats_event_t *event)
541 {
542     if (event->name)
543     {
544         stats_node_t *node = _find_node (src_stats->stats_tree, event->name);
545         if (node == NULL)
546         {
547             /* adding node */
548             if (event->action != STATS_EVENT_REMOVE && event->value)
549             {
550                 DEBUG3 ("new node on %s \"%s\" (%s)", src_stats->source, event->name, event->value);
551                 node = (stats_node_t *)calloc (1,sizeof(stats_node_t));
552                 node->name = (char *)strdup (event->name);
553                 node->value = (char *)strdup (event->value);
554                 node->flags = event->flags;
555                 if (src_stats->flags & STATS_HIDDEN)
556                     node->flags |= STATS_HIDDEN;
557                 stats_listener_send (node->flags, "EVENT %s %s %s\n", src_stats->source, event->name, event->value);
558                 avl_insert (src_stats->stats_tree, (void *)node);
559             }
560             return;
561         }
562         if (event->action == STATS_EVENT_REMOVE)
563         {
564             DEBUG2 ("delete node %s from %s", event->name, src_stats->source);
565             stats_listener_send (node->flags, "DELETE %s %s\n", src_stats->source, event->name);
566             avl_delete (src_stats->stats_tree, (void *)node, _free_stats);
567             return;
568         }
569         modify_node_event (node, event);
570         stats_listener_send (node->flags, "EVENT %s %s %s\n", src_stats->source, node->name, node->value);
571         return;
572     }
573     if (event->action == STATS_EVENT_REMOVE && event->name == NULL)
574     {
575         avl_tree_unlock (src_stats->stats_tree);
576         avl_tree_wlock (_stats.source_tree);
577         avl_tree_wlock (src_stats->stats_tree);
578         avl_delete (_stats.source_tree, (void *)src_stats, _free_source_stats);
579         avl_tree_unlock (_stats.source_tree);
580         return;
581     }
582     /* change source flags status */
583     if (event->action & STATS_EVENT_HIDDEN)
584     {
585         avl_node *node = avl_get_first (src_stats->stats_tree);
586         int visible = 0;
587 
588         if ((event->flags&STATS_HIDDEN) == (src_stats->flags&STATS_HIDDEN))
589             return;
590         if (src_stats->flags & STATS_HIDDEN)
591         {
592             stats_node_t *ct = _find_node (src_stats->stats_tree, "server_type");
593             const char *type = "audio/mpeg";
594             if (ct)
595                 type = ct->value;
596             src_stats->flags &= ~STATS_HIDDEN;
597             stats_listener_send (src_stats->flags, "NEW %s %s\n", type, src_stats->source);
598             visible = 1;
599         }
600         else
601         {
602             stats_listener_send (src_stats->flags, "DELETE %s\n", src_stats->source);
603             src_stats->flags |= STATS_HIDDEN;
604         }
605         while (node)
606         {
607             stats_node_t *stats = (stats_node_t*)node->key;
608             if (visible)
609             {
610                 stats->flags &= ~STATS_HIDDEN;
611                 stats_listener_send (stats->flags, "EVENT %s %s %s\n", src_stats->source, stats->name, stats->value);
612             }
613             else
614                 stats->flags |= STATS_HIDDEN;
615             node = avl_get_next (node);
616         }
617     }
618 }
619 
620 
process_source_event(stats_event_t * event)621 static void process_source_event (stats_event_t *event)
622 {
623     stats_source_t *snode;
624 
625     avl_tree_wlock (_stats.source_tree);
626     snode = _find_source(_stats.source_tree, event->source);
627     if (snode == NULL)
628     {
629         if (event->action == STATS_EVENT_REMOVE)
630         {
631             avl_tree_unlock (_stats.source_tree);
632             return;
633         }
634         snode = (stats_source_t *)calloc(1,sizeof(stats_source_t));
635         if (snode == NULL)
636             abort();
637         DEBUG1 ("new source stat %s", event->source);
638         snode->source = (char *)strdup(event->source);
639         snode->stats_tree = avl_tree_new(_compare_stats, NULL);
640         snode->flags = STATS_SLAVE|STATS_GENERAL|STATS_HIDDEN;
641 
642         avl_insert(_stats.source_tree, (void *)snode);
643     }
644     if (event->action == STATS_EVENT_REMOVE && event->name == NULL)
645     {
646         int fallback_stream = 0;
647         avl_tree_wlock (snode->stats_tree);
648         fallback_stream = _find_node (snode->stats_tree, "fallback") == NULL ? 1 : 0;
649         if (fallback_stream)
650             avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
651         else
652             avl_tree_unlock (snode->stats_tree);
653         avl_tree_unlock (_stats.source_tree);
654         return;
655     }
656     avl_tree_wlock (snode->stats_tree);
657     avl_tree_unlock (_stats.source_tree);
658     process_source_stat (snode, event);
659     avl_tree_unlock (snode->stats_tree);
660 }
661 
662 
stats_set_time(stats_handle_t handle,const char * name,int flags,time_t tm)663 void stats_set_time (stats_handle_t handle, const char *name, int flags, time_t tm)
664 {
665     char buffer[100];
666 
667     util_get_clf_time (buffer, sizeof (buffer), tm);
668     stats_set_flags (handle, name, buffer, flags);
669 }
670 
671 
stats_event_time(const char * mount,const char * name,int flags)672 void stats_event_time (const char *mount, const char *name, int flags)
673 {
674     time_t now = time(NULL);
675     char buffer[100];
676 
677     util_get_clf_time (buffer, sizeof (buffer), now);
678     stats_event_flags (mount, name, buffer, flags);
679 }
680 
681 
stats_listeners_send(client_t * client)682 static int stats_listeners_send (client_t *client)
683 {
684     int loop = 12, total = 0;
685     int ret = 0;
686     event_listener_t *listener = client->shared_data;
687 
688     if (client->connection.error || global.running != ICE_RUNNING)
689         return -1;
690     if (client->refbuf && client->refbuf->flags & STATS_BLOCK_CONNECTION)
691         loop = 14;
692     else
693         // impose a queue limit of 2Meg if it has been connected for so many seconds, gives
694         // chance for some catchup on large data sets.
695         if (listener->content_len > 2000000 && (client->worker->current_time.tv_sec - client->connection.con_time) > 60)
696         {
697             WARN1 ("dropping stats client, %d in queue", listener->content_len);
698             return -1;
699         }
700     client->schedule_ms = client->worker->time_ms;
701     thread_mutex_lock (&_stats.listeners_lock);
702     while (1)
703     {
704         refbuf_t *refbuf = client->refbuf;
705 
706         if (refbuf == NULL)
707         {
708             client->schedule_ms = client->worker->time_ms + 80;
709             break;
710         }
711         if (loop == 0 || total > 50000)
712         {
713             client->schedule_ms = client->worker->time_ms + (total>>11) + 5;
714             break;
715         }
716         ret = format_generic_write_to_client (client);
717         if (ret > 0)
718         {
719             total += ret;
720         }
721         if (client->pos == refbuf->len)
722         {
723             client->refbuf = refbuf->next;
724             listener->content_len -= refbuf->len;
725             refbuf->next = NULL;
726             refbuf_release (refbuf);
727             client->pos = 0;
728             //DEBUG2 ("content is %ld, next %p", listener->content_len, client->refbuf);
729             if (client->refbuf == NULL)
730             {
731                 if (listener->content_len)
732                     WARN1 ("content length is %u", listener->content_len);
733                 listener->recent_block = NULL;
734                 client->schedule_ms = client->worker->time_ms + 60;
735                 break;
736             }
737             loop--;
738         }
739         else
740         {
741             client->schedule_ms = client->worker->time_ms + (ret > 0 ? 70 : 100);
742             break; /* short write, so stop for now */
743         }
744     }
745     thread_mutex_unlock (&_stats.listeners_lock);
746     if (client->connection.error || global.running != ICE_RUNNING)
747         return -1;
748     return 0;
749 }
750 
751 
clear_stats_queue(client_t * client)752 static void clear_stats_queue (client_t *client)
753 {
754     refbuf_t *refbuf = client->refbuf;
755     while (refbuf)
756     {
757         refbuf_t *to_go = refbuf;
758         refbuf = to_go->next;
759         if (to_go->_count != 1) DEBUG1 ("odd count for stats %d", to_go->_count);
760         to_go->next = NULL;
761         refbuf_release (to_go);
762     }
763     client->refbuf = NULL;
764 }
765 
766 
stats_listener_send(int mask,const char * fmt,...)767 static void stats_listener_send (int mask, const char *fmt, ...)
768 {
769     va_list ap;
770     event_listener_t *listener;
771 
772     va_start(ap, fmt);
773 
774     thread_mutex_lock (&_stats.listeners_lock);
775     listener = _stats.event_listeners;
776 
777     while (listener)
778     {
779         int admuser = listener->mask & STATS_HIDDEN,
780             hidden = mask & STATS_HIDDEN,
781             flags = mask & ~STATS_HIDDEN;
782 
783         if (admuser || (hidden == 0 && (flags & listener->mask)))
784             _add_stats_to_stats_client (listener->client, fmt, ap);
785         listener = listener->next;
786     }
787     thread_mutex_unlock (&_stats.listeners_lock);
788     va_end(ap);
789 }
790 
791 
792 /* called after each xml reload */
stats_global(ice_config_t * config)793 void stats_global (ice_config_t *config)
794 {
795     stats_event_flags (NULL, "server_id", config->server_id, STATS_GENERAL);
796     stats_event_flags (NULL, "host", config->hostname, STATS_GENERAL);
797     stats_event (NULL, "location", config->location);
798     stats_event (NULL, "admin", config->admin);
799     global.max_rate = config->max_bandwidth;
800     throttle_sends = 0;
801 }
802 
process_event(stats_event_t * event)803 static void process_event (stats_event_t *event)
804 {
805     if (event == NULL)
806         return;
807     /* check if we are dealing with a global or source event */
808     if (event->source == NULL)
809         process_global_event (event);
810     else
811         process_source_event (event);
812 }
813 
814 
_append_to_bufferv(refbuf_t * refbuf,int max_len,const char * fmt,va_list ap)815 static int _append_to_bufferv (refbuf_t *refbuf, int max_len, const char *fmt, va_list ap)
816 {
817     char *buf = (char*)refbuf->data + refbuf->len;
818     int len = max_len - refbuf->len;
819     int ret;
820     va_list vl;
821 
822     va_copy (vl, ap);
823     if (len <= 0)
824         return -1;
825     ret = vsnprintf (buf, len, fmt, vl);
826     if (ret < 0 || ret >= len)
827         return -1;
828     refbuf->len += ret;
829     return ret;
830 }
831 
_append_to_buffer(refbuf_t * refbuf,int max_len,const char * fmt,...)832 static int _append_to_buffer (refbuf_t *refbuf, int max_len, const char *fmt, ...)
833 {
834     int ret;
835     va_list va;
836 
837     va_start (va, fmt);
838     ret = _append_to_bufferv (refbuf, max_len, fmt, va);
839     va_end(va);
840     if (refbuf->len == 0) // trap for stupid case, report and then ignore it
841     {
842         ERROR1 ("message too big to append, ignoring \"%.25s...\"", refbuf->data);
843         return 0;
844     }
845     return ret;
846 }
847 
848 
_add_node_to_stats_client(client_t * client,refbuf_t * refbuf)849 static void _add_node_to_stats_client (client_t *client, refbuf_t *refbuf)
850 {
851     if (refbuf->len)
852     {
853         event_listener_t *listener = client->shared_data;
854         //DEBUG2 ("content is %ld, next %p", listener->content_len, refbuf);
855 
856         if (listener->recent_block)
857         {
858             listener->recent_block->next = refbuf;
859             listener->recent_block = refbuf;
860         }
861         else
862         {
863             listener->recent_block = refbuf;
864             client->refbuf = refbuf;
865         }
866         listener->content_len += refbuf->len;
867     }
868 }
869 
870 
_add_stats_to_stats_client(client_t * client,const char * fmt,va_list ap)871 static void _add_stats_to_stats_client (client_t *client, const char *fmt, va_list ap)
872 {
873     event_listener_t *listener = client->shared_data;
874     refbuf_t *r = listener->recent_block;
875     worker_t *worker = client->worker;
876 
877     if (worker == NULL) return; // may of left worker
878     if (listener->content_len > 6000000) // max limiter imposed
879     {
880         if (client->connection.error == 0)
881             WARN1 ("Detected large send queue for stats, %s flagged for termination", client->connection.ip);
882         client->connection.error = 1;
883         return;
884     }
885     do
886     {
887         if (r && (r->flags & STATS_BLOCK_CONNECTION) == 0)
888         {
889             /* lets see if we can append to an existing block */
890             if (r->len < 4000)
891             {
892                 int written = _append_to_bufferv (r, 4096, fmt, ap);
893                 if (written > 0)
894                 {
895                     listener->content_len += written;
896                     break;
897                 }
898             }
899         }
900         r = refbuf_new (4096);
901         r->len = 0;
902         if (_append_to_bufferv (r, 4096, fmt, ap) < 0)
903         {
904             WARN1 ("stat details are too large \"%s\"", fmt);
905             refbuf_release (r);
906             return;
907         }
908         _add_node_to_stats_client (client, r);
909     } while (0);
910 }
911 
912 
_dump_stats_to_doc(xmlNodePtr root,const char * show_mount,int flags)913 static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, int flags)
914 {
915     avl_node *avlnode;
916     xmlNodePtr ret = NULL;
917 
918     /* general stats first */
919     avl_tree_rlock (_stats.global_tree);
920     avlnode = avl_get_first(_stats.global_tree);
921     while (avlnode)
922     {
923         stats_node_t *stat = avlnode->key;
924         if (stat->flags & flags)
925             xmlNewTextChild (root, NULL, XMLSTR(stat->name), XMLSTR(stat->value));
926         avlnode = avl_get_next (avlnode);
927     }
928     avl_tree_unlock (_stats.global_tree);
929     /* now per mount stats */
930     avl_tree_rlock (_stats.source_tree);
931     avlnode = avl_get_first(_stats.source_tree);
932     while (avlnode)
933     {
934         stats_source_t *source = (stats_source_t *)avlnode->key;
935         if (((flags&STATS_HIDDEN) || (source->flags&STATS_HIDDEN) == (flags&STATS_HIDDEN)) &&
936                 (show_mount == NULL || strcmp (show_mount, source->source) == 0))
937         {
938             avl_node *avlnode2;
939             xmlNodePtr xmlnode = xmlNewTextChild (root, NULL, XMLSTR("source"), NULL);
940             avl_tree_rlock (source->stats_tree);
941             avlnode2 = avl_get_first (source->stats_tree);
942 
943             xmlSetProp (xmlnode, XMLSTR("mount"), XMLSTR(source->source));
944             if (ret == NULL)
945                 ret = xmlnode;
946             while (avlnode2)
947             {
948                 stats_node_t *stat = avlnode2->key;
949                 if ((flags&STATS_HIDDEN) || (stat->flags&STATS_HIDDEN) == (flags&STATS_HIDDEN))
950                     xmlNewTextChild (xmlnode, NULL, XMLSTR(stat->name), XMLSTR(stat->value));
951                 avlnode2 = avl_get_next (avlnode2);
952             }
953             avl_tree_unlock (source->stats_tree);
954         }
955         avlnode = avl_get_next (avlnode);
956     }
957     avl_tree_unlock (_stats.source_tree);
958     return ret;
959 }
960 
961 
962 /* factoring out code for stats loops
963  * this function copies all stats to queue, and registers
964  */
_register_listener(client_t * client)965 static void _register_listener (client_t *client)
966 {
967     event_listener_t *listener = client->shared_data;
968     avl_node *node;
969     worker_t *worker = client->worker;
970     stats_event_t stats_count;
971     refbuf_t *refbuf, *biglist = NULL, **full_p = &biglist, *last = NULL;
972     size_t size = 8192, len = 0;
973     char buffer[20];
974 
975     build_event (&stats_count, NULL, "stats_connections", buffer);
976     stats_count.action = STATS_EVENT_INC;
977     process_event (&stats_count);
978 
979     /* we register to receive future events, sources could come in after these initial stats */
980     thread_mutex_lock (&_stats.listeners_lock);
981     listener->next = _stats.event_listeners;
982     _stats.event_listeners = listener;
983     thread_mutex_unlock (&_stats.listeners_lock);
984 
985     /* first we fill our initial queue with the headers */
986     refbuf = refbuf_new (size);
987     refbuf->len = 0;
988 
989     _append_to_buffer (refbuf, size, "HTTP/1.0 200 OK\r\nCapability: streamlist stats\r\n\r\n");
990 
991     /* now the global stats */
992     avl_tree_rlock (_stats.global_tree);
993     node = avl_get_first(_stats.global_tree);
994     while (node)
995     {
996         stats_node_t *stat = node->key;
997 
998         if (stat->flags & listener->mask)
999         {
1000             while (_append_to_buffer (refbuf, size, "EVENT global %s %s\n", stat->name, stat->value) < 0)
1001             {
1002                 *full_p = last = refbuf;
1003                 full_p = &refbuf->next;
1004                 len += refbuf->len;
1005                 refbuf = refbuf_new (size);
1006                 refbuf->len = 0;
1007             }
1008         }
1009         node = avl_get_next(node);
1010     }
1011     avl_tree_unlock (_stats.global_tree);
1012     /* now the stats for each source */
1013     avl_tree_rlock (_stats.source_tree);
1014     node = avl_get_first(_stats.source_tree);
1015     while (node)
1016     {
1017         stats_source_t *snode = (stats_source_t *)node->key;
1018 
1019         if (snode->flags & listener->mask)
1020         {
1021             stats_node_t *ct = _find_node (snode->stats_tree, "server_type");
1022             const char *type = "audio/mpeg";
1023             if (ct)
1024                 type = ct->value;
1025             while (_append_to_buffer (refbuf, size, "NEW %s %s\n", type, snode->source) < 0)
1026             {
1027                 *full_p = last = refbuf;
1028                 full_p = &refbuf->next;
1029                 len += refbuf->len;
1030                 refbuf = refbuf_new (size);
1031                 refbuf->len = 0;
1032             }
1033         }
1034         node = avl_get_next(node);
1035     }
1036     while (_append_to_buffer (refbuf, size, "INFO full list end\n") < 0)
1037     {
1038         *full_p = last = refbuf;
1039         full_p = &refbuf->next;
1040         len += refbuf->len;
1041         refbuf = refbuf_new (size);
1042         refbuf->len = 0;
1043     }
1044     node = avl_get_first(_stats.source_tree);
1045     while (node)
1046     {
1047         stats_source_t *snode = (stats_source_t *)node->key;
1048 
1049         if (snode->flags & listener->mask)
1050         {
1051             stats_node_t *metadata_stat = NULL;
1052             avl_node *node2;
1053 
1054             avl_tree_rlock (snode->stats_tree);
1055             node2 = avl_get_first(snode->stats_tree);
1056             while (node2)
1057             {
1058                 stats_node_t *stat = node2->key;
1059                 if (stat->flags & listener->mask)
1060                 {
1061                     if (strcmp (stat->name, "metadata_updated") == 0)
1062                         metadata_stat = stat;
1063                     else
1064                         while (_append_to_buffer (refbuf, size, "EVENT %s %s %s\n", snode->source, stat->name, stat->value) < 0)
1065                         {
1066                             *full_p = last = refbuf;
1067                             full_p = &refbuf->next;
1068                             len += refbuf->len;
1069                             refbuf = refbuf_new (size);
1070                             refbuf->len = 0;
1071                         }
1072                 }
1073                 node2 = avl_get_next (node2);
1074             }
1075             while (metadata_stat &&
1076                     _append_to_buffer (refbuf, size, "EVENT %s %s %s\n", snode->source, metadata_stat->name, metadata_stat->value) < 0)
1077             {
1078                 *full_p = last = refbuf;
1079                 full_p = &refbuf->next;
1080                 len += refbuf->len;
1081                 refbuf = refbuf_new (size);
1082                 refbuf->len = 0;
1083             }
1084             avl_tree_unlock (snode->stats_tree);
1085         }
1086         node = avl_get_next(node);
1087     }
1088     avl_tree_unlock (_stats.source_tree);
1089     if (refbuf->len)
1090     {
1091         *full_p = last = refbuf;
1092         full_p = &refbuf->next;
1093         len += refbuf->len;
1094     }
1095     else
1096         refbuf_release (refbuf); // get rid if empty
1097 
1098     /* before we make the client active (for sending queued data), we need to prepend the stats
1099      * we have just built onto any stats that may of come in */
1100     thread_mutex_lock (&_stats.listeners_lock);
1101     *full_p = client->refbuf;
1102     client->refbuf = biglist;
1103     listener->content_len += len;
1104     while (last->next)
1105         last = last->next; // this should not loop typically, but may do
1106     listener->recent_block = last;
1107     thread_mutex_unlock (&_stats.listeners_lock);
1108 
1109     client->schedule_ms = 0;
1110     client->flags |= CLIENT_ACTIVE;
1111     worker_wakeup (worker);
1112 }
1113 
1114 
stats_client_release(client_t * client)1115 static void stats_client_release (client_t *client)
1116 {
1117     event_listener_t *listener = client->shared_data, *match, **trail;
1118     stats_event_t stats_count;
1119     char buffer [20];
1120 
1121     if (listener == NULL)
1122         return;
1123     thread_mutex_lock (&_stats.listeners_lock);
1124     match = _stats.event_listeners;
1125     trail = &_stats.event_listeners;
1126 
1127     while (match && listener != match)
1128     {
1129         trail = &match->next;
1130         match = *trail;
1131     }
1132     if (match)
1133         *trail = match->next;
1134     else
1135         WARN0 ("odd, no stats client details in collection");
1136     thread_mutex_unlock (&_stats.listeners_lock);
1137 
1138     clear_stats_queue (client);
1139     free (listener->source);
1140     free (listener);
1141     client_destroy (client);
1142 
1143     build_event (&stats_count, NULL, "stats_connections", buffer);
1144     stats_count.action = STATS_EVENT_DEC;
1145     process_event (&stats_count);
1146 }
1147 
1148 
1149 struct _client_functions stats_client_send_ops =
1150 {
1151     stats_listeners_send,
1152     stats_client_release
1153 };
1154 
stats_add_listener(client_t * client,int mask)1155 void stats_add_listener (client_t *client, int mask)
1156 {
1157     event_listener_t *listener = calloc (1, sizeof (event_listener_t));
1158     listener->mask = mask;
1159 
1160     client->respcode = 200;
1161     client->ops = &stats_client_send_ops;
1162     client->shared_data = listener;
1163     client_set_queue (client, NULL);
1164     listener->client = client;
1165 
1166     _register_listener (client);
1167 }
1168 
1169 
stats_transform_xslt(client_t * client,const char * uri)1170 int stats_transform_xslt (client_t *client, const char *uri)
1171 {
1172     xmlDocPtr doc;
1173     char *xslpath = util_get_path_from_normalised_uri (uri, 0);
1174     const char *mount = httpp_get_query_param (client->parser, "mount");
1175     int ret;
1176 
1177     if (mount == NULL && client->server_conn->shoutcast_mount && strcmp (uri, "/7.xsl") == 0)
1178         mount = client->server_conn->shoutcast_mount;
1179 
1180     doc = stats_get_xml (STATS_PUBLIC, mount);
1181 
1182     ret = xslt_transform (doc, xslpath, client);
1183 
1184     free (xslpath);
1185     return ret;
1186 }
1187 
stats_get_xml(int flags,const char * show_mount)1188 xmlDocPtr stats_get_xml (int flags, const char *show_mount)
1189 {
1190     xmlDocPtr doc;
1191     xmlNodePtr node;
1192 
1193     doc = xmlNewDoc (XMLSTR("1.0"));
1194     node = xmlNewDocNode (doc, NULL, XMLSTR("icestats"), NULL);
1195     xmlDocSetRootElement(doc, node);
1196 
1197     node = _dump_stats_to_doc (node, show_mount, flags);
1198 
1199     if (show_mount && node)
1200     {
1201 		source_t *source;
1202         /* show each listener */
1203         avl_tree_rlock (global.source_tree);
1204         source = source_find_mount_raw (show_mount);
1205 
1206         if (source)
1207         {
1208             thread_rwlock_rlock (&source->lock);
1209             admin_source_listeners (source, node);
1210             thread_rwlock_unlock (&source->lock);
1211             avl_tree_unlock (global.source_tree);
1212         }
1213         else
1214         {
1215             fbinfo finfo;
1216 
1217             avl_tree_unlock (global.source_tree);
1218             finfo.flags = FS_FALLBACK;
1219             finfo.mount = (char*)show_mount;
1220             finfo.limit = 0;
1221             finfo.fallback = NULL;
1222 
1223             fserve_list_clients_xml (node, &finfo);
1224         }
1225     }
1226     return doc;
1227 }
1228 
_compare_stats(void * arg,void * a,void * b)1229 static int _compare_stats(void *arg, void *a, void *b)
1230 {
1231     stats_node_t *nodea = (stats_node_t *)a;
1232     stats_node_t *nodeb = (stats_node_t *)b;
1233 
1234     return strcmp(nodea->name, nodeb->name);
1235 }
1236 
_compare_source_stats(void * arg,void * a,void * b)1237 static int _compare_source_stats(void *arg, void *a, void *b)
1238 {
1239     stats_source_t *nodea = (stats_source_t *)a;
1240     stats_source_t *nodeb = (stats_source_t *)b;
1241 
1242     return strcmp(nodea->source, nodeb->source);
1243 }
1244 
_free_stats(void * key)1245 static int _free_stats(void *key)
1246 {
1247     stats_node_t *node = (stats_node_t *)key;
1248     free(node->value);
1249     free(node->name);
1250     free(node);
1251 
1252     return 1;
1253 }
1254 
_free_source_stats(void * key)1255 static int _free_source_stats(void *key)
1256 {
1257     stats_source_t *node = (stats_source_t *)key;
1258     stats_listener_send (node->flags, "DELETE %s\n", node->source);
1259     DEBUG1 ("delete source node %s", node->source);
1260     avl_tree_unlock (node->stats_tree);
1261     avl_tree_free(node->stats_tree, _free_stats);
1262     free(node->source);
1263     free(node);
1264 
1265     return 1;
1266 }
1267 
_free_source_stats_wrapper(void * key)1268 static int _free_source_stats_wrapper (void *key)
1269 {
1270     stats_source_t *node = (stats_source_t *)key;
1271     avl_tree_rlock (node->stats_tree);
1272     _free_source_stats (node);
1273     return 1;
1274 }
1275 
1276 /* return a list of blocks which contain lines of text. Each line is a mountpoint
1277  * reference that a slave will use for relaying.  The prepend setting is to indicate
1278  * if some something else needs to be added to each line.
1279  */
stats_get_streams(int prepend)1280 refbuf_t *stats_get_streams (int prepend)
1281 {
1282 #define STREAMLIST_BLKSIZE  4096
1283     avl_node *node;
1284     unsigned int remaining = STREAMLIST_BLKSIZE, prelen;
1285     refbuf_t *start = refbuf_new (remaining), *cur = start;
1286     const char *pre = "";
1287     char *buffer = cur->data;
1288 
1289     if (prepend)
1290         pre = "/admin/streams?mount=";
1291     prelen = strlen (pre);
1292 
1293     /* now the stats for each source */
1294     avl_tree_rlock (_stats.source_tree);
1295     node = avl_get_first(_stats.source_tree);
1296     while (node)
1297     {
1298         int ret;
1299         stats_source_t *source = (stats_source_t *)node->key;
1300 
1301         if ((source->flags & STATS_HIDDEN) == 0)
1302         {
1303             if (remaining <= strlen (source->source) + prelen + 3)
1304             {
1305                 cur->len = STREAMLIST_BLKSIZE - remaining;
1306                 cur->next = refbuf_new (STREAMLIST_BLKSIZE);
1307                 remaining = STREAMLIST_BLKSIZE;
1308                 cur = cur->next;
1309                 buffer = cur->data;
1310             }
1311             ret = snprintf (buffer, remaining, "%s%s\r\n", pre, source->source);
1312             if (ret > 0)
1313             {
1314                 buffer += ret;
1315                 remaining -= ret;
1316             }
1317         }
1318         node = avl_get_next(node);
1319     }
1320     avl_tree_unlock (_stats.source_tree);
1321     cur->len = STREAMLIST_BLKSIZE - remaining;
1322     return start;
1323 }
1324 
1325 
1326 
1327 /* because we can have stats entries for inactive mountpoints (when there is a fallback)
1328  * then these need to be left on, while others need to be removed
1329  */
stats_purge(time_t mark)1330 void stats_purge (time_t mark)
1331 {
1332     avl_node *snode;
1333 
1334     avl_tree_wlock (_stats.source_tree);
1335     snode = avl_get_first(_stats.source_tree);
1336     while (snode)
1337     {
1338         stats_source_t *src = (stats_source_t *)snode->key;
1339 
1340         snode = avl_get_next (snode);
1341         if (src->source[0] == '/')
1342         {
1343             if (src->updated < mark)
1344             {
1345                 avl_tree_wlock (src->stats_tree);
1346                 avl_delete (_stats.source_tree, src, _free_source_stats);
1347             }
1348             continue;
1349         }
1350         if (fserve_contains (src->source) < 0)
1351         {
1352             /* no source_t and no fallback file stat, so delete */
1353             DEBUG1 ("dropping unreferenced stats for %s", src->source);
1354             avl_tree_wlock (src->stats_tree);
1355             avl_delete (_stats.source_tree, src, _free_source_stats);
1356         }
1357     }
1358     avl_tree_unlock (_stats.source_tree);
1359 }
1360 
1361 
stats_global_calc(time_t now)1362 void stats_global_calc (time_t now)
1363 {
1364     stats_event_t clients, listeners;
1365     avl_node *anode;
1366     char buf1 [VAL_BUFSIZE];
1367     char buf2 [VAL_BUFSIZE];
1368     char buf3 [VAL_BUFSIZE];
1369 
1370     global_lock();
1371     connection_stats ();
1372 
1373     snprintf (buf1, sizeof(buf1), "%" PRIu64, (int64_t)global.clients);
1374 
1375     snprintf (buf2, sizeof(buf2), "%" PRIu64, (int64_t)global.listeners);
1376     snprintf (buf3, sizeof(buf3), "%" PRIu64,
1377             (int64_t)global_getrate_avg (global.out_bitrate) * 8 / 1024);
1378     global_unlock();
1379 
1380     build_event (&clients, NULL, "clients", buf1);
1381     clients.flags |= STATS_COUNTERS;
1382     process_event (&clients);
1383     build_event (&listeners, NULL, "listeners", buf2);
1384     listeners.flags |= STATS_COUNTERS;
1385     process_event (&listeners);
1386 
1387     avl_tree_wlock (_stats.global_tree);
1388     anode = avl_get_first(_stats.global_tree);
1389     while (anode)
1390     {
1391         stats_node_t *node = (stats_node_t *)anode->key;
1392 
1393         if (node->flags & STATS_REGULAR)
1394         {
1395             if (node->last_reported + 9 < now)
1396             {
1397                 stats_listener_send (node->flags, "EVENT global %s %s\n", node->name, node->value);
1398                 DEBUG2 ("update global %s (%s)", node->name, node->value);
1399                 node->last_reported = now;
1400             }
1401         }
1402         anode = avl_get_next (anode);
1403     }
1404     avl_tree_unlock (_stats.global_tree);
1405 
1406     build_event (&clients, NULL, "outgoing_kbitrate", buf3);
1407     clients.flags = STATS_COUNTERS|STATS_HIDDEN;
1408     process_event (&clients);
1409 }
1410 
1411 
stats_handle(const char * mount)1412 stats_handle_t stats_handle (const char *mount)
1413 {
1414     stats_source_t *src_stats;
1415 
1416     if (mount == NULL)
1417         return 0;
1418     avl_tree_wlock (_stats.source_tree);
1419     src_stats = _find_source(_stats.source_tree, mount);
1420     if (src_stats == NULL)
1421     {
1422         src_stats = (stats_source_t *)calloc (1, sizeof (stats_source_t));
1423         if (src_stats == NULL)
1424             abort();
1425         DEBUG1 ("new source stat %s", mount);
1426         src_stats->source = (char *)strdup (mount);
1427         src_stats->stats_tree = avl_tree_new (_compare_stats, NULL);
1428         src_stats->flags = STATS_SLAVE|STATS_GENERAL|STATS_HIDDEN;
1429 
1430         avl_insert (_stats.source_tree, (void *)src_stats);
1431     }
1432     src_stats->updated = (time_t)(LONG_MAX);
1433     avl_tree_wlock (src_stats->stats_tree);
1434     avl_tree_unlock (_stats.source_tree);
1435 
1436     return (stats_handle_t)src_stats;
1437 }
1438 
1439 
stats_lock(stats_handle_t handle,const char * mount)1440 stats_handle_t stats_lock (stats_handle_t handle, const char *mount)
1441 {
1442     stats_source_t *src_stats = (stats_source_t *)handle;
1443     if (src_stats == NULL)
1444         src_stats = (stats_source_t*)stats_handle (mount);
1445     else
1446         avl_tree_wlock (src_stats->stats_tree);
1447     return (stats_handle_t)src_stats;
1448 }
1449 
1450 
stats_release(stats_handle_t handle)1451 void stats_release (stats_handle_t handle)
1452 {
1453     stats_source_t *src_stats = (stats_source_t *)handle;
1454     if (src_stats)
1455         avl_tree_unlock (src_stats->stats_tree);
1456 }
1457 
1458 
1459 // drops stats attached to this handle but don't remove the handle itself
stats_flush(stats_handle_t handle)1460 void stats_flush (stats_handle_t handle)
1461 {
1462     if (handle)
1463     {
1464         stats_source_t *src_stats = (stats_source_t *)handle;
1465         avl_tree *t = src_stats->stats_tree;
1466         avl_node *node;
1467 
1468         avl_tree_wlock (src_stats->stats_tree);
1469         while ((node = src_stats->stats_tree->root->right))
1470         {
1471             stats_node_t *stats = (stats_node_t*)node->key;
1472             DEBUG2 ("Removing %s from %s", stats->name, src_stats->source);
1473             avl_delete (t, (void*)stats, _free_stats);
1474         }
1475         stats_listener_send (src_stats->flags, "FLUSH %s\n", src_stats->source);
1476         avl_tree_unlock (src_stats->stats_tree);
1477     }
1478 }
1479 
1480 
1481 // assume source stats are write locked
stats_set(stats_handle_t handle,const char * name,const char * value)1482 void stats_set (stats_handle_t handle, const char *name, const char *value)
1483 {
1484     if (handle)
1485     {
1486         stats_source_t *src_stats = (stats_source_t *)handle;
1487         stats_event_t event;
1488 
1489         build_event (&event, src_stats->source, name, (char *)value);
1490         process_source_stat (src_stats, &event);
1491     }
1492 }
1493 
1494 
stats_set_inc(stats_handle_t handle,const char * name)1495 void stats_set_inc (stats_handle_t handle, const char *name)
1496 {
1497     if (handle)
1498     {
1499         stats_source_t *src_stats = (stats_source_t *)handle;
1500         stats_event_t event;
1501         char buffer[VAL_BUFSIZE] = "1";
1502 
1503         build_event (&event, src_stats->source, name, buffer);
1504         event.action = STATS_EVENT_INC;
1505         process_source_stat (src_stats, &event);
1506     }
1507 }
1508 
1509 
stats_set_args(stats_handle_t handle,const char * name,const char * format,...)1510 void stats_set_args (stats_handle_t handle, const char *name, const char *format, ...)
1511 {
1512     va_list val;
1513     int ret;
1514     stats_source_t *src_stats = (stats_source_t *)handle;
1515     char buf[1024];
1516 
1517     if (name == NULL)
1518         return;
1519     va_start (val, format);
1520     ret = vsnprintf (buf, sizeof (buf), format, val);
1521     va_end (val);
1522 
1523     if (ret < 0 || (unsigned int)ret >= sizeof (buf))
1524     {
1525         WARN2 ("problem with formatting %s stat %s",
1526                 src_stats == NULL ? "global" : src_stats->source, name);
1527         return;
1528     }
1529     stats_set (handle, name, buf);
1530 }
1531 
1532 
stats_set_expire(stats_handle_t handle,time_t mark)1533 void stats_set_expire (stats_handle_t handle, time_t mark)
1534 {
1535     stats_source_t *src_stats = (stats_source_t *)handle;
1536 
1537     if (src_stats)
1538         src_stats->updated = mark;
1539 }
1540 
1541 
stats_set_flags(stats_handle_t handle,const char * name,const char * value,int flags)1542 void stats_set_flags (stats_handle_t handle, const char *name, const char *value, int flags)
1543 {
1544     stats_source_t *src_stats = (stats_source_t *)handle;
1545     stats_event_t event;
1546 
1547     build_event (&event, src_stats->source, name, value);
1548     event.flags = flags;
1549     if (value)
1550         event.action |= STATS_EVENT_HIDDEN;
1551     else
1552         event.action = STATS_EVENT_HIDDEN;
1553     process_source_stat (src_stats, &event);
1554 }
1555 
1556 
contains_xml_entity(const char * value)1557 static int contains_xml_entity (const char *value)
1558 {
1559     if (value)
1560     {
1561         const char *p = strchr (value, '&');
1562         char semi = '\0';
1563 
1564         if (p && sscanf (p, "&%*9[^; ]%c", &semi) == 1 && semi == ';')
1565             return 1;
1566     }
1567     return 0;
1568 }
1569 
1570 
stats_set_entity_decode(stats_handle_t handle,const char * name,const char * value)1571 static void stats_set_entity_decode (stats_handle_t handle, const char *name, const char *value)
1572 {
1573     if (contains_xml_entity (value))
1574     {
1575         xmlDocPtr  doc = xmlNewDoc(NULL);
1576         xmlNodePtr xmlnode;
1577         xmlNodePtr rootelem = xmlNewNode (NULL, (xmlChar *) "html");
1578         stats_source_t *src_stats = (stats_source_t *)handle;
1579         xmlChar *decoded;
1580         char details[200];
1581 
1582         xmlDocSetRootElement (doc, rootelem);
1583 
1584         snprintf (details, sizeof details, "mount %s, name %s, value %s :", src_stats->source, name, value);
1585         xmlSetGenericErrorFunc (details, log_parse_failure);
1586         xmlnode = xmlStringGetNodeList (doc, XMLSTR(value));
1587         decoded = xmlNodeListGetString (doc, xmlnode, 1);
1588         stats_set (handle, name, (void*)decoded);
1589         xmlFree (decoded);
1590         xmlFreeNodeList (xmlnode);
1591         xmlFreeDoc (doc);
1592         return;
1593     }
1594     stats_set (handle, name, value);
1595 }
1596 
1597 
stats_set_conv(stats_handle_t handle,const char * name,const char * value,const char * charset)1598 void stats_set_conv (stats_handle_t handle, const char *name, const char *value, const char *charset)
1599 {
1600     if (charset)
1601     {
1602         xmlCharEncodingHandlerPtr encoding = xmlFindCharEncodingHandler (charset);
1603 
1604         if (encoding)
1605         {
1606             xmlBufferPtr in = xmlBufferCreate ();
1607             xmlBufferPtr conv = xmlBufferCreate ();
1608 
1609             xmlBufferCCat (in, value);
1610             if (xmlCharEncInFunc (encoding, conv, in) > 0)
1611                 stats_set_entity_decode (handle, name, (void*)xmlBufferContent (conv));
1612             xmlBufferFree (in);
1613             xmlBufferFree (conv);
1614             xmlCharEncCloseFunc (encoding);
1615             return;
1616         }
1617         WARN1 ("No charset found for \"%s\"", charset);
1618         return;
1619     }
1620     if (value && xmlCheckUTF8 ((unsigned char *)value) == 0)
1621     {
1622         WARN2 ("seen non-UTF8 data, probably incorrect charcter set (%s, %s)", name, value);
1623         return;
1624     }
1625     stats_set_entity_decode (handle, name, value);
1626 }
1627 
1628 
stats_listener_to_xml(client_t * listener,xmlNodePtr parent)1629 void stats_listener_to_xml (client_t *listener, xmlNodePtr parent)
1630 {
1631     const char *header;
1632     char buf[30];
1633 
1634     xmlNodePtr node = xmlNewChild (parent, NULL, XMLSTR("listener"), NULL);
1635 
1636     snprintf (buf, sizeof (buf), "%" PRIu64, listener->connection.id);
1637     xmlSetProp (node, XMLSTR("id"), XMLSTR(buf));
1638     xmlNewChild (node, NULL, XMLSTR("ID"), XMLSTR(buf));
1639 
1640     xmlNewChild (node, NULL, XMLSTR("IP"), XMLSTR(listener->connection.ip));
1641 
1642     header = httpp_getvar (listener->parser, "user-agent");
1643     if (header && xmlCheckUTF8((unsigned char *)header))
1644     {
1645         xmlChar *str = xmlEncodeEntitiesReentrant (parent->doc, XMLSTR(header));
1646         xmlNewChild (node, NULL, XMLSTR("UserAgent"), str);
1647         xmlFree (str);
1648     }
1649 
1650     header = httpp_getvar (listener->parser, "referer");
1651     if (header && xmlCheckUTF8((unsigned char *)header))
1652     {
1653         xmlChar *str = xmlEncodeEntitiesReentrant (parent->doc, XMLSTR(header));
1654         xmlNewChild (node, NULL, XMLSTR("Referer"), str);
1655         xmlFree (str);
1656     }
1657 
1658     if ((listener->flags & (CLIENT_ACTIVE|CLIENT_IN_FSERVE)) == CLIENT_ACTIVE)
1659     {
1660         source_t *source = listener->shared_data;
1661         snprintf (buf, sizeof (buf), "%"PRIu64, source->client->queue_pos - listener->queue_pos);
1662     }
1663     else
1664         snprintf (buf, sizeof (buf), "0");
1665     xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR(buf));
1666 
1667     if (listener->worker)
1668     {
1669         snprintf (buf, sizeof (buf), "%lu",
1670                 (unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time));
1671         xmlNewChild (node, NULL, XMLSTR("Connected"), XMLSTR(buf));
1672     }
1673     if (listener->username)
1674     {
1675         xmlChar *str = xmlEncodeEntitiesReentrant (parent->doc, XMLSTR(listener->username));
1676         xmlNewChild (node, NULL, XMLSTR("username"), str);
1677         xmlFree (str);
1678     }
1679 }
1680 
1681