1 /* Icecast
2 *
3 * This program is distributed under the GNU General Public License, version 2.
4 * A copy of this license is included with this source.
5 *
6 * Copyright 2000-2004, Jack Moffitt <jack@xiph.org,
7 * Michael Smith <msmith@xiph.org>,
8 * oddsock <oddsock@xiph.org>,
9 * Karl Heyes <karl@xiph.org>
10 * and others (see AUTHORS for details).
11 */
12
13 #ifdef HAVE_CONFIG_H
14 #include <config.h>
15 #endif
16
17 #include <stdio.h>
18 #include <string.h>
19 #include <stdlib.h>
20 #include <stdarg.h>
21 #include <limits.h>
22
23 #include <libxml/xmlmemory.h>
24 #include <libxml/parser.h>
25 #include <libxml/parserInternals.h>
26 #include <libxml/tree.h>
27
28 #include <git_hash.h>
29 #include "thread/thread.h"
30 #include "avl/avl.h"
31 #include "httpp/httpp.h"
32 #include "net/sock.h"
33
34 #include "connection.h"
35
36 #include "source.h"
37 #include "admin.h"
38 #include "global.h"
39 #include "refbuf.h"
40 #include "client.h"
41 #include "stats.h"
42 #include "xslt.h"
43 #include "util.h"
44 #include "fserve.h"
45 #define CATMODULE "stats"
46 #include "logging.h"
47
48 #if !defined HAVE_ATOLL && defined HAVE_STRTOLL
49 #define atoll(nptr) strtoll(nptr, (char **)NULL, 10)
50 #endif
51
52 #define VAL_BUFSIZE 30
53 #define STATS_BLOCK_CONNECTION 01
54
55 #define STATS_EVENT_SET 0
56 #define STATS_EVENT_INC 1
57 #define STATS_EVENT_DEC 2
58 #define STATS_EVENT_ADD 3
59 #define STATS_EVENT_SUB 4
60 #define STATS_EVENT_REMOVE 5
61 #define STATS_EVENT_HIDDEN 0x80
62
63 typedef struct _stats_node_tag
64 {
65 char *name;
66 char *value;
67 time_t last_reported;
68 int flags;
69 } stats_node_t;
70
71 typedef struct _stats_event_tag
72 {
73 char *source;
74 char *name;
75 char *value;
76 int flags;
77 int action;
78
79 struct _stats_event_tag *next;
80 } stats_event_t;
81
82 typedef struct _stats_source_tag
83 {
84 char *source;
85 int flags;
86 time_t updated;
87 avl_tree *stats_tree;
88 } stats_source_t;
89
90 typedef struct _event_listener_tag
91 {
92 int mask;
93 unsigned int content_len;
94 char *source;
95
96 /* queue for unwritten stats to stats clients */
97 refbuf_t *recent_block;
98 client_t *client;
99
100 struct _event_listener_tag *next;
101 } event_listener_t;
102
103
104 typedef struct _stats_tag
105 {
106 avl_tree *global_tree;
107 avl_tree *source_tree;
108
109 /* list of listeners for stats */
110 event_listener_t *event_listeners;
111 mutex_t listeners_lock;
112
113 } stats_t;
114
115 static volatile int _stats_running = 0;
116
117 static stats_t _stats;
118
119
120 static int _compare_stats(void *a, void *b, void *arg);
121 static int _compare_source_stats(void *a, void *b, void *arg);
122 static int _free_stats(void *key);
123 static int _free_source_stats(void *key);
124 static int _free_source_stats_wrapper (void *key);
125 static stats_node_t *_find_node(const avl_tree *tree, const char *name);
126 static stats_source_t *_find_source(avl_tree *tree, const char *source);
127 static void process_event (stats_event_t *event);
128 static void _add_stats_to_stats_client (client_t *client, const char *fmt, va_list ap);
129 static void stats_listener_send (int flags, const char *fmt, ...);
130
131 unsigned int throttle_sends;
132
133 /* simple helper function for creating an event */
build_event(stats_event_t * event,const char * source,const char * name,const char * value)134 static void build_event (stats_event_t *event, const char *source, const char *name, const char *value)
135 {
136 event->source = (char *)source;
137 event->name = (char *)name;
138 event->value = (char *)value;
139 event->flags = STATS_PUBLIC;
140 if (source) event->flags |= STATS_SLAVE;
141 if (value)
142 event->action = STATS_EVENT_SET;
143 else
144 event->action = STATS_EVENT_REMOVE;
145 }
146
147
stats_initialize(void)148 void stats_initialize(void)
149 {
150 if (_stats_running)
151 return;
152
153 /* set up global struct */
154 _stats.global_tree = avl_tree_new(_compare_stats, NULL);
155 _stats.source_tree = avl_tree_new(_compare_source_stats, NULL);
156
157 _stats.event_listeners = NULL;
158 thread_mutex_create (&_stats.listeners_lock);
159
160 _stats_running = 1;
161
162 stats_event_time (NULL, "server_start", STATS_GENERAL);
163
164 /* global currently active stats */
165 stats_event_flags (NULL, "clients", "0", STATS_COUNTERS|STATS_REGULAR);
166 stats_event_flags (NULL, "listeners", "0", STATS_COUNTERS|STATS_REGULAR);
167 stats_event_flags (NULL, "connections", "0", STATS_COUNTERS|STATS_REGULAR);
168 stats_event_flags (NULL, "sources", "0", STATS_COUNTERS|STATS_REGULAR);
169 stats_event_flags (NULL, "stats", "0", STATS_COUNTERS|STATS_REGULAR);
170 stats_event_flags (NULL, "banned_IPs", "0", STATS_COUNTERS|STATS_REGULAR);
171 #ifdef GIT_VERSION
172 stats_event (NULL, "build", GIT_VERSION);
173 #endif
174
175 /* global accumulating stats */
176 stats_event_flags (NULL, "client_connections", "0", STATS_COUNTERS|STATS_REGULAR);
177 stats_event_flags (NULL, "source_client_connections", "0", STATS_COUNTERS|STATS_REGULAR);
178 stats_event_flags (NULL, "source_relay_connections", "0", STATS_COUNTERS|STATS_REGULAR);
179 stats_event_flags (NULL, "source_total_connections", "0", STATS_COUNTERS|STATS_REGULAR);
180 stats_event_flags (NULL, "stats_connections", "0", STATS_COUNTERS|STATS_REGULAR);
181 stats_event_flags (NULL, "listener_connections", "0", STATS_COUNTERS|STATS_REGULAR);
182 stats_event_flags (NULL, "outgoing_kbitrate", "0", STATS_COUNTERS|STATS_REGULAR);
183 stats_event_flags (NULL, "stream_kbytes_sent", "0", STATS_COUNTERS|STATS_REGULAR);
184 stats_event_flags (NULL, "stream_kbytes_read", "0", STATS_COUNTERS|STATS_REGULAR);
185 }
186
stats_shutdown(void)187 void stats_shutdown(void)
188 {
189 if(!_stats_running) /* We can't shutdown if we're not running. */
190 return;
191
192 _stats_running = 0;
193
194 avl_tree_free(_stats.source_tree, _free_source_stats_wrapper);
195 avl_tree_free(_stats.global_tree, _free_stats);
196 thread_mutex_destroy (&_stats.listeners_lock);
197 }
198
199
stats_clients_wakeup(void)200 void stats_clients_wakeup (void)
201 {
202 event_listener_t *listener;
203
204 thread_mutex_lock (&_stats.listeners_lock);
205 listener = _stats.event_listeners;
206 while (listener)
207 {
208 client_t *client = listener->client;
209 if (client)
210 client->schedule_ms = 0;
211 listener = listener->next;
212 }
213 thread_mutex_unlock (&_stats.listeners_lock);
214 }
215
216
217 /* simple name=tag stat create/update */
stats_event(const char * source,const char * name,const char * value)218 void stats_event(const char *source, const char *name, const char *value)
219 {
220 stats_event_t event;
221
222 if (value && xmlCheckUTF8 ((unsigned char *)value) == 0)
223 {
224 WARN3 ("seen non-UTF8 data (%s), probably incorrect metadata (%s, %s)",
225 source?source:"global", name, value);
226 return;
227 }
228 build_event (&event, source, name, (char *)value);
229 process_event (&event);
230 }
231
232
233 /* wrapper for stats_event, this takes a charset to convert from */
stats_event_conv(const char * mount,const char * name,const char * value,const char * charset)234 void stats_event_conv(const char *mount, const char *name, const char *value, const char *charset)
235 {
236 const char *metadata = value;
237 xmlBufferPtr conv = xmlBufferCreate ();
238
239 if (charset && value)
240 {
241 xmlCharEncodingHandlerPtr handle = xmlFindCharEncodingHandler (charset);
242
243 if (handle)
244 {
245 xmlBufferPtr raw = xmlBufferCreate ();
246 xmlBufferAdd (raw, (const xmlChar *)value, strlen (value));
247 if (xmlCharEncInFunc (handle, conv, raw) > 0)
248 metadata = (char *)xmlBufferContent (conv);
249 xmlBufferFree (raw);
250 xmlCharEncCloseFunc (handle);
251 }
252 else
253 WARN1 ("No charset found for \"%s\"", charset);
254 }
255
256 stats_event (mount, name, metadata);
257 xmlBufferFree (conv);
258 }
259
260 /* set stat with flags, name can be NULL if it applies to a whole
261 * source stats tree. */
stats_event_flags(const char * source,const char * name,const char * value,int flags)262 void stats_event_flags (const char *source, const char *name, const char *value, int flags)
263 {
264 stats_event_t event;
265
266 build_event (&event, source, name, value);
267 event.flags = flags;
268 if (value)
269 event.action |= STATS_EVENT_HIDDEN;
270 else
271 event.action = STATS_EVENT_HIDDEN;
272 process_event (&event);
273 }
274
275 /* printf style formatting for stat create/update */
stats_event_args(const char * source,char * name,char * format,...)276 void stats_event_args(const char *source, char *name, char *format, ...)
277 {
278 va_list val;
279 int ret;
280 char buf[1024];
281
282 if (name == NULL)
283 return;
284 va_start(val, format);
285 ret = vsnprintf(buf, sizeof (buf), format, val);
286 va_end(val);
287
288 if (ret < 0 || (unsigned int)ret >= sizeof (buf))
289 {
290 WARN2 ("problem with formatting %s stat %s",
291 source==NULL ? "global" : source, name);
292 return;
293 }
294 stats_event(source, name, buf);
295 }
296
_get_stats(const char * source,const char * name)297 static char *_get_stats(const char *source, const char *name)
298 {
299 stats_node_t *stats = NULL;
300 stats_source_t *src = NULL;
301 char *value = NULL;
302
303 if (source == NULL) {
304 avl_tree_rlock (_stats.global_tree);
305 stats = _find_node(_stats.global_tree, name);
306 if (stats) value = (char *)strdup(stats->value);
307 avl_tree_unlock (_stats.global_tree);
308 } else {
309 avl_tree_rlock (_stats.source_tree);
310 src = _find_source(_stats.source_tree, source);
311 if (src)
312 {
313 avl_tree_rlock (src->stats_tree);
314 avl_tree_unlock (_stats.source_tree);
315 stats = _find_node(src->stats_tree, name);
316 if (stats) value = (char *)strdup(stats->value);
317 avl_tree_unlock (src->stats_tree);
318 }
319 else
320 avl_tree_unlock (_stats.source_tree);
321 }
322
323 return value;
324 }
325
stats_get_value(const char * source,const char * name)326 char *stats_get_value(const char *source, const char *name)
327 {
328 return(_get_stats(source, name));
329 }
330
331
stats_retrieve(stats_handle_t handle,const char * name)332 char *stats_retrieve (stats_handle_t handle, const char *name)
333 {
334 char *v = NULL;
335 stats_source_t *src_stats = (stats_source_t *)handle;
336 stats_node_t *stats = _find_node (src_stats->stats_tree, name);
337
338 if (stats) v = strdup (stats->value);
339 return v;
340 }
341
342
343 /* increase the value in the provided stat by 1 */
stats_event_inc(const char * source,const char * name)344 void stats_event_inc(const char *source, const char *name)
345 {
346 stats_event_t event;
347 char buffer[VAL_BUFSIZE] = "1";
348 build_event (&event, source, name, buffer);
349 /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
350 event.action = STATS_EVENT_INC;
351 process_event (&event);
352 }
353
stats_event_add(const char * source,const char * name,unsigned long value)354 void stats_event_add(const char *source, const char *name, unsigned long value)
355 {
356 stats_event_t event;
357 char buffer [VAL_BUFSIZE];
358
359 if (value == 0)
360 return;
361 build_event (&event, source, name, buffer);
362 snprintf (buffer, VAL_BUFSIZE, "%ld", value);
363 event.action = STATS_EVENT_ADD;
364 /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
365 process_event (&event);
366 }
367
stats_event_sub(const char * source,const char * name,unsigned long value)368 void stats_event_sub(const char *source, const char *name, unsigned long value)
369 {
370 stats_event_t event;
371 char buffer[VAL_BUFSIZE];
372
373 if (value == 0)
374 return;
375 build_event (&event, source, name, buffer);
376 /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
377 snprintf (buffer, VAL_BUFSIZE, "%ld", value);
378 event.action = STATS_EVENT_SUB;
379 process_event (&event);
380 }
381
382 /* decrease the value in the provided stat by 1 */
stats_event_dec(const char * source,const char * name)383 void stats_event_dec(const char *source, const char *name)
384 {
385 stats_event_t event;
386 char buffer[VAL_BUFSIZE] = "0";
387 /* DEBUG2("%s on %s", name, source==NULL?"global":source); */
388 build_event (&event, source, name, buffer);
389 event.action = STATS_EVENT_DEC;
390 process_event (&event);
391 }
392
393 /* note: you must call this function only when you have exclusive access
394 ** to the avl_tree
395 */
_find_node(const avl_tree * stats_tree,const char * name)396 static stats_node_t *_find_node(const avl_tree *stats_tree, const char *name)
397 {
398 stats_node_t *stats;
399 avl_node *node;
400 int cmp;
401
402 /* get the root node */
403 node = stats_tree->root->right;
404
405 while (node) {
406 stats = (stats_node_t *)node->key;
407 cmp = strcmp(name, stats->name);
408 if (cmp < 0)
409 node = node->left;
410 else if (cmp > 0)
411 node = node->right;
412 else
413 return stats;
414 }
415
416 /* didn't find it */
417 return NULL;
418 }
419
420 /* note: you must call this function only when you have exclusive access
421 ** to the avl_tree
422 */
_find_source(avl_tree * source_tree,const char * source)423 static stats_source_t *_find_source(avl_tree *source_tree, const char *source)
424 {
425 stats_source_t *stats;
426 avl_node *node;
427 int cmp;
428
429 /* get the root node */
430 node = source_tree->root->right;
431 while (node) {
432 stats = (stats_source_t *)node->key;
433 cmp = strcmp(source, stats->source);
434 if (cmp < 0)
435 node = node->left;
436 else if (cmp > 0)
437 node = node->right;
438 else
439 return stats;
440 }
441
442 /* didn't find it */
443 return NULL;
444 }
445
446
447 /* helper to apply specialised changes to a stats node */
modify_node_event(stats_node_t * node,stats_event_t * event)448 static void modify_node_event (stats_node_t *node, stats_event_t *event)
449 {
450 if (node == NULL || event == NULL)
451 return;
452 if (event->action & STATS_EVENT_HIDDEN)
453 {
454 node->flags = event->flags;
455 event->action &= ~STATS_EVENT_HIDDEN;
456 if (event->value == NULL)
457 return;
458 }
459 if (event->action == STATS_EVENT_SET)
460 {
461 if (node->flags & STATS_REGULAR)
462 {
463 if (node->value && strcmp (node->value, event->value) == 0)
464 return; // no change, lets get out
465 }
466 }
467 else
468 {
469 int64_t value = 0;
470
471 switch (event->action)
472 {
473 case STATS_EVENT_INC:
474 value = atoll (node->value)+1;
475 break;
476 case STATS_EVENT_DEC:
477 value = atoll (node->value)-1;
478 break;
479 case STATS_EVENT_ADD:
480 value = atoll (node->value) + atoll (event->value);
481 break;
482 case STATS_EVENT_SUB:
483 value = atoll (node->value) - atoll (event->value);
484 break;
485 default:
486 break;
487 }
488 snprintf (event->value, VAL_BUFSIZE, "%" PRId64, value);
489 }
490 if (node->value)
491 free (node->value);
492 node->value = strdup (event->value);
493
494 if (node->flags & STATS_REGULAR)
495 node->last_reported = 0;
496 else
497 DEBUG3 ("update \"%s\" %s (%s)", event->source?event->source:"global", node->name, node->value);
498 }
499
500
process_global_event(stats_event_t * event)501 static void process_global_event (stats_event_t *event)
502 {
503 stats_node_t *node = NULL;
504
505 avl_tree_wlock (_stats.global_tree);
506 /* DEBUG3("global event %s %s %d", event->name, event->value, event->action); */
507 if (event->action == STATS_EVENT_REMOVE)
508 {
509 /* we're deleting */
510 node = _find_node(_stats.global_tree, event->name);
511 if (node != NULL)
512 {
513 stats_listener_send (node->flags, "DELETE global %s\n", event->name);
514 avl_delete(_stats.global_tree, (void *)node, _free_stats);
515 }
516 avl_tree_unlock (_stats.global_tree);
517 return;
518 }
519 node = _find_node(_stats.global_tree, event->name);
520 if (node)
521 {
522 modify_node_event (node, event);
523 }
524 else
525 {
526 /* add node */
527 node = (stats_node_t *)calloc(1, sizeof(stats_node_t));
528 node->name = (char *)strdup(event->name);
529 node->value = (char *)strdup(event->value);
530 node->flags = event->flags;
531
532 avl_insert(_stats.global_tree, (void *)node);
533 }
534 if ((node->flags & STATS_REGULAR) == 0)
535 stats_listener_send (node->flags, "EVENT global %s %s\n", node->name, node->value);
536 avl_tree_unlock (_stats.global_tree);
537 }
538
539
process_source_stat(stats_source_t * src_stats,stats_event_t * event)540 static void process_source_stat (stats_source_t *src_stats, stats_event_t *event)
541 {
542 if (event->name)
543 {
544 stats_node_t *node = _find_node (src_stats->stats_tree, event->name);
545 if (node == NULL)
546 {
547 /* adding node */
548 if (event->action != STATS_EVENT_REMOVE && event->value)
549 {
550 DEBUG3 ("new node on %s \"%s\" (%s)", src_stats->source, event->name, event->value);
551 node = (stats_node_t *)calloc (1,sizeof(stats_node_t));
552 node->name = (char *)strdup (event->name);
553 node->value = (char *)strdup (event->value);
554 node->flags = event->flags;
555 if (src_stats->flags & STATS_HIDDEN)
556 node->flags |= STATS_HIDDEN;
557 stats_listener_send (node->flags, "EVENT %s %s %s\n", src_stats->source, event->name, event->value);
558 avl_insert (src_stats->stats_tree, (void *)node);
559 }
560 return;
561 }
562 if (event->action == STATS_EVENT_REMOVE)
563 {
564 DEBUG2 ("delete node %s from %s", event->name, src_stats->source);
565 stats_listener_send (node->flags, "DELETE %s %s\n", src_stats->source, event->name);
566 avl_delete (src_stats->stats_tree, (void *)node, _free_stats);
567 return;
568 }
569 modify_node_event (node, event);
570 stats_listener_send (node->flags, "EVENT %s %s %s\n", src_stats->source, node->name, node->value);
571 return;
572 }
573 if (event->action == STATS_EVENT_REMOVE && event->name == NULL)
574 {
575 avl_tree_unlock (src_stats->stats_tree);
576 avl_tree_wlock (_stats.source_tree);
577 avl_tree_wlock (src_stats->stats_tree);
578 avl_delete (_stats.source_tree, (void *)src_stats, _free_source_stats);
579 avl_tree_unlock (_stats.source_tree);
580 return;
581 }
582 /* change source flags status */
583 if (event->action & STATS_EVENT_HIDDEN)
584 {
585 avl_node *node = avl_get_first (src_stats->stats_tree);
586 int visible = 0;
587
588 if ((event->flags&STATS_HIDDEN) == (src_stats->flags&STATS_HIDDEN))
589 return;
590 if (src_stats->flags & STATS_HIDDEN)
591 {
592 stats_node_t *ct = _find_node (src_stats->stats_tree, "server_type");
593 const char *type = "audio/mpeg";
594 if (ct)
595 type = ct->value;
596 src_stats->flags &= ~STATS_HIDDEN;
597 stats_listener_send (src_stats->flags, "NEW %s %s\n", type, src_stats->source);
598 visible = 1;
599 }
600 else
601 {
602 stats_listener_send (src_stats->flags, "DELETE %s\n", src_stats->source);
603 src_stats->flags |= STATS_HIDDEN;
604 }
605 while (node)
606 {
607 stats_node_t *stats = (stats_node_t*)node->key;
608 if (visible)
609 {
610 stats->flags &= ~STATS_HIDDEN;
611 stats_listener_send (stats->flags, "EVENT %s %s %s\n", src_stats->source, stats->name, stats->value);
612 }
613 else
614 stats->flags |= STATS_HIDDEN;
615 node = avl_get_next (node);
616 }
617 }
618 }
619
620
process_source_event(stats_event_t * event)621 static void process_source_event (stats_event_t *event)
622 {
623 stats_source_t *snode;
624
625 avl_tree_wlock (_stats.source_tree);
626 snode = _find_source(_stats.source_tree, event->source);
627 if (snode == NULL)
628 {
629 if (event->action == STATS_EVENT_REMOVE)
630 {
631 avl_tree_unlock (_stats.source_tree);
632 return;
633 }
634 snode = (stats_source_t *)calloc(1,sizeof(stats_source_t));
635 if (snode == NULL)
636 abort();
637 DEBUG1 ("new source stat %s", event->source);
638 snode->source = (char *)strdup(event->source);
639 snode->stats_tree = avl_tree_new(_compare_stats, NULL);
640 snode->flags = STATS_SLAVE|STATS_GENERAL|STATS_HIDDEN;
641
642 avl_insert(_stats.source_tree, (void *)snode);
643 }
644 if (event->action == STATS_EVENT_REMOVE && event->name == NULL)
645 {
646 int fallback_stream = 0;
647 avl_tree_wlock (snode->stats_tree);
648 fallback_stream = _find_node (snode->stats_tree, "fallback") == NULL ? 1 : 0;
649 if (fallback_stream)
650 avl_delete(_stats.source_tree, (void *)snode, _free_source_stats);
651 else
652 avl_tree_unlock (snode->stats_tree);
653 avl_tree_unlock (_stats.source_tree);
654 return;
655 }
656 avl_tree_wlock (snode->stats_tree);
657 avl_tree_unlock (_stats.source_tree);
658 process_source_stat (snode, event);
659 avl_tree_unlock (snode->stats_tree);
660 }
661
662
stats_set_time(stats_handle_t handle,const char * name,int flags,time_t tm)663 void stats_set_time (stats_handle_t handle, const char *name, int flags, time_t tm)
664 {
665 char buffer[100];
666
667 util_get_clf_time (buffer, sizeof (buffer), tm);
668 stats_set_flags (handle, name, buffer, flags);
669 }
670
671
stats_event_time(const char * mount,const char * name,int flags)672 void stats_event_time (const char *mount, const char *name, int flags)
673 {
674 time_t now = time(NULL);
675 char buffer[100];
676
677 util_get_clf_time (buffer, sizeof (buffer), now);
678 stats_event_flags (mount, name, buffer, flags);
679 }
680
681
stats_listeners_send(client_t * client)682 static int stats_listeners_send (client_t *client)
683 {
684 int loop = 12, total = 0;
685 int ret = 0;
686 event_listener_t *listener = client->shared_data;
687
688 if (client->connection.error || global.running != ICE_RUNNING)
689 return -1;
690 if (client->refbuf && client->refbuf->flags & STATS_BLOCK_CONNECTION)
691 loop = 14;
692 else
693 // impose a queue limit of 2Meg if it has been connected for so many seconds, gives
694 // chance for some catchup on large data sets.
695 if (listener->content_len > 2000000 && (client->worker->current_time.tv_sec - client->connection.con_time) > 60)
696 {
697 WARN1 ("dropping stats client, %d in queue", listener->content_len);
698 return -1;
699 }
700 client->schedule_ms = client->worker->time_ms;
701 thread_mutex_lock (&_stats.listeners_lock);
702 while (1)
703 {
704 refbuf_t *refbuf = client->refbuf;
705
706 if (refbuf == NULL)
707 {
708 client->schedule_ms = client->worker->time_ms + 80;
709 break;
710 }
711 if (loop == 0 || total > 50000)
712 {
713 client->schedule_ms = client->worker->time_ms + (total>>11) + 5;
714 break;
715 }
716 ret = format_generic_write_to_client (client);
717 if (ret > 0)
718 {
719 total += ret;
720 }
721 if (client->pos == refbuf->len)
722 {
723 client->refbuf = refbuf->next;
724 listener->content_len -= refbuf->len;
725 refbuf->next = NULL;
726 refbuf_release (refbuf);
727 client->pos = 0;
728 //DEBUG2 ("content is %ld, next %p", listener->content_len, client->refbuf);
729 if (client->refbuf == NULL)
730 {
731 if (listener->content_len)
732 WARN1 ("content length is %u", listener->content_len);
733 listener->recent_block = NULL;
734 client->schedule_ms = client->worker->time_ms + 60;
735 break;
736 }
737 loop--;
738 }
739 else
740 {
741 client->schedule_ms = client->worker->time_ms + (ret > 0 ? 70 : 100);
742 break; /* short write, so stop for now */
743 }
744 }
745 thread_mutex_unlock (&_stats.listeners_lock);
746 if (client->connection.error || global.running != ICE_RUNNING)
747 return -1;
748 return 0;
749 }
750
751
clear_stats_queue(client_t * client)752 static void clear_stats_queue (client_t *client)
753 {
754 refbuf_t *refbuf = client->refbuf;
755 while (refbuf)
756 {
757 refbuf_t *to_go = refbuf;
758 refbuf = to_go->next;
759 if (to_go->_count != 1) DEBUG1 ("odd count for stats %d", to_go->_count);
760 to_go->next = NULL;
761 refbuf_release (to_go);
762 }
763 client->refbuf = NULL;
764 }
765
766
stats_listener_send(int mask,const char * fmt,...)767 static void stats_listener_send (int mask, const char *fmt, ...)
768 {
769 va_list ap;
770 event_listener_t *listener;
771
772 va_start(ap, fmt);
773
774 thread_mutex_lock (&_stats.listeners_lock);
775 listener = _stats.event_listeners;
776
777 while (listener)
778 {
779 int admuser = listener->mask & STATS_HIDDEN,
780 hidden = mask & STATS_HIDDEN,
781 flags = mask & ~STATS_HIDDEN;
782
783 if (admuser || (hidden == 0 && (flags & listener->mask)))
784 _add_stats_to_stats_client (listener->client, fmt, ap);
785 listener = listener->next;
786 }
787 thread_mutex_unlock (&_stats.listeners_lock);
788 va_end(ap);
789 }
790
791
792 /* called after each xml reload */
stats_global(ice_config_t * config)793 void stats_global (ice_config_t *config)
794 {
795 stats_event_flags (NULL, "server_id", config->server_id, STATS_GENERAL);
796 stats_event_flags (NULL, "host", config->hostname, STATS_GENERAL);
797 stats_event (NULL, "location", config->location);
798 stats_event (NULL, "admin", config->admin);
799 global.max_rate = config->max_bandwidth;
800 throttle_sends = 0;
801 }
802
process_event(stats_event_t * event)803 static void process_event (stats_event_t *event)
804 {
805 if (event == NULL)
806 return;
807 /* check if we are dealing with a global or source event */
808 if (event->source == NULL)
809 process_global_event (event);
810 else
811 process_source_event (event);
812 }
813
814
_append_to_bufferv(refbuf_t * refbuf,int max_len,const char * fmt,va_list ap)815 static int _append_to_bufferv (refbuf_t *refbuf, int max_len, const char *fmt, va_list ap)
816 {
817 char *buf = (char*)refbuf->data + refbuf->len;
818 int len = max_len - refbuf->len;
819 int ret;
820 va_list vl;
821
822 va_copy (vl, ap);
823 if (len <= 0)
824 return -1;
825 ret = vsnprintf (buf, len, fmt, vl);
826 if (ret < 0 || ret >= len)
827 return -1;
828 refbuf->len += ret;
829 return ret;
830 }
831
_append_to_buffer(refbuf_t * refbuf,int max_len,const char * fmt,...)832 static int _append_to_buffer (refbuf_t *refbuf, int max_len, const char *fmt, ...)
833 {
834 int ret;
835 va_list va;
836
837 va_start (va, fmt);
838 ret = _append_to_bufferv (refbuf, max_len, fmt, va);
839 va_end(va);
840 if (refbuf->len == 0) // trap for stupid case, report and then ignore it
841 {
842 ERROR1 ("message too big to append, ignoring \"%.25s...\"", refbuf->data);
843 return 0;
844 }
845 return ret;
846 }
847
848
_add_node_to_stats_client(client_t * client,refbuf_t * refbuf)849 static void _add_node_to_stats_client (client_t *client, refbuf_t *refbuf)
850 {
851 if (refbuf->len)
852 {
853 event_listener_t *listener = client->shared_data;
854 //DEBUG2 ("content is %ld, next %p", listener->content_len, refbuf);
855
856 if (listener->recent_block)
857 {
858 listener->recent_block->next = refbuf;
859 listener->recent_block = refbuf;
860 }
861 else
862 {
863 listener->recent_block = refbuf;
864 client->refbuf = refbuf;
865 }
866 listener->content_len += refbuf->len;
867 }
868 }
869
870
_add_stats_to_stats_client(client_t * client,const char * fmt,va_list ap)871 static void _add_stats_to_stats_client (client_t *client, const char *fmt, va_list ap)
872 {
873 event_listener_t *listener = client->shared_data;
874 refbuf_t *r = listener->recent_block;
875 worker_t *worker = client->worker;
876
877 if (worker == NULL) return; // may of left worker
878 if (listener->content_len > 6000000) // max limiter imposed
879 {
880 if (client->connection.error == 0)
881 WARN1 ("Detected large send queue for stats, %s flagged for termination", client->connection.ip);
882 client->connection.error = 1;
883 return;
884 }
885 do
886 {
887 if (r && (r->flags & STATS_BLOCK_CONNECTION) == 0)
888 {
889 /* lets see if we can append to an existing block */
890 if (r->len < 4000)
891 {
892 int written = _append_to_bufferv (r, 4096, fmt, ap);
893 if (written > 0)
894 {
895 listener->content_len += written;
896 break;
897 }
898 }
899 }
900 r = refbuf_new (4096);
901 r->len = 0;
902 if (_append_to_bufferv (r, 4096, fmt, ap) < 0)
903 {
904 WARN1 ("stat details are too large \"%s\"", fmt);
905 refbuf_release (r);
906 return;
907 }
908 _add_node_to_stats_client (client, r);
909 } while (0);
910 }
911
912
_dump_stats_to_doc(xmlNodePtr root,const char * show_mount,int flags)913 static xmlNodePtr _dump_stats_to_doc (xmlNodePtr root, const char *show_mount, int flags)
914 {
915 avl_node *avlnode;
916 xmlNodePtr ret = NULL;
917
918 /* general stats first */
919 avl_tree_rlock (_stats.global_tree);
920 avlnode = avl_get_first(_stats.global_tree);
921 while (avlnode)
922 {
923 stats_node_t *stat = avlnode->key;
924 if (stat->flags & flags)
925 xmlNewTextChild (root, NULL, XMLSTR(stat->name), XMLSTR(stat->value));
926 avlnode = avl_get_next (avlnode);
927 }
928 avl_tree_unlock (_stats.global_tree);
929 /* now per mount stats */
930 avl_tree_rlock (_stats.source_tree);
931 avlnode = avl_get_first(_stats.source_tree);
932 while (avlnode)
933 {
934 stats_source_t *source = (stats_source_t *)avlnode->key;
935 if (((flags&STATS_HIDDEN) || (source->flags&STATS_HIDDEN) == (flags&STATS_HIDDEN)) &&
936 (show_mount == NULL || strcmp (show_mount, source->source) == 0))
937 {
938 avl_node *avlnode2;
939 xmlNodePtr xmlnode = xmlNewTextChild (root, NULL, XMLSTR("source"), NULL);
940 avl_tree_rlock (source->stats_tree);
941 avlnode2 = avl_get_first (source->stats_tree);
942
943 xmlSetProp (xmlnode, XMLSTR("mount"), XMLSTR(source->source));
944 if (ret == NULL)
945 ret = xmlnode;
946 while (avlnode2)
947 {
948 stats_node_t *stat = avlnode2->key;
949 if ((flags&STATS_HIDDEN) || (stat->flags&STATS_HIDDEN) == (flags&STATS_HIDDEN))
950 xmlNewTextChild (xmlnode, NULL, XMLSTR(stat->name), XMLSTR(stat->value));
951 avlnode2 = avl_get_next (avlnode2);
952 }
953 avl_tree_unlock (source->stats_tree);
954 }
955 avlnode = avl_get_next (avlnode);
956 }
957 avl_tree_unlock (_stats.source_tree);
958 return ret;
959 }
960
961
962 /* factoring out code for stats loops
963 * this function copies all stats to queue, and registers
964 */
_register_listener(client_t * client)965 static void _register_listener (client_t *client)
966 {
967 event_listener_t *listener = client->shared_data;
968 avl_node *node;
969 worker_t *worker = client->worker;
970 stats_event_t stats_count;
971 refbuf_t *refbuf, *biglist = NULL, **full_p = &biglist, *last = NULL;
972 size_t size = 8192, len = 0;
973 char buffer[20];
974
975 build_event (&stats_count, NULL, "stats_connections", buffer);
976 stats_count.action = STATS_EVENT_INC;
977 process_event (&stats_count);
978
979 /* we register to receive future events, sources could come in after these initial stats */
980 thread_mutex_lock (&_stats.listeners_lock);
981 listener->next = _stats.event_listeners;
982 _stats.event_listeners = listener;
983 thread_mutex_unlock (&_stats.listeners_lock);
984
985 /* first we fill our initial queue with the headers */
986 refbuf = refbuf_new (size);
987 refbuf->len = 0;
988
989 _append_to_buffer (refbuf, size, "HTTP/1.0 200 OK\r\nCapability: streamlist stats\r\n\r\n");
990
991 /* now the global stats */
992 avl_tree_rlock (_stats.global_tree);
993 node = avl_get_first(_stats.global_tree);
994 while (node)
995 {
996 stats_node_t *stat = node->key;
997
998 if (stat->flags & listener->mask)
999 {
1000 while (_append_to_buffer (refbuf, size, "EVENT global %s %s\n", stat->name, stat->value) < 0)
1001 {
1002 *full_p = last = refbuf;
1003 full_p = &refbuf->next;
1004 len += refbuf->len;
1005 refbuf = refbuf_new (size);
1006 refbuf->len = 0;
1007 }
1008 }
1009 node = avl_get_next(node);
1010 }
1011 avl_tree_unlock (_stats.global_tree);
1012 /* now the stats for each source */
1013 avl_tree_rlock (_stats.source_tree);
1014 node = avl_get_first(_stats.source_tree);
1015 while (node)
1016 {
1017 stats_source_t *snode = (stats_source_t *)node->key;
1018
1019 if (snode->flags & listener->mask)
1020 {
1021 stats_node_t *ct = _find_node (snode->stats_tree, "server_type");
1022 const char *type = "audio/mpeg";
1023 if (ct)
1024 type = ct->value;
1025 while (_append_to_buffer (refbuf, size, "NEW %s %s\n", type, snode->source) < 0)
1026 {
1027 *full_p = last = refbuf;
1028 full_p = &refbuf->next;
1029 len += refbuf->len;
1030 refbuf = refbuf_new (size);
1031 refbuf->len = 0;
1032 }
1033 }
1034 node = avl_get_next(node);
1035 }
1036 while (_append_to_buffer (refbuf, size, "INFO full list end\n") < 0)
1037 {
1038 *full_p = last = refbuf;
1039 full_p = &refbuf->next;
1040 len += refbuf->len;
1041 refbuf = refbuf_new (size);
1042 refbuf->len = 0;
1043 }
1044 node = avl_get_first(_stats.source_tree);
1045 while (node)
1046 {
1047 stats_source_t *snode = (stats_source_t *)node->key;
1048
1049 if (snode->flags & listener->mask)
1050 {
1051 stats_node_t *metadata_stat = NULL;
1052 avl_node *node2;
1053
1054 avl_tree_rlock (snode->stats_tree);
1055 node2 = avl_get_first(snode->stats_tree);
1056 while (node2)
1057 {
1058 stats_node_t *stat = node2->key;
1059 if (stat->flags & listener->mask)
1060 {
1061 if (strcmp (stat->name, "metadata_updated") == 0)
1062 metadata_stat = stat;
1063 else
1064 while (_append_to_buffer (refbuf, size, "EVENT %s %s %s\n", snode->source, stat->name, stat->value) < 0)
1065 {
1066 *full_p = last = refbuf;
1067 full_p = &refbuf->next;
1068 len += refbuf->len;
1069 refbuf = refbuf_new (size);
1070 refbuf->len = 0;
1071 }
1072 }
1073 node2 = avl_get_next (node2);
1074 }
1075 while (metadata_stat &&
1076 _append_to_buffer (refbuf, size, "EVENT %s %s %s\n", snode->source, metadata_stat->name, metadata_stat->value) < 0)
1077 {
1078 *full_p = last = refbuf;
1079 full_p = &refbuf->next;
1080 len += refbuf->len;
1081 refbuf = refbuf_new (size);
1082 refbuf->len = 0;
1083 }
1084 avl_tree_unlock (snode->stats_tree);
1085 }
1086 node = avl_get_next(node);
1087 }
1088 avl_tree_unlock (_stats.source_tree);
1089 if (refbuf->len)
1090 {
1091 *full_p = last = refbuf;
1092 full_p = &refbuf->next;
1093 len += refbuf->len;
1094 }
1095 else
1096 refbuf_release (refbuf); // get rid if empty
1097
1098 /* before we make the client active (for sending queued data), we need to prepend the stats
1099 * we have just built onto any stats that may of come in */
1100 thread_mutex_lock (&_stats.listeners_lock);
1101 *full_p = client->refbuf;
1102 client->refbuf = biglist;
1103 listener->content_len += len;
1104 while (last->next)
1105 last = last->next; // this should not loop typically, but may do
1106 listener->recent_block = last;
1107 thread_mutex_unlock (&_stats.listeners_lock);
1108
1109 client->schedule_ms = 0;
1110 client->flags |= CLIENT_ACTIVE;
1111 worker_wakeup (worker);
1112 }
1113
1114
stats_client_release(client_t * client)1115 static void stats_client_release (client_t *client)
1116 {
1117 event_listener_t *listener = client->shared_data, *match, **trail;
1118 stats_event_t stats_count;
1119 char buffer [20];
1120
1121 if (listener == NULL)
1122 return;
1123 thread_mutex_lock (&_stats.listeners_lock);
1124 match = _stats.event_listeners;
1125 trail = &_stats.event_listeners;
1126
1127 while (match && listener != match)
1128 {
1129 trail = &match->next;
1130 match = *trail;
1131 }
1132 if (match)
1133 *trail = match->next;
1134 else
1135 WARN0 ("odd, no stats client details in collection");
1136 thread_mutex_unlock (&_stats.listeners_lock);
1137
1138 clear_stats_queue (client);
1139 free (listener->source);
1140 free (listener);
1141 client_destroy (client);
1142
1143 build_event (&stats_count, NULL, "stats_connections", buffer);
1144 stats_count.action = STATS_EVENT_DEC;
1145 process_event (&stats_count);
1146 }
1147
1148
1149 struct _client_functions stats_client_send_ops =
1150 {
1151 stats_listeners_send,
1152 stats_client_release
1153 };
1154
stats_add_listener(client_t * client,int mask)1155 void stats_add_listener (client_t *client, int mask)
1156 {
1157 event_listener_t *listener = calloc (1, sizeof (event_listener_t));
1158 listener->mask = mask;
1159
1160 client->respcode = 200;
1161 client->ops = &stats_client_send_ops;
1162 client->shared_data = listener;
1163 client_set_queue (client, NULL);
1164 listener->client = client;
1165
1166 _register_listener (client);
1167 }
1168
1169
stats_transform_xslt(client_t * client,const char * uri)1170 int stats_transform_xslt (client_t *client, const char *uri)
1171 {
1172 xmlDocPtr doc;
1173 char *xslpath = util_get_path_from_normalised_uri (uri, 0);
1174 const char *mount = httpp_get_query_param (client->parser, "mount");
1175 int ret;
1176
1177 if (mount == NULL && client->server_conn->shoutcast_mount && strcmp (uri, "/7.xsl") == 0)
1178 mount = client->server_conn->shoutcast_mount;
1179
1180 doc = stats_get_xml (STATS_PUBLIC, mount);
1181
1182 ret = xslt_transform (doc, xslpath, client);
1183
1184 free (xslpath);
1185 return ret;
1186 }
1187
stats_get_xml(int flags,const char * show_mount)1188 xmlDocPtr stats_get_xml (int flags, const char *show_mount)
1189 {
1190 xmlDocPtr doc;
1191 xmlNodePtr node;
1192
1193 doc = xmlNewDoc (XMLSTR("1.0"));
1194 node = xmlNewDocNode (doc, NULL, XMLSTR("icestats"), NULL);
1195 xmlDocSetRootElement(doc, node);
1196
1197 node = _dump_stats_to_doc (node, show_mount, flags);
1198
1199 if (show_mount && node)
1200 {
1201 source_t *source;
1202 /* show each listener */
1203 avl_tree_rlock (global.source_tree);
1204 source = source_find_mount_raw (show_mount);
1205
1206 if (source)
1207 {
1208 thread_rwlock_rlock (&source->lock);
1209 admin_source_listeners (source, node);
1210 thread_rwlock_unlock (&source->lock);
1211 avl_tree_unlock (global.source_tree);
1212 }
1213 else
1214 {
1215 fbinfo finfo;
1216
1217 avl_tree_unlock (global.source_tree);
1218 finfo.flags = FS_FALLBACK;
1219 finfo.mount = (char*)show_mount;
1220 finfo.limit = 0;
1221 finfo.fallback = NULL;
1222
1223 fserve_list_clients_xml (node, &finfo);
1224 }
1225 }
1226 return doc;
1227 }
1228
_compare_stats(void * arg,void * a,void * b)1229 static int _compare_stats(void *arg, void *a, void *b)
1230 {
1231 stats_node_t *nodea = (stats_node_t *)a;
1232 stats_node_t *nodeb = (stats_node_t *)b;
1233
1234 return strcmp(nodea->name, nodeb->name);
1235 }
1236
_compare_source_stats(void * arg,void * a,void * b)1237 static int _compare_source_stats(void *arg, void *a, void *b)
1238 {
1239 stats_source_t *nodea = (stats_source_t *)a;
1240 stats_source_t *nodeb = (stats_source_t *)b;
1241
1242 return strcmp(nodea->source, nodeb->source);
1243 }
1244
_free_stats(void * key)1245 static int _free_stats(void *key)
1246 {
1247 stats_node_t *node = (stats_node_t *)key;
1248 free(node->value);
1249 free(node->name);
1250 free(node);
1251
1252 return 1;
1253 }
1254
_free_source_stats(void * key)1255 static int _free_source_stats(void *key)
1256 {
1257 stats_source_t *node = (stats_source_t *)key;
1258 stats_listener_send (node->flags, "DELETE %s\n", node->source);
1259 DEBUG1 ("delete source node %s", node->source);
1260 avl_tree_unlock (node->stats_tree);
1261 avl_tree_free(node->stats_tree, _free_stats);
1262 free(node->source);
1263 free(node);
1264
1265 return 1;
1266 }
1267
_free_source_stats_wrapper(void * key)1268 static int _free_source_stats_wrapper (void *key)
1269 {
1270 stats_source_t *node = (stats_source_t *)key;
1271 avl_tree_rlock (node->stats_tree);
1272 _free_source_stats (node);
1273 return 1;
1274 }
1275
1276 /* return a list of blocks which contain lines of text. Each line is a mountpoint
1277 * reference that a slave will use for relaying. The prepend setting is to indicate
1278 * if some something else needs to be added to each line.
1279 */
stats_get_streams(int prepend)1280 refbuf_t *stats_get_streams (int prepend)
1281 {
1282 #define STREAMLIST_BLKSIZE 4096
1283 avl_node *node;
1284 unsigned int remaining = STREAMLIST_BLKSIZE, prelen;
1285 refbuf_t *start = refbuf_new (remaining), *cur = start;
1286 const char *pre = "";
1287 char *buffer = cur->data;
1288
1289 if (prepend)
1290 pre = "/admin/streams?mount=";
1291 prelen = strlen (pre);
1292
1293 /* now the stats for each source */
1294 avl_tree_rlock (_stats.source_tree);
1295 node = avl_get_first(_stats.source_tree);
1296 while (node)
1297 {
1298 int ret;
1299 stats_source_t *source = (stats_source_t *)node->key;
1300
1301 if ((source->flags & STATS_HIDDEN) == 0)
1302 {
1303 if (remaining <= strlen (source->source) + prelen + 3)
1304 {
1305 cur->len = STREAMLIST_BLKSIZE - remaining;
1306 cur->next = refbuf_new (STREAMLIST_BLKSIZE);
1307 remaining = STREAMLIST_BLKSIZE;
1308 cur = cur->next;
1309 buffer = cur->data;
1310 }
1311 ret = snprintf (buffer, remaining, "%s%s\r\n", pre, source->source);
1312 if (ret > 0)
1313 {
1314 buffer += ret;
1315 remaining -= ret;
1316 }
1317 }
1318 node = avl_get_next(node);
1319 }
1320 avl_tree_unlock (_stats.source_tree);
1321 cur->len = STREAMLIST_BLKSIZE - remaining;
1322 return start;
1323 }
1324
1325
1326
1327 /* because we can have stats entries for inactive mountpoints (when there is a fallback)
1328 * then these need to be left on, while others need to be removed
1329 */
stats_purge(time_t mark)1330 void stats_purge (time_t mark)
1331 {
1332 avl_node *snode;
1333
1334 avl_tree_wlock (_stats.source_tree);
1335 snode = avl_get_first(_stats.source_tree);
1336 while (snode)
1337 {
1338 stats_source_t *src = (stats_source_t *)snode->key;
1339
1340 snode = avl_get_next (snode);
1341 if (src->source[0] == '/')
1342 {
1343 if (src->updated < mark)
1344 {
1345 avl_tree_wlock (src->stats_tree);
1346 avl_delete (_stats.source_tree, src, _free_source_stats);
1347 }
1348 continue;
1349 }
1350 if (fserve_contains (src->source) < 0)
1351 {
1352 /* no source_t and no fallback file stat, so delete */
1353 DEBUG1 ("dropping unreferenced stats for %s", src->source);
1354 avl_tree_wlock (src->stats_tree);
1355 avl_delete (_stats.source_tree, src, _free_source_stats);
1356 }
1357 }
1358 avl_tree_unlock (_stats.source_tree);
1359 }
1360
1361
stats_global_calc(time_t now)1362 void stats_global_calc (time_t now)
1363 {
1364 stats_event_t clients, listeners;
1365 avl_node *anode;
1366 char buf1 [VAL_BUFSIZE];
1367 char buf2 [VAL_BUFSIZE];
1368 char buf3 [VAL_BUFSIZE];
1369
1370 global_lock();
1371 connection_stats ();
1372
1373 snprintf (buf1, sizeof(buf1), "%" PRIu64, (int64_t)global.clients);
1374
1375 snprintf (buf2, sizeof(buf2), "%" PRIu64, (int64_t)global.listeners);
1376 snprintf (buf3, sizeof(buf3), "%" PRIu64,
1377 (int64_t)global_getrate_avg (global.out_bitrate) * 8 / 1024);
1378 global_unlock();
1379
1380 build_event (&clients, NULL, "clients", buf1);
1381 clients.flags |= STATS_COUNTERS;
1382 process_event (&clients);
1383 build_event (&listeners, NULL, "listeners", buf2);
1384 listeners.flags |= STATS_COUNTERS;
1385 process_event (&listeners);
1386
1387 avl_tree_wlock (_stats.global_tree);
1388 anode = avl_get_first(_stats.global_tree);
1389 while (anode)
1390 {
1391 stats_node_t *node = (stats_node_t *)anode->key;
1392
1393 if (node->flags & STATS_REGULAR)
1394 {
1395 if (node->last_reported + 9 < now)
1396 {
1397 stats_listener_send (node->flags, "EVENT global %s %s\n", node->name, node->value);
1398 DEBUG2 ("update global %s (%s)", node->name, node->value);
1399 node->last_reported = now;
1400 }
1401 }
1402 anode = avl_get_next (anode);
1403 }
1404 avl_tree_unlock (_stats.global_tree);
1405
1406 build_event (&clients, NULL, "outgoing_kbitrate", buf3);
1407 clients.flags = STATS_COUNTERS|STATS_HIDDEN;
1408 process_event (&clients);
1409 }
1410
1411
stats_handle(const char * mount)1412 stats_handle_t stats_handle (const char *mount)
1413 {
1414 stats_source_t *src_stats;
1415
1416 if (mount == NULL)
1417 return 0;
1418 avl_tree_wlock (_stats.source_tree);
1419 src_stats = _find_source(_stats.source_tree, mount);
1420 if (src_stats == NULL)
1421 {
1422 src_stats = (stats_source_t *)calloc (1, sizeof (stats_source_t));
1423 if (src_stats == NULL)
1424 abort();
1425 DEBUG1 ("new source stat %s", mount);
1426 src_stats->source = (char *)strdup (mount);
1427 src_stats->stats_tree = avl_tree_new (_compare_stats, NULL);
1428 src_stats->flags = STATS_SLAVE|STATS_GENERAL|STATS_HIDDEN;
1429
1430 avl_insert (_stats.source_tree, (void *)src_stats);
1431 }
1432 src_stats->updated = (time_t)(LONG_MAX);
1433 avl_tree_wlock (src_stats->stats_tree);
1434 avl_tree_unlock (_stats.source_tree);
1435
1436 return (stats_handle_t)src_stats;
1437 }
1438
1439
stats_lock(stats_handle_t handle,const char * mount)1440 stats_handle_t stats_lock (stats_handle_t handle, const char *mount)
1441 {
1442 stats_source_t *src_stats = (stats_source_t *)handle;
1443 if (src_stats == NULL)
1444 src_stats = (stats_source_t*)stats_handle (mount);
1445 else
1446 avl_tree_wlock (src_stats->stats_tree);
1447 return (stats_handle_t)src_stats;
1448 }
1449
1450
stats_release(stats_handle_t handle)1451 void stats_release (stats_handle_t handle)
1452 {
1453 stats_source_t *src_stats = (stats_source_t *)handle;
1454 if (src_stats)
1455 avl_tree_unlock (src_stats->stats_tree);
1456 }
1457
1458
1459 // drops stats attached to this handle but don't remove the handle itself
stats_flush(stats_handle_t handle)1460 void stats_flush (stats_handle_t handle)
1461 {
1462 if (handle)
1463 {
1464 stats_source_t *src_stats = (stats_source_t *)handle;
1465 avl_tree *t = src_stats->stats_tree;
1466 avl_node *node;
1467
1468 avl_tree_wlock (src_stats->stats_tree);
1469 while ((node = src_stats->stats_tree->root->right))
1470 {
1471 stats_node_t *stats = (stats_node_t*)node->key;
1472 DEBUG2 ("Removing %s from %s", stats->name, src_stats->source);
1473 avl_delete (t, (void*)stats, _free_stats);
1474 }
1475 stats_listener_send (src_stats->flags, "FLUSH %s\n", src_stats->source);
1476 avl_tree_unlock (src_stats->stats_tree);
1477 }
1478 }
1479
1480
1481 // assume source stats are write locked
stats_set(stats_handle_t handle,const char * name,const char * value)1482 void stats_set (stats_handle_t handle, const char *name, const char *value)
1483 {
1484 if (handle)
1485 {
1486 stats_source_t *src_stats = (stats_source_t *)handle;
1487 stats_event_t event;
1488
1489 build_event (&event, src_stats->source, name, (char *)value);
1490 process_source_stat (src_stats, &event);
1491 }
1492 }
1493
1494
stats_set_inc(stats_handle_t handle,const char * name)1495 void stats_set_inc (stats_handle_t handle, const char *name)
1496 {
1497 if (handle)
1498 {
1499 stats_source_t *src_stats = (stats_source_t *)handle;
1500 stats_event_t event;
1501 char buffer[VAL_BUFSIZE] = "1";
1502
1503 build_event (&event, src_stats->source, name, buffer);
1504 event.action = STATS_EVENT_INC;
1505 process_source_stat (src_stats, &event);
1506 }
1507 }
1508
1509
stats_set_args(stats_handle_t handle,const char * name,const char * format,...)1510 void stats_set_args (stats_handle_t handle, const char *name, const char *format, ...)
1511 {
1512 va_list val;
1513 int ret;
1514 stats_source_t *src_stats = (stats_source_t *)handle;
1515 char buf[1024];
1516
1517 if (name == NULL)
1518 return;
1519 va_start (val, format);
1520 ret = vsnprintf (buf, sizeof (buf), format, val);
1521 va_end (val);
1522
1523 if (ret < 0 || (unsigned int)ret >= sizeof (buf))
1524 {
1525 WARN2 ("problem with formatting %s stat %s",
1526 src_stats == NULL ? "global" : src_stats->source, name);
1527 return;
1528 }
1529 stats_set (handle, name, buf);
1530 }
1531
1532
stats_set_expire(stats_handle_t handle,time_t mark)1533 void stats_set_expire (stats_handle_t handle, time_t mark)
1534 {
1535 stats_source_t *src_stats = (stats_source_t *)handle;
1536
1537 if (src_stats)
1538 src_stats->updated = mark;
1539 }
1540
1541
stats_set_flags(stats_handle_t handle,const char * name,const char * value,int flags)1542 void stats_set_flags (stats_handle_t handle, const char *name, const char *value, int flags)
1543 {
1544 stats_source_t *src_stats = (stats_source_t *)handle;
1545 stats_event_t event;
1546
1547 build_event (&event, src_stats->source, name, value);
1548 event.flags = flags;
1549 if (value)
1550 event.action |= STATS_EVENT_HIDDEN;
1551 else
1552 event.action = STATS_EVENT_HIDDEN;
1553 process_source_stat (src_stats, &event);
1554 }
1555
1556
contains_xml_entity(const char * value)1557 static int contains_xml_entity (const char *value)
1558 {
1559 if (value)
1560 {
1561 const char *p = strchr (value, '&');
1562 char semi = '\0';
1563
1564 if (p && sscanf (p, "&%*9[^; ]%c", &semi) == 1 && semi == ';')
1565 return 1;
1566 }
1567 return 0;
1568 }
1569
1570
stats_set_entity_decode(stats_handle_t handle,const char * name,const char * value)1571 static void stats_set_entity_decode (stats_handle_t handle, const char *name, const char *value)
1572 {
1573 if (contains_xml_entity (value))
1574 {
1575 xmlDocPtr doc = xmlNewDoc(NULL);
1576 xmlNodePtr xmlnode;
1577 xmlNodePtr rootelem = xmlNewNode (NULL, (xmlChar *) "html");
1578 stats_source_t *src_stats = (stats_source_t *)handle;
1579 xmlChar *decoded;
1580 char details[200];
1581
1582 xmlDocSetRootElement (doc, rootelem);
1583
1584 snprintf (details, sizeof details, "mount %s, name %s, value %s :", src_stats->source, name, value);
1585 xmlSetGenericErrorFunc (details, log_parse_failure);
1586 xmlnode = xmlStringGetNodeList (doc, XMLSTR(value));
1587 decoded = xmlNodeListGetString (doc, xmlnode, 1);
1588 stats_set (handle, name, (void*)decoded);
1589 xmlFree (decoded);
1590 xmlFreeNodeList (xmlnode);
1591 xmlFreeDoc (doc);
1592 return;
1593 }
1594 stats_set (handle, name, value);
1595 }
1596
1597
stats_set_conv(stats_handle_t handle,const char * name,const char * value,const char * charset)1598 void stats_set_conv (stats_handle_t handle, const char *name, const char *value, const char *charset)
1599 {
1600 if (charset)
1601 {
1602 xmlCharEncodingHandlerPtr encoding = xmlFindCharEncodingHandler (charset);
1603
1604 if (encoding)
1605 {
1606 xmlBufferPtr in = xmlBufferCreate ();
1607 xmlBufferPtr conv = xmlBufferCreate ();
1608
1609 xmlBufferCCat (in, value);
1610 if (xmlCharEncInFunc (encoding, conv, in) > 0)
1611 stats_set_entity_decode (handle, name, (void*)xmlBufferContent (conv));
1612 xmlBufferFree (in);
1613 xmlBufferFree (conv);
1614 xmlCharEncCloseFunc (encoding);
1615 return;
1616 }
1617 WARN1 ("No charset found for \"%s\"", charset);
1618 return;
1619 }
1620 if (value && xmlCheckUTF8 ((unsigned char *)value) == 0)
1621 {
1622 WARN2 ("seen non-UTF8 data, probably incorrect charcter set (%s, %s)", name, value);
1623 return;
1624 }
1625 stats_set_entity_decode (handle, name, value);
1626 }
1627
1628
stats_listener_to_xml(client_t * listener,xmlNodePtr parent)1629 void stats_listener_to_xml (client_t *listener, xmlNodePtr parent)
1630 {
1631 const char *header;
1632 char buf[30];
1633
1634 xmlNodePtr node = xmlNewChild (parent, NULL, XMLSTR("listener"), NULL);
1635
1636 snprintf (buf, sizeof (buf), "%" PRIu64, listener->connection.id);
1637 xmlSetProp (node, XMLSTR("id"), XMLSTR(buf));
1638 xmlNewChild (node, NULL, XMLSTR("ID"), XMLSTR(buf));
1639
1640 xmlNewChild (node, NULL, XMLSTR("IP"), XMLSTR(listener->connection.ip));
1641
1642 header = httpp_getvar (listener->parser, "user-agent");
1643 if (header && xmlCheckUTF8((unsigned char *)header))
1644 {
1645 xmlChar *str = xmlEncodeEntitiesReentrant (parent->doc, XMLSTR(header));
1646 xmlNewChild (node, NULL, XMLSTR("UserAgent"), str);
1647 xmlFree (str);
1648 }
1649
1650 header = httpp_getvar (listener->parser, "referer");
1651 if (header && xmlCheckUTF8((unsigned char *)header))
1652 {
1653 xmlChar *str = xmlEncodeEntitiesReentrant (parent->doc, XMLSTR(header));
1654 xmlNewChild (node, NULL, XMLSTR("Referer"), str);
1655 xmlFree (str);
1656 }
1657
1658 if ((listener->flags & (CLIENT_ACTIVE|CLIENT_IN_FSERVE)) == CLIENT_ACTIVE)
1659 {
1660 source_t *source = listener->shared_data;
1661 snprintf (buf, sizeof (buf), "%"PRIu64, source->client->queue_pos - listener->queue_pos);
1662 }
1663 else
1664 snprintf (buf, sizeof (buf), "0");
1665 xmlNewChild (node, NULL, XMLSTR("lag"), XMLSTR(buf));
1666
1667 if (listener->worker)
1668 {
1669 snprintf (buf, sizeof (buf), "%lu",
1670 (unsigned long)(listener->worker->current_time.tv_sec - listener->connection.con_time));
1671 xmlNewChild (node, NULL, XMLSTR("Connected"), XMLSTR(buf));
1672 }
1673 if (listener->username)
1674 {
1675 xmlChar *str = xmlEncodeEntitiesReentrant (parent->doc, XMLSTR(listener->username));
1676 xmlNewChild (node, NULL, XMLSTR("username"), str);
1677 xmlFree (str);
1678 }
1679 }
1680
1681