1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4 #include <ganglia.h> /* for the libgmond messaging */
5 #include <gm_metric.h>
6
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <syslog.h>
11 #include <math.h>
12 #ifdef SOLARIS
13 #define fabsf(f) ((float)fabs(f))
14 #endif
15 #ifdef _AIX
16 #ifndef _AIX52
17 /* _AIX52 is defined on all versions of AIX >= 5.2
18 fabsf doesn't exist on versions prior to 5.2 */
19 #define fabsf(f) ((float)fabs(f))
20 #endif
21 #endif
22 #ifdef LINUX
23 #include <sys/utsname.h>
24 #endif
25 #include <zlib.h>
26
27 #include <apr.h>
28 #include <apr_strings.h>
29 #include <apr_hash.h>
30 #include <apr_time.h>
31 #include <apr_pools.h>
32 #include <apr_poll.h>
33 #include <apr_network_io.h>
34 #include <apr_signal.h>
35 #include <apr_thread_proc.h>
36 #include <apr_tables.h>
37 #include <apr_dso.h>
38 #include <apr_version.h>
39
40 #ifdef HAVE_LIBPCRE
41 #if defined (HAVE_PCRE_PCRE_H)
42 #include <pcre/pcre.h>
43 #else
44 #include <pcre.h>
45 #endif
46 #endif
47
48 #include "cmdline.h" /* generated by cmdline.sh which runs gengetopt */
49 #include "become_a_nobody.h"
50 #include "apr_net.h" /* our private network functions based on apr */
51 #include "dtd.h" /* the DTD definition for our XML */
52 #include "g25_config.h" /* for converting old file formats to new */
53 #include "update_pidfile.h"
54 #include "gm_scoreboard.h"
55 #include "ganglia_priv.h"
56
57 /* Specifies a single value metric callback */
58 #define CB_NOINDEX -1
59
60 /* If a bind fails, and retry_bind is true, this is the interval to sleep
61 before retry. Specified in seconds */
62 #define RETRY_BIND_DELAY 60
63
64 /* The key in the apr_socket_t struct where our gzipped data is stored */
65 #define GZIP_KEY "gzip"
66
67 /* When this gmond was started */
68 apr_time_t started;
69 /* My name */
70 char myname[APRMAXHOSTLEN+1];
71 /* The commandline options */
72 struct gengetopt_args_info args_info;
73 /* The configuration file */
74 cfg_t *config_file;
75 /* The debug level (in debug_msg.c) */
76 static int debug_level;
77 /* The global context */
78 apr_pool_t *global_context = NULL;
79 /* Deaf mode boolean */
80 int deaf;
81 /* Mute mode boolean */
82 int mute;
83 /* Allow extra data boolean */
84 int allow_extra_data;
85 /* last time we received any data */
86 apr_time_t udp_last_heard;
87 /* Cluster tag boolean */
88 int cluster_tag = 0;
89 /* This host's location */
90 char *host_location = NULL;
91 /* This host name, spoofed */
92 char *override_hostname = NULL;
93 /* This host ip, spoofed */
94 char *override_ip = NULL;
95 /* Tags */
96 char *tags = NULL;
97 /* Boolean. Will this host received gexec requests? */
98 int gexec_on = 0;
99 /* This is tweakable by globals{max_udp_msg_len=...} */
100 int max_udp_message_len = 1472;
101 /* The default configuration for gmond. Found in conf.c. */
102 extern char *default_gmond_configuration;
103 /* The number of seconds to hold "dead" hosts in the hosts hash */
104 int host_dmax = 0;
105 /* The number of seconds to wait for a message before considering it down */
106 int host_tmax = 20;
107 /* The amount of time between cleanups */
108 int cleanup_threshold = 300;
109 /* Time interval before send another metadata packet */
110 int send_metadata_interval = 0;
111 /* The directory where DSO modules are located */
112 char *module_dir = NULL;
113
114 /* The array for outgoing UDP message channels */
115 Ganglia_udp_send_channels udp_send_channels = NULL;
116
117 /* TODO: The array for outgoing TCP message channels (later) */
118 apr_array_header_t *tcp_send_array = NULL;
119
120 enum Ganglia_action_types {
121 GANGLIA_ACCESS_DENY = 0,
122 GANGLIA_ACCESS_ALLOW = 1
123 };
124 typedef enum Ganglia_action_types Ganglia_action_types;
125
126 /* This is the structure used for the access control lists */
127 struct Ganglia_access {
128 apr_ipsubnet_t *ipsub;
129 Ganglia_action_types action;
130 };
131 typedef struct Ganglia_access Ganglia_access;
132
133 struct Ganglia_acl {
134 apr_array_header_t *access_array;
135 Ganglia_action_types default_action;
136 };
137 typedef struct Ganglia_acl Ganglia_acl;
138
139 /* This is the channel definitions */
140 enum Ganglia_channel_types {
141 TCP_ACCEPT_CHANNEL,
142 UDP_RECV_CHANNEL
143 };
144 typedef enum Ganglia_channel_types Ganglia_channel_types;
145
146 struct Ganglia_channel {
147 Ganglia_channel_types type;
148 Ganglia_acl *acl;
149 int timeout;
150 int gzip_output;
151 };
152 typedef struct Ganglia_channel Ganglia_channel;
153
154 /* Two separate pollsets hold the tcp_accept and udp_recv channels */
155 apr_pollset_t *udp_listen_channels = NULL;
156 apr_pollset_t *tcp_listen_channels = NULL;
157
158 /* These are the TCP listen channels */
159 apr_socket_t **tcp_sockets = NULL;
160 /* These are the UDP sockets */
161 apr_socket_t **udp_recv_sockets = NULL;
162
163 /* The hash to hold the hosts (key = host IP) */
164 apr_hash_t *hosts = NULL;
165 apr_thread_mutex_t *hosts_mutex = NULL;
166 /* The "hosts" hash contains values of type "hostdata" */
167
168 #ifdef SFLOW
169 #include "sflow.h"
170 uint16_t sflow_udp_port = SFLOW_IANA_REGISTERED_PORT;
171 #endif
172
173 #include "gmond_internal.h"
174
175 /* This is the structure of the data save to each host->metric hash */
176 struct Ganglia_metadata {
177 /* The pool used for allocating memory */
178 apr_pool_t *pool;
179 /* The name of the metric */
180 char *name;
181 union {
182 /* The ganglia message */
183 Ganglia_metadata_msg f_message;
184 Ganglia_value_msg v_message;
185 } message_u;
186 /* Last heard from */
187 apr_time_t last_heard_from;
188 };
189 typedef struct Ganglia_metadata Ganglia_metadata;
190
191 /* The hash to hold the metrics available on this platform */
192 apr_hash_t *metric_callbacks = NULL;
193
194 /* The "metrics" hash contains values of type "Ganglia_metric_callback" */
195 /* This is where libmetrics meets gmond */
196 struct Ganglia_metric_callback {
197 char *name; /* metric name */
198 float value_threshold;/* the value threshold */
199 char *title; /* Altername metric name or short description */
200 Ganglia_25metric *info;/* the information about this metric */
201 metric_func_void cb; /* callback function (deprecated) */
202 metric_func cbindexed; /* multi-metric callback function */
203 g_val_t now; /* the current value */
204 g_val_t last; /* the last value */
205 Ganglia_value_msg msg; /* the message to send */
206 mmodule *modp; /* dynamic module info struct */
207 int multi_metric_index; /* index identifying which metric is wanted */
208 apr_time_t metadata_last_sent; /* when the metadata was last sent */
209 };
210 typedef struct Ganglia_metric_callback Ganglia_metric_callback;
211
212 /* This is the structure of a collection group */
213 struct Ganglia_collection_group {
214 apr_time_t next_collect; /* When to collect next */
215 apr_time_t next_send; /* When to send next (tmax) */
216 int once;
217 int collect_every;
218 int time_threshold;
219 apr_array_header_t *metric_array;
220 };
221 typedef struct Ganglia_collection_group Ganglia_collection_group;
222
223 /* This is the array of collection groups that we are processing... */
224 apr_array_header_t *collection_groups = NULL;
225
226 mmodule *metric_modules = NULL;
227 extern int daemon_proc; /* defined in error.c */
228
229 char **gmond_argv;
230 extern char **environ;
231
232 /* apr_socket_send can't assure all characters in buf been sent. */
233 static apr_status_t
socket_send_raw(apr_socket_t * sock,const char * buf,apr_size_t * len)234 socket_send_raw(apr_socket_t *sock, const char *buf, apr_size_t *len)
235 {
236 apr_size_t total = *len;
237 apr_size_t thisTime = total;
238 const char* p = buf;
239 apr_status_t ret;
240 for(ret=apr_socket_send(sock, p, &thisTime); ret == APR_SUCCESS;
241 ret=apr_socket_send(sock, p, &thisTime))
242 {
243
244 if(thisTime < total)
245 {
246 total -= thisTime;
247 p += thisTime;
248 thisTime = total;
249 }
250 else
251 break;
252 }
253 return ret;
254 }
255
256 /* wrap socket_send_raw with gzip deflate if enabled. */
257 static apr_status_t
socket_send(apr_socket_t * sock,const char * buf,apr_size_t * len)258 socket_send(apr_socket_t *sock, const char *buf, apr_size_t *len)
259 {
260 char outputbuffer[2048];
261 const int outputlen = sizeof(outputbuffer);
262 apr_size_t wlen;
263 apr_status_t ret;
264 z_stream *strm;
265 int z_ret;
266
267 ret = apr_socket_data_get((void**)&strm, GZIP_KEY, sock);
268 if (ret != APR_SUCCESS)
269 {
270 return ret;
271 }
272
273 if (!strm)
274 {
275 ret = socket_send_raw( sock, buf, len );
276 }
277 else
278 {
279 strm->next_in = (Bytef *)buf;
280 strm->avail_in = *len;
281
282 while( strm->avail_in )
283 {
284 strm->next_out = (Bytef *)outputbuffer;
285 strm->avail_out = outputlen;
286
287 z_ret = deflate( strm, 0 );
288 if (z_ret != Z_OK)
289 {
290 return APR_ENOMEM;
291 }
292
293 wlen = outputlen - strm->avail_out;
294 if( wlen )
295 {
296 ret = socket_send_raw( sock, outputbuffer, &wlen );
297 if(ret != APR_SUCCESS)
298 {
299 return ret;
300 }
301 }
302 }
303 }
304
305 return ret;
306 }
307
308 /* Reload the Ganglia configuration */
309 void
reload_ganglia_configuration(void)310 reload_ganglia_configuration(void)
311 {
312 int i = 0;
313 char *gmond_bin = gmond_argv[0];
314
315 if(udp_listen_channels != NULL)
316 apr_pollset_destroy(udp_listen_channels);
317 if(tcp_listen_channels != NULL)
318 apr_pollset_destroy(tcp_listen_channels);
319 if(tcp_sockets != NULL)
320 for(i = 0; tcp_sockets[i] != 0; i++)
321 apr_socket_close(tcp_sockets[i]);
322 if(udp_recv_sockets != NULL)
323 for(i = 0; udp_recv_sockets[i] != 0; i++)
324 apr_socket_close(udp_recv_sockets[i]);
325 debug_msg("reloading %s", gmond_bin);
326 #ifndef CYGWIN
327 /* To do: over-ride some config opts:
328 - tell new process not to re-daemonize
329 - tell new process not to setuid
330 Any paths in gmond_argv must be absolute or relative to / */
331 execve(gmond_bin, gmond_argv, environ);
332 #else
333 /* Exit and let Windows service manager restart the process, as
334 neither Cygwin nor apr provide a perfect equivalent to execve */
335 exit(EXIT_SUCCESS);
336 #endif
337 err_msg("execve failed to reload %s: %s", gmond_bin, strerror(errno));
338 exit(EXIT_FAILURE);
339 }
340
341 /* this is just a temporary function */
342 void
process_configuration_file(void)343 process_configuration_file(void)
344 {
345 cfg_t *tmp;
346
347 /* this is a global for now */
348 config_file = (cfg_t*)Ganglia_gmond_config_create( args_info.conf_arg, !args_info.conf_given );
349
350 /* Initialize a few variables */
351 tmp = cfg_getsec( config_file, "globals");
352 /* Get the maximum UDP message size */
353 max_udp_message_len = cfg_getint( tmp, "max_udp_msg_len");
354 /* Get the gexec status requested */
355 gexec_on = cfg_getbool(tmp, "gexec");
356 /* Get the host dmax ... */
357 host_dmax = cfg_getint( tmp, "host_dmax");
358 /* Get the host tmax ... */
359 host_tmax = cfg_getint( tmp, "host_tmax");
360 /* Get the cleanup threshold */
361 cleanup_threshold = cfg_getint( tmp, "cleanup_threshold");
362 /* Get the send meta data packet interval */
363 send_metadata_interval = cfg_getint( tmp, "send_metadata_interval");
364 /* Get the DSO module dir */
365 module_dir = cfg_getstr(tmp, "module_dir");
366 /* Acquire spoof name/ip, if they are specified */
367 override_hostname = cfg_getstr(tmp, "override_hostname");
368 override_ip = cfg_getstr(tmp, "override_ip");
369
370 /* Any tags for this host */
371 tags = cfg_getstr(tmp, "tags");
372
373 /* Commandline for debug_level trumps configuration file behaviour ... */
374 if (args_info.debug_given)
375 {
376 debug_level = args_info.debug_arg;
377 }
378 else
379 {
380 debug_level = cfg_getint ( tmp, "debug_level");
381 }
382 set_debug_msg_level(debug_level);
383
384 }
385
386 static void
daemonize_if_necessary(char * argv[])387 daemonize_if_necessary( char *argv[] )
388 {
389 int should_daemonize;
390 cfg_t *tmp;
391 tmp = cfg_getsec( config_file, "globals");
392 should_daemonize = cfg_getbool( tmp, "daemonize");
393
394 /* Daemonize if needed */
395 if(!args_info.foreground_flag && should_daemonize && !debug_level)
396 {
397 char *cwd;
398
399 apr_filepath_get(&cwd, 0, global_context);
400 apr_proc_detach(1);
401 apr_filepath_set(cwd, global_context);
402
403 /* enable errmsg logging to syslog */
404 daemon_proc = 1;
405 openlog (argv[0], LOG_PID, LOG_DAEMON);
406 }
407 }
408
409 static void
setuid_if_necessary(void)410 setuid_if_necessary( void )
411 {
412 cfg_t *tmp;
413 int setuid;
414 #ifndef CYGWIN
415 char *user;
416 #endif
417
418 tmp = cfg_getsec( config_file, "globals");
419 setuid = cfg_getbool( tmp, "setuid" );
420 if(setuid)
421 {
422 #ifdef CYGWIN
423 fprintf(stderr,"Windows does not support setuid.\n");
424 #else
425 user = cfg_getstr(tmp, "user" );
426 become_a_nobody(user);
427 #endif
428 }
429 }
430
431 static void
process_deaf_mute_mode(void)432 process_deaf_mute_mode( void )
433 {
434 cfg_t *tmp = cfg_getsec( config_file, "globals");
435 deaf = cfg_getbool( tmp, "deaf");
436 mute = cfg_getbool( tmp, "mute");
437 if(deaf && mute)
438 {
439 err_msg("Configured to run both deaf and mute. Nothing to do. Exiting.\n");
440 exit(EXIT_FAILURE);
441 }
442 }
443
444 static void
process_allow_extra_data_mode(void)445 process_allow_extra_data_mode( void )
446 {
447 cfg_t *tmp = cfg_getsec( config_file, "globals");
448 allow_extra_data = cfg_getbool( tmp, "allow_extra_data");
449 }
450
451 static Ganglia_acl *
Ganglia_acl_create(cfg_t * channel,apr_pool_t * pool)452 Ganglia_acl_create ( cfg_t *channel, apr_pool_t *pool )
453 {
454 apr_status_t status;
455 Ganglia_acl *acl = NULL;
456 cfg_t *acl_config;
457 char *default_action;
458 int num_access = 0;
459 int i;
460
461 if(!channel || !pool)
462 {
463 return acl;
464 }
465
466
467 acl_config = cfg_getsec(channel, "acl");
468 if(!acl_config)
469 {
470 return acl;
471 }
472
473 /* Find out the number of access entries */
474 num_access = cfg_size( acl_config, "access" );
475 if(!num_access)
476 {
477 return acl;
478 }
479
480 /* Create a new ACL from the pool */
481 acl = apr_pcalloc( pool, sizeof(Ganglia_acl));
482 if(!acl)
483 {
484 err_msg("Unable to allocate memory for ACL. Exiting.\n");
485 exit(EXIT_FAILURE);
486 }
487
488 default_action = cfg_getstr( acl_config, "default");
489 if(!apr_strnatcasecmp( default_action, "deny"))
490 {
491 acl->default_action = GANGLIA_ACCESS_DENY;
492 }
493 else if(!apr_strnatcasecmp( default_action, "allow"))
494 {
495 acl->default_action = GANGLIA_ACCESS_ALLOW;
496 }
497 else
498 {
499 err_msg("Invalid default ACL '%s'. Exiting.\n", default_action);
500 exit(EXIT_FAILURE);
501 }
502
503 /* Create an array to hold each of the access instructions */
504 acl->access_array = apr_array_make( pool, num_access, sizeof(Ganglia_acl *));
505 if(!acl->access_array)
506 {
507 err_msg("Unable to malloc access array. Exiting.\n");
508 exit(EXIT_FAILURE);
509 }
510 for(i=0; i< num_access; i++)
511 {
512 cfg_t *access_config = cfg_getnsec( acl_config, "access", i);
513 Ganglia_access *access = apr_pcalloc( pool, sizeof(Ganglia_access));
514 char *ip, *mask, *action;
515
516 if(!access_config)
517 {
518 /* This shouldn't happen unless maybe acl is empty and
519 * the safest thing to do it exit */
520 err_msg("Unable to process ACLs. Exiting.\n");
521 exit(EXIT_FAILURE);
522 }
523
524 ip = cfg_getstr( access_config, "ip");
525 mask = cfg_getstr( access_config, "mask");
526 action = cfg_getstr( access_config, "action");
527 if(!ip && !mask && !action)
528 {
529 err_msg("An access record requires an ip, mask and action. Exiting.\n");
530 exit(EXIT_FAILURE);
531 }
532
533 /* Process the action first */
534 if(!apr_strnatcasecmp( action, "deny" ))
535 {
536 access->action = GANGLIA_ACCESS_DENY;
537 }
538 else if(!apr_strnatcasecmp( action, "allow"))
539 {
540 access->action = GANGLIA_ACCESS_ALLOW;
541 }
542 else
543 {
544 err_msg("ACL access entry has action '%s'. Must be deny|allow. Exiting.\n", action);
545 exit(EXIT_FAILURE);
546 }
547
548 /* Create the subnet */
549 access->ipsub = NULL;
550 status = apr_ipsubnet_create( &(access->ipsub), ip, mask, pool);
551 if(status != APR_SUCCESS)
552 {
553 err_msg("ACL access entry has invalid ip('%s')/mask('%s'). Exiting.\n", ip, mask);
554 exit(EXIT_FAILURE);
555 }
556
557 /* Save this access entry to the acl */
558 *(Ganglia_access **)apr_array_push( acl->access_array ) = access;
559 }
560 return acl;
561 }
562
563
564 static int
Ganglia_acl_action(Ganglia_acl * acl,apr_sockaddr_t * addr)565 Ganglia_acl_action( Ganglia_acl *acl, apr_sockaddr_t *addr )
566 {
567 int i;
568
569 if(!acl)
570 {
571 /* If no ACL is specified, we assume there is no access control */
572 return GANGLIA_ACCESS_ALLOW;
573 }
574
575 for(i=0; i< acl->access_array->nelts; i++)
576 {
577 Ganglia_access *access = ((Ganglia_access **)(acl->access_array->elts))[i];
578 if(!apr_ipsubnet_test( access->ipsub, addr ))
579 {
580 /* no action will occur because addr is not in this subnet */
581 continue;
582 }
583 else
584 {
585 return access->action;
586 }
587 }
588
589 /* No matches in the access list so we return the default */
590 return acl->default_action;
591 }
592
593 static int32_t
get_sock_family(char * family)594 get_sock_family( char *family )
595 {
596 if( strchr( family, '4' ))
597 {
598 return APR_INET;
599 }
600 else if( strchr( family, '6'))
601 {
602 #if APR_INET6
603 return APR_INET6;
604 #else
605 err_msg("IPv6 is not supported on this host. Exiting.\n");
606 exit(EXIT_FAILURE);
607 #endif
608 }
609
610 err_msg("Unknown family '%s'. Try inet4|inet6. Exiting.\n", family);
611 exit(EXIT_FAILURE);
612 /* shouldn't get here */
613 return APR_UNSPEC;
614 }
615
616 static void
reset_mcast_channels(void)617 reset_mcast_channels( void )
618 {
619 int i;
620 int num_udp_recv_channels = cfg_size( config_file, "udp_recv_channel");
621
622 for(i = 0; i< num_udp_recv_channels; i++)
623 {
624 cfg_t *udp_recv_channel;
625 char *mcast_join, *mcast_if;
626 int port;
627 apr_socket_t *socket = NULL;
628
629 udp_recv_channel = cfg_getnsec( config_file, "udp_recv_channel", i);
630 mcast_join = cfg_getstr( udp_recv_channel, "mcast_join" );
631 mcast_if = cfg_getstr( udp_recv_channel, "mcast_if" );
632 port = cfg_getint( udp_recv_channel, "port");
633
634 if ( mcast_join )
635 {
636 socket = udp_recv_sockets[i];
637 join_mcast(global_context, socket, mcast_join, port, mcast_if);
638 }
639 }
640 }
641
642 static void
setup_listen_channels_pollset(void)643 setup_listen_channels_pollset( void )
644 {
645 apr_status_t status;
646 int i;
647 int num_udp_recv_channels = cfg_size( config_file, "udp_recv_channel");
648 int num_tcp_accept_channels = cfg_size( config_file, "tcp_accept_channel");
649 int total_listen_channels = num_udp_recv_channels + num_tcp_accept_channels;
650 Ganglia_channel *channel;
651 int pollset_opts = 0;
652
653 /* check if gmond was really meant to be deaf */
654 if (total_listen_channels == 0)
655 {
656 deaf = 1;
657 return;
658 }
659
660 /* Create my incoming pollset */
661 #ifdef LINUX
662 struct utsname _name;
663 if(uname(&_name) >= 0) {
664 if(strcmp(_name.release, "2.6") >= 0)
665 pollset_opts = APR_POLLSET_THREADSAFE;
666 }
667 #endif
668 if (num_udp_recv_channels > 0) {
669 if((status = apr_pollset_create(&udp_listen_channels, num_udp_recv_channels, global_context, pollset_opts)) != APR_SUCCESS)
670 {
671 char apr_err[512];
672 apr_strerror(status, apr_err, 511);
673 err_msg("apr_pollset_create failed: %s", apr_err);
674 exit(EXIT_FAILURE);
675 }
676 }
677
678 if (num_tcp_accept_channels > 0) {
679 if((status = apr_pollset_create(&tcp_listen_channels, num_tcp_accept_channels, global_context, pollset_opts)) != APR_SUCCESS)
680 {
681 char apr_err[512];
682 apr_strerror(status, apr_err, 511);
683 err_msg("apr_pollset_create failed: %s", apr_err);
684 exit(EXIT_FAILURE);
685 }
686 }
687
688 if((udp_recv_sockets = (apr_socket_t **)apr_pcalloc(global_context, sizeof(apr_socket_t *) * (num_udp_recv_channels + 1))) == NULL)
689 err_quit("unable to allocate UDP listening sockets");
690
691 /* Process all the udp_recv_channels */
692 for(i = 0; i< num_udp_recv_channels; i++)
693 {
694 cfg_t *udp_recv_channel;
695 char *mcast_join, *mcast_if, *bindaddr, *family;
696 int port, retry_bind, buffer;
697 apr_socket_t *socket = NULL;
698 apr_pollfd_t socket_pollfd;
699 apr_pool_t *pool = NULL;
700 int32_t sock_family = APR_INET;
701 apr_int32_t rx_buf_sz;
702 socklen_t _optlen;
703
704 udp_recv_channel = cfg_getnsec( config_file, "udp_recv_channel", i);
705 mcast_join = cfg_getstr( udp_recv_channel, "mcast_join" );
706 mcast_if = cfg_getstr( udp_recv_channel, "mcast_if" );
707 port = cfg_getint( udp_recv_channel, "port");
708 bindaddr = cfg_getstr( udp_recv_channel, "bind");
709 family = cfg_getstr( udp_recv_channel, "family");
710 retry_bind = cfg_getbool( udp_recv_channel, "retry_bind");
711 buffer = cfg_getint( udp_recv_channel, "buffer");
712
713 debug_msg("udp_recv_channel mcast_join=%s mcast_if=%s port=%d bind=%s buffer=%d",
714 mcast_join? mcast_join:"NULL",
715 mcast_if? mcast_if:"NULL", port,
716 bindaddr? bindaddr: "NULL", buffer);
717
718
719 /* Create a sub-pool for this channel */
720 apr_pool_create(&pool, global_context);
721
722 sock_family = get_sock_family(family);
723
724 if( mcast_join )
725 {
726 /* Listen on the specified multicast channel */
727 socket = create_mcast_server(pool, sock_family, mcast_join, port, bindaddr, mcast_if );
728
729 while(!socket)
730 {
731 if(retry_bind == cfg_false)
732 {
733 err_msg("Error creating multicast server mcast_join=%s port=%d mcast_if=%s family='%s'. Try setting retry_bind. Exiting.\n",
734 mcast_join? mcast_join: "NULL", port, mcast_if? mcast_if:"NULL",family);
735 exit(EXIT_FAILURE);
736 }
737 err_msg("Error creating multicast server mcast_join=%s port=%d mcast_if=%s family='%s'. Will try again...\n",
738 mcast_join? mcast_join: "NULL", port, mcast_if? mcast_if:"NULL",family);
739 apr_sleep(APR_USEC_PER_SEC * RETRY_BIND_DELAY);
740 socket = create_mcast_server(pool, sock_family, mcast_join, port, bindaddr, mcast_if );
741 }
742 }
743 else
744 {
745 /* Create a UDP server */
746 socket = create_udp_server( pool, sock_family, port, bindaddr );
747 while(!socket)
748 {
749 if(retry_bind == cfg_false)
750 {
751 err_msg("Error creating UDP server on port %d bind=%s. Try setting retry_bind. Exiting.\n",
752 port, bindaddr? bindaddr: "unspecified");
753 exit(EXIT_FAILURE);
754 }
755 err_msg("Error creating UDP server on port %d bind=%s. Will try again...\n",
756 port, bindaddr? bindaddr: "unspecified");
757 apr_sleep(APR_USEC_PER_SEC * RETRY_BIND_DELAY);
758 socket = create_udp_server( pool, sock_family, port, bindaddr );
759 }
760 }
761
762 if(buffer)
763 {
764 debug_msg("setting UDP socket receive buffer to: %d\n", (apr_int32_t) buffer);
765 if(apr_socket_opt_set(socket, APR_SO_RCVBUF, (apr_int32_t) buffer) == APR_SUCCESS)
766 {
767 debug_msg("APR reports success setting APR_SO_RCVBUF to: %d\n", (apr_int32_t)buffer );
768
769 /* RB: Let's check if it actually worked to be sure */
770 _optlen = sizeof(rx_buf_sz);
771 if(getsockopt(get_apr_os_socket(socket), SOL_SOCKET, SO_RCVBUF,
772 &rx_buf_sz, &_optlen) == 0)
773 {
774 debug_msg("socket created, SO_RCVBUF = %d\n", rx_buf_sz);
775
776 if(buffer)
777 {
778 /* RB: getsockopt() returns double SO_RCVBUF since kernel reserves overhead space */
779 if(rx_buf_sz!=(buffer*2))
780 {
781 err_msg("Error setting UDP receive buffer for port %d bind=%s to size: %d.\n",
782 port, bindaddr? bindaddr: "unspecified", (apr_int32_t) buffer);
783 err_msg("Reported buffer size by OS: %d : does not match config setting %d.\n",
784 (int) (rx_buf_sz/2), (int) buffer);
785 err_msg("NOTE: only supported on systems that have Apache Portable Runtime library version 0.9.4 or higher.\n");
786 err_msg("Check Operating System (kernel) limits, change or disable buffer size. Exiting.\n");
787 exit(EXIT_FAILURE);
788 }
789 else
790 { /* RB: Eureka */
791 debug_msg("Actual receive buffer size reported by OS matches config setting. Success.");
792 }
793 }
794 }
795 else
796 {
797 err_msg("Unable to verify UDP receive buffer for port %d bind=%s to size: %d. Check Operating System (limits) or change buffer size. Exiting.\n",
798 port, bindaddr? bindaddr: "unspecified", buffer);
799 exit(EXIT_FAILURE);
800 }
801 }
802 else
803 {
804 err_msg("Error setting UDP receive buffer for port %d bind=%s to size: %d. Check Operating System limits or change buffer size. Exiting.\n",
805 port, bindaddr? bindaddr: "unspecified", (apr_int32_t) buffer);
806 err_msg("This is currently only supported on systems that have Apache Portable Runtime library version 0.9.4 or higher.\n");
807 err_msg("Check Operating System (kernel) limits, change or disable buffer size. Exiting.\n");
808 exit(EXIT_FAILURE);
809 }
810 }
811
812 /* Find out about the RX socket buffer
813 This is logged to help people troubleshoot
814 Some users have observed messages about errors when sending
815 or receiving metric packets, and a small buffer size
816 could be an issue */
817 /* RB: Just log this for debugging purposes now */
818 _optlen = sizeof(rx_buf_sz);
819 if(getsockopt(get_apr_os_socket(socket), SOL_SOCKET, SO_RCVBUF,
820 &rx_buf_sz, &_optlen) == 0)
821 {
822 debug_msg("socket created, SO_RCVBUF = %d\n", rx_buf_sz);
823 }
824 else
825 {
826 debug_msg("getsockopt SO_RCVBUF failed\n");
827 }
828
829 /* Build the socket poll file descriptor structure */
830 socket_pollfd.desc_type = APR_POLL_SOCKET;
831 socket_pollfd.reqevents = APR_POLLIN;
832 socket_pollfd.desc.s = socket;
833
834 udp_recv_sockets[i] = socket;
835
836 channel = apr_pcalloc( pool, sizeof(Ganglia_channel));
837 if(!channel)
838 {
839 err_msg("Unable to malloc memory for channel. Exiting. \n");
840 exit(EXIT_FAILURE);
841 }
842
843 /* Mark this channel as a udp_recv_channel */
844 channel->type = UDP_RECV_CHANNEL;
845
846 /* Make sure this socket never blocks */
847 channel->timeout = 0;
848 apr_socket_timeout_set( socket, channel->timeout);
849
850 /* Save the ACL information */
851 channel->acl = Ganglia_acl_create ( udp_recv_channel, pool );
852
853 /* Save the pointer to this socket specific data */
854 socket_pollfd.client_data = channel;
855
856 /* Add the socket to the pollset */
857 status = apr_pollset_add(udp_listen_channels, &socket_pollfd);
858 if(status != APR_SUCCESS)
859 {
860 err_msg("Failed to add socket to pollset. Exiting.\n");
861 exit(EXIT_FAILURE);
862 }
863 }
864
865 if ((tcp_sockets = (apr_socket_t **)apr_pcalloc(global_context, sizeof(apr_socket_t *) * (num_tcp_accept_channels + 1))) == NULL)
866 err_quit("Unable to allocate TCP listening sockets");
867
868 /* Process all the tcp_accept_channels */
869 for(i=0; i< num_tcp_accept_channels; i++)
870 {
871 cfg_t *tcp_accept_channel = cfg_getnsec( config_file, "tcp_accept_channel", i);
872 char *bindaddr, *interface, *family;
873 int port, timeout, gzip_output;
874 apr_socket_t *socket = NULL;
875 apr_pollfd_t socket_pollfd;
876 apr_pool_t *pool = NULL;
877 int32_t sock_family;
878
879 port = cfg_getint( tcp_accept_channel, "port");
880 bindaddr = cfg_getstr( tcp_accept_channel, "bind");
881 interface = cfg_getstr( tcp_accept_channel, "interface");
882 timeout = cfg_getint( tcp_accept_channel, "timeout");
883 family = cfg_getstr( tcp_accept_channel, "family");
884 gzip_output = cfg_getbool( tcp_accept_channel, "gzip_output");
885
886 debug_msg("tcp_accept_channel bind=%s port=%d gzip_output=%d",
887 bindaddr? bindaddr: "NULL", port, gzip_output);
888
889 /* Create a subpool context */
890 apr_pool_create(&pool, global_context);
891
892 sock_family = get_sock_family(family);
893
894 /* Create the socket for the channel, blocking w/timeout */
895 socket = create_tcp_server(pool, sock_family, port, bindaddr,
896 interface, 1, gzip_output);
897 if(!socket)
898 {
899 err_msg("Unable to create tcp_accept_channel. Exiting.\n");
900 exit(EXIT_FAILURE);
901 }
902
903 tcp_sockets[i] = socket;
904
905 /* Build the socket poll file descriptor structure */
906 socket_pollfd.desc_type = APR_POLL_SOCKET;
907 socket_pollfd.reqevents = APR_POLLIN;
908 socket_pollfd.desc.s = socket;
909
910 channel = apr_pcalloc( pool, sizeof(Ganglia_channel));
911 if(!channel)
912 {
913 err_msg("Unable to malloc data for channel. Exiting.\n");
914 exit(EXIT_FAILURE);
915 }
916
917 channel->type = TCP_ACCEPT_CHANNEL;
918
919 /* Save the timeout for this socket */
920 channel->timeout = timeout;
921
922 // Does channel support gzip
923 channel->gzip_output = gzip_output;
924
925 /* Save the ACL information */
926 channel->acl = Ganglia_acl_create( tcp_accept_channel, pool );
927
928 /* Save the pointer to this channel data */
929 socket_pollfd.client_data = channel;
930
931 /* Add the socket to the pollset */
932 status = apr_pollset_add(tcp_listen_channels, &socket_pollfd);
933 if(status != APR_SUCCESS)
934 {
935 err_msg("Failed to add socket to pollset. Exiting.\n");
936 exit(EXIT_FAILURE);
937 }
938 }
939 }
940
/*
 * Replace every character of a metric name that is not allowed with '_'.
 *
 *   metric_name   NUL-terminated name, rewritten in place; NULL and the
 *                 empty string are left alone
 *   is_spoof_msg  non-zero when the name belongs to a spoofed message
 *
 * Allowed characters are ASCII letters, digits, '_', '-' and '.'. A ':'
 * is also allowed in spoofed messages, where it separates the two parts
 * of the name.
 *
 * The previous condition chained "(*p == ':' && !is_spoof_msg)" into the
 * list of exclusions with &&, which made the whole whitelist dead: only
 * a ':' in a non-spoof name was ever replaced and every other character
 * passed through untouched.
 */
void sanitize_metric_name(char *metric_name, int is_spoof_msg)
{
    char *p;

    if (metric_name == NULL)
        return;

    for (p = metric_name; *p != '\0'; p++) {
        const char c = *p;
        const int allowed =
               (c >= 'A' && c <= 'Z')
            || (c >= 'a' && c <= 'z')
            || (c >= '0' && c <= '9')
            || c == '_'
            || c == '-'
            || c == '.'
            || (c == ':' && is_spoof_msg);

        if (!allowed)
            *p = '_';
    }
}
962
963
964 static void
get_metric_names(Ganglia_metric_id * metric_id,char ** metricName,char ** realName)965 get_metric_names (Ganglia_metric_id *metric_id, char **metricName, char **realName)
966 {
967 char *firstName=NULL, *secondName=NULL, *buff=NULL;
968 int name_len;
969
970 *metricName = *realName = NULL;
971 firstName = metric_id->name;
972
973 if (metric_id->spoof)
974 {
975 name_len = strlen(firstName);
976 buff = malloc(name_len + 1);
977 strncpy(buff, firstName, name_len + 1);
978 firstName = buff;
979 secondName = strchr(buff + 1, ':');
980 if(secondName)
981 {
982 *secondName = 0;
983 secondName++;
984 }
985 }
986
987 if (firstName) {
988 *metricName = strdup(firstName);
989 if (secondName) {
990 *realName = strdup(secondName);
991 }
992 }
993
994 if (buff)
995 free(buff);
996 return;
997 }
998
999 Ganglia_host *
Ganglia_host_get(char * remIP,apr_sockaddr_t * sa,Ganglia_metric_id * metric_id)1000 Ganglia_host_get( char *remIP, apr_sockaddr_t *sa, Ganglia_metric_id *metric_id)
1001 {
1002 apr_status_t status;
1003 Ganglia_host *hostdata;
1004 apr_pool_t *pool;
1005 char *hostname = NULL;
1006 char *remoteip = remIP;
1007 char *buff = NULL;
1008
1009 if(!remoteip || !sa)
1010 {
1011 return NULL;
1012 }
1013
1014 /* split out the spoofed host name and ip address so that it can
1015 * be used to get the spoofed host. */
1016 if(metric_id && metric_id->spoof)
1017 {
1018 char *spoofName;
1019 char *spoofIP;
1020 int spoof_info_len;
1021
1022 spoof_info_len = strlen(metric_id->host);
1023 buff = malloc(spoof_info_len+1);
1024 strncpy(buff, metric_id->host, spoof_info_len + 1);
1025 spoofIP = buff;
1026 if( !(spoofName = strchr(buff+1,':')) ){
1027 err_msg("Incorrect format for spoof argument. exiting.\n");
1028 if (spoofIP) debug_msg("spoofIP: %s \n",spoofIP);
1029 if (buff) debug_msg("buff: %s \n",buff);
1030 if (buff) free(buff);
1031 return NULL;
1032 }
1033 *spoofName = 0;
1034 spoofName++;
1035 if(!(*spoofName)){
1036 err_msg("Incorrect format for spoof argument. exiting.\n");
1037 if (buff) free(buff);
1038 return NULL;
1039 }
1040 debug_msg(" spoofName: %s spoofIP: %s \n",spoofName,spoofIP);
1041
1042 hostname = spoofName;
1043 remoteip = spoofIP;
1044 }
1045
1046 apr_thread_mutex_lock(hosts_mutex);
1047 hostdata = (Ganglia_host *)apr_hash_get( hosts, remoteip, APR_HASH_KEY_STRING );
1048 apr_thread_mutex_unlock(hosts_mutex);
1049 if(!hostdata)
1050 {
1051 /* Lookup the hostname or use the proxy information if available */
1052 if( !hostname )
1053 {
1054 /* We'll use the resolver to find the hostname */
1055 status = apr_getnameinfo(&hostname, sa, 0);
1056 if(status != APR_SUCCESS)
1057 {
1058 /* If hostname lookup fails.. set it to the ip */
1059 hostname = remoteip;
1060 }
1061 }
1062
1063 /* This is the first time we've heard from this host.. create a new pool */
1064 status = apr_pool_create( &pool, global_context );
1065 if(status != APR_SUCCESS)
1066 {
1067 if (buff) free(buff);
1068 return NULL;
1069 }
1070
1071 /* Malloc the hostdata_t from the new pool */
1072 hostdata = apr_pcalloc( pool, sizeof( Ganglia_host ));
1073 if(!hostdata)
1074 {
1075 if (buff) free(buff);
1076 apr_pool_destroy(pool);
1077 return NULL;
1078 }
1079
1080 /* Save the pool address for later.. freeing this pool free everthing
1081 * for this particular host */
1082 hostdata->pool = pool;
1083
1084 /* Save the hostname */
1085 hostdata->hostname = apr_pstrdup( pool, hostname );
1086
1087 /* Dup the remoteip (it will be freed later) */
1088 hostdata->ip = apr_pstrdup( pool, remoteip);
1089
1090 /* We don't know the location yet */
1091 hostdata->location = NULL;
1092
1093 /* Set the timestamps */
1094 hostdata->first_heard_from = hostdata->last_heard_from = apr_time_now();
1095
1096 /* Create the hostdata mutex */
1097 if (apr_thread_mutex_create(&hostdata->mutex, APR_THREAD_MUTEX_DEFAULT, pool) != APR_SUCCESS)
1098 {
1099 if (buff) free(buff);
1100 apr_pool_destroy(pool);
1101 return NULL;
1102 }
1103
1104 /* Create a hash for the metric data */
1105 hostdata->metrics = apr_hash_make( pool );
1106 if(!hostdata->metrics)
1107 {
1108 if (buff) free(buff);
1109 apr_pool_destroy(pool);
1110 return NULL;
1111 }
1112
1113 /* Create a hash for the gmetric data */
1114 hostdata->gmetrics = apr_hash_make( pool );
1115 if(!hostdata->gmetrics)
1116 {
1117 if (buff) free(buff);
1118 apr_pool_destroy(pool);
1119 return NULL;
1120 }
1121
1122 /* Save this host data to the "hosts" hash */
1123 apr_thread_mutex_lock(hosts_mutex);
1124 apr_hash_set( hosts, hostdata->ip, APR_HASH_KEY_STRING, hostdata);
1125 apr_thread_mutex_unlock(hosts_mutex);
1126 }
1127 else
1128 {
1129 /* We already have this host in our "hosts" hash update timestamp */
1130 hostdata->last_heard_from = apr_time_now();
1131 }
1132
1133 if (buff) free(buff);
1134 return hostdata;
1135 }
1136
1137 void
Ganglia_update_vidals(Ganglia_host * host,Ganglia_value_msg * vmsg)1138 Ganglia_update_vidals( Ganglia_host *host, Ganglia_value_msg *vmsg)
1139 {
1140 char *metricName=NULL, *realName=NULL;
1141
1142 if (!vmsg)
1143 return;
1144
1145 metricName = vmsg->Ganglia_value_msg_u.gstr.metric_id.name;
1146 get_metric_names (&(vmsg->Ganglia_value_msg_u.gstr.metric_id), &metricName, &realName);
1147
1148 if(!strcasecmp("location", metricName))
1149 {
1150 /* We have to manage this memory here because.. returning NULL
1151 * will not cause Ganglia_message_save to be run. Maybe this
1152 * could be done better later i.e should these metrics be
1153 * in the host->metrics list instead of the host structure? */
1154 if(host->location)
1155 {
1156 /* Free old location */
1157 free(host->location);
1158 }
1159 /* Save new location */
1160 host->location = strdup(vmsg->Ganglia_value_msg_u.gstr.str);
1161 debug_msg("Got a location message %s\n", host->location);
1162 /* Processing is finished */
1163 }
1164 else if(!strcasecmp("heartbeat", metricName))
1165 {
1166 /* nothing more needs to be done. we handled the timestamps above. */
1167 host->gmond_started = vmsg->Ganglia_value_msg_u.gu_int.ui;
1168 debug_msg("Got a heartbeat message %d\n", host->gmond_started);
1169 /* Processing is finished */
1170 }
1171 else if(vmsg->Ganglia_value_msg_u.gstr.metric_id.spoof)
1172 {
1173 /* nothing more needs to be done. we handled the timestamps above. */
1174 debug_msg("Got a spoof message %s from %s\n", vmsg->Ganglia_value_msg_u.gstr.metric_id.name,
1175 vmsg->Ganglia_value_msg_u.gstr.metric_id.host);
1176 /* Processing is finished */
1177 }
1178
1179 if (metricName) free(metricName);
1180 if (realName) free(realName);
1181 return;
1182 }
1183
1184 void
Ganglia_metadata_check(Ganglia_host * host,Ganglia_value_msg * vmsg)1185 Ganglia_metadata_check(Ganglia_host *host, Ganglia_value_msg *vmsg )
1186 {
1187 char *metric_name = vmsg->Ganglia_value_msg_u.gstr.metric_id.name;
1188 int is_spoof_msg = vmsg->Ganglia_value_msg_u.gstr.metric_id.spoof;
1189 Ganglia_metadata *metric;
1190
1191 apr_thread_mutex_lock(host->mutex);
1192 metric = (Ganglia_metadata *)apr_hash_get(host->metrics, metric_name, APR_HASH_KEY_STRING);
1193 apr_thread_mutex_unlock(host->mutex);
1194
1195 if(!metric)
1196 {
1197 int len;
1198 XDR x;
1199 char msgbuf[GANGLIA_MAX_MESSAGE_LEN];
1200 char hostbuf[512];
1201 Ganglia_metadata_msg msg;
1202
1203 msg.id = gmetadata_request;
1204 if (is_spoof_msg)
1205 apr_snprintf(hostbuf, 512, "%s:%s", host->ip, host->hostname);
1206 else
1207 apr_snprintf(hostbuf, 512, "%s", host->hostname);
1208 msg.Ganglia_metadata_msg_u.grequest.metric_id.host = hostbuf;
1209 msg.Ganglia_metadata_msg_u.grequest.metric_id.name = metric_name;
1210 msg.Ganglia_metadata_msg_u.grequest.metric_id.spoof = is_spoof_msg;
1211
1212 debug_msg("sending metadata request flag for metric: %s host: %s", metric_name, host->hostname);
1213 ganglia_scoreboard_inc(PKTS_SENT_REQUEST);
1214 ganglia_scoreboard_inc(PKTS_SENT_ALL);
1215
1216 /* Send the message */
1217 xdrmem_create(&x, msgbuf, GANGLIA_MAX_MESSAGE_LEN, XDR_ENCODE);
1218 if(!xdr_Ganglia_metadata_msg(&x, &msg))
1219 {
1220 return;
1221 }
1222 len = xdr_getpos(&x);
1223 /* Send the encoded data along...*/
1224 Ganglia_udp_send_message( udp_send_channels, msgbuf, len);
1225 }
1226
1227 return;
1228 }
1229
#if 0
/* Currently compiled out.  Releases a metadata entry by destroying the
 * pool stored in it; a NULL entry is ignored. */
static void
Ganglia_metadata_free( Ganglia_metadata *metric )
{
  if(metric != NULL)
  {
    apr_pool_destroy( metric->pool );
  }
}
#endif
1239
1240 void
Ganglia_metadata_save(Ganglia_host * host,Ganglia_metadata_msg * message)1241 Ganglia_metadata_save( Ganglia_host *host, Ganglia_metadata_msg *message )
1242 {
1243 Ganglia_metadata *metric;
1244
1245 if(!host || !message)
1246 return;
1247
1248 /* Search for the Ganglia_metadata in the Ganglia_host */
1249 sanitize_metric_name(message->Ganglia_metadata_msg_u.gfull.metric_id.name, message->Ganglia_metadata_msg_u.gfull.metric_id.spoof);
1250
1251 apr_thread_mutex_lock(host->mutex);
1252 metric = (Ganglia_metadata *)apr_hash_get(host->metrics,
1253 message->Ganglia_metadata_msg_u.gfull.metric_id.name,
1254 APR_HASH_KEY_STRING);
1255 apr_thread_mutex_unlock(host->mutex);
1256
1257 if(metric)
1258 {
1259 apr_pool_clear(metric->pool);
1260 }
1261 else
1262 {
1263 apr_status_t status;
1264
1265 /* This is a new metric sent from this host... allocate space for this data */
1266
1267 /* Allocate a new metric from this context */
1268 metric = apr_pcalloc(host->pool, sizeof(Ganglia_metadata));
1269 if(!metric)
1270 return;
1271
1272 /* Create the context for this metric */
1273 status = apr_pool_create(&(metric->pool), host->pool);
1274 if(status != APR_SUCCESS)
1275 return;
1276
1277 debug_msg("***Allocating metadata packet for host--%s-- and metric --%s-- ****\n", host->hostname, message->Ganglia_metadata_msg_u.gfull.metric_id.name);
1278 }
1279
1280 if(metric)
1281 {
1282 Ganglia_metadata_msg *fmessage = &(metric->message_u.f_message);
1283 u_int i,mlen = message->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_len;
1284
1285 metric->name = apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric_id.name);
1286 fmessage->id = message->id;
1287 fmessage->Ganglia_metadata_msg_u.gfull.metric_id.host =
1288 apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric_id.host);
1289 fmessage->Ganglia_metadata_msg_u.gfull.metric_id.name =
1290 apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric_id.name);
1291 fmessage->Ganglia_metadata_msg_u.gfull.metric_id.spoof =
1292 message->Ganglia_metadata_msg_u.gfull.metric_id.spoof;
1293 fmessage->Ganglia_metadata_msg_u.gfull.metric.type =
1294 apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric.type);
1295 fmessage->Ganglia_metadata_msg_u.gfull.metric.name =
1296 apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric.name);
1297 fmessage->Ganglia_metadata_msg_u.gfull.metric.units =
1298 apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric.units);
1299 fmessage->Ganglia_metadata_msg_u.gfull.metric.slope =
1300 message->Ganglia_metadata_msg_u.gfull.metric.slope;
1301 fmessage->Ganglia_metadata_msg_u.gfull.metric.tmax =
1302 message->Ganglia_metadata_msg_u.gfull.metric.tmax;
1303 fmessage->Ganglia_metadata_msg_u.gfull.metric.dmax =
1304 message->Ganglia_metadata_msg_u.gfull.metric.dmax;
1305 fmessage->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_len = mlen;
1306
1307 fmessage->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val =
1308 apr_pcalloc(metric->pool, sizeof(Ganglia_extra_data)*mlen);
1309 for (i = 0; i < mlen; i++)
1310 {
1311 fmessage->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[i].name =
1312 apr_pstrdup(metric->pool,
1313 message->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[i].name);
1314 fmessage->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[i].data =
1315 apr_pstrdup(metric->pool,
1316 message->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[i].data);
1317 }
1318
1319 metric->last_heard_from = apr_time_now();
1320
1321 /* Save the full metric */
1322 apr_thread_mutex_lock(host->mutex);
1323 apr_hash_set(host->metrics, metric->name, APR_HASH_KEY_STRING, metric);
1324 apr_thread_mutex_unlock(host->mutex);
1325 debug_msg("saving metadata for metric: %s host: %s", metric->name, host->hostname);
1326 }
1327 }
1328
1329 static void
Ganglia_metadata_request(Ganglia_host * host,Ganglia_metadata_msg * message)1330 Ganglia_metadata_request( Ganglia_host *host, Ganglia_metadata_msg *message )
1331 {
1332 char *name = message->Ganglia_metadata_msg_u.grequest.metric_id.name;
1333 Ganglia_metric_callback *metric_cb;
1334 int is_spoof_msg = message->Ganglia_metadata_msg_u.grequest.metric_id.spoof;
1335 char srch_name[512];
1336
1337 if(is_spoof_msg)
1338 {
1339 apr_snprintf(srch_name, 512, "%s:%s", name, host->hostname);
1340 metric_cb = (Ganglia_metric_callback *)apr_hash_get( metric_callbacks, srch_name, APR_HASH_KEY_STRING );
1341 }
1342 else
1343 {
1344 metric_cb = (Ganglia_metric_callback *)apr_hash_get( metric_callbacks, name, APR_HASH_KEY_STRING );
1345 }
1346
1347 if(!host || !message)
1348 return;
1349
1350 if (metric_cb)
1351 metric_cb->metadata_last_sent = 0;
1352 debug_msg("setting metadata request flag for metric: %s host: %s", name, host->hostname);
1353 }
1354
1355 void
Ganglia_value_save(Ganglia_host * host,Ganglia_value_msg * message)1356 Ganglia_value_save( Ganglia_host *host, Ganglia_value_msg *message )
1357 {
1358 Ganglia_metadata *metric;
1359
1360 if(!host || !message)
1361 return;
1362
1363 /* Search for the Ganglia_metric in the Ganglia_host */
1364 apr_thread_mutex_lock(host->mutex);
1365 metric = (Ganglia_metadata *)apr_hash_get(host->gmetrics,
1366 message->Ganglia_value_msg_u.gstr.metric_id.name,
1367 APR_HASH_KEY_STRING);
1368 apr_thread_mutex_unlock(host->mutex);
1369
1370 if(metric)
1371 {
1372 apr_pool_clear(metric->pool);
1373 }
1374 else
1375 {
1376 apr_status_t status;
1377
1378 /* This is a new metric sent from this host... allocate space for this data */
1379
1380 /* Allocate a new metric from this context */
1381 metric = apr_pcalloc(host->pool, sizeof(Ganglia_metadata));
1382 if(!metric)
1383 {
1384 /* no memory */
1385 return;
1386 }
1387
1388 /* Create the context for this metric */
1389 status = apr_pool_create(&(metric->pool), host->pool);
1390 if(status != APR_SUCCESS)
1391 return;
1392
1393 debug_msg("***Allocating value packet for host--%s-- and metric --%s-- ****\n", message->Ganglia_value_msg_u.gstr.metric_id.host, message->Ganglia_value_msg_u.gstr.metric_id.name );
1394 }
1395
1396
1397 if(metric)
1398 {
1399 Ganglia_value_msg *vmessage = &(metric->message_u.v_message);
1400
1401 metric->name = apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.metric_id.name );
1402 vmessage->id = message->id;
1403 vmessage->Ganglia_value_msg_u.gstr.metric_id.host =
1404 apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.metric_id.host);
1405 vmessage->Ganglia_value_msg_u.gstr.metric_id.name =
1406 apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.metric_id.name);
1407 vmessage->Ganglia_value_msg_u.gstr.metric_id.spoof =
1408 message->Ganglia_value_msg_u.gstr.metric_id.spoof;
1409 vmessage->Ganglia_value_msg_u.gstr.fmt =
1410 apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.fmt);
1411
1412 switch(message->id)
1413 {
1414 case gmetric_string:
1415 vmessage->Ganglia_value_msg_u.gstr.str =
1416 apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.str);
1417 break;
1418 case gmetric_ushort:
1419 vmessage->Ganglia_value_msg_u.gu_short.us =
1420 message->Ganglia_value_msg_u.gu_short.us;
1421 break;
1422 case gmetric_short:
1423 vmessage->Ganglia_value_msg_u.gs_short.ss =
1424 message->Ganglia_value_msg_u.gs_short.ss;
1425 break;
1426 case gmetric_uint:
1427 vmessage->Ganglia_value_msg_u.gu_int.ui =
1428 message->Ganglia_value_msg_u.gu_int.ui;
1429 break;
1430 case gmetric_int:
1431 vmessage->Ganglia_value_msg_u.gs_int.si =
1432 message->Ganglia_value_msg_u.gs_int.si;
1433 break;
1434 case gmetric_float:
1435 vmessage->Ganglia_value_msg_u.gf.f =
1436 message->Ganglia_value_msg_u.gf.f;
1437 break;
1438 case gmetric_double:
1439 vmessage->Ganglia_value_msg_u.gd.d =
1440 message->Ganglia_value_msg_u.gd.d;
1441 break;
1442 default:
1443 break;
1444 }
1445
1446 metric->last_heard_from = apr_time_now();
1447
1448 /* Save the last update metric */
1449 apr_thread_mutex_lock(host->mutex);
1450 apr_hash_set(host->gmetrics, metric->name, APR_HASH_KEY_STRING, metric);
1451 apr_thread_mutex_unlock(host->mutex);
1452 }
1453 }
1454
1455 static void
process_udp_recv_channel(const apr_pollfd_t * desc,apr_time_t now)1456 process_udp_recv_channel(const apr_pollfd_t *desc, apr_time_t now)
1457 {
1458 apr_status_t status;
1459 apr_socket_t *socket;
1460 apr_sockaddr_t *remotesa = NULL;
1461 #ifdef SFLOW
1462 uint16_t localport;
1463 char *errorMsg = NULL;
1464 #endif
1465 char remoteip[256];
1466 char buf[max_udp_message_len];
1467 apr_size_t len = max_udp_message_len;
1468 Ganglia_channel *channel;
1469 XDR x;
1470 Ganglia_metadata_msg fmsg;
1471 Ganglia_value_msg vmsg;
1472 Ganglia_host *hostdata = NULL;
1473 apr_pool_t *p = NULL;
1474 Ganglia_msg_formats id;
1475 bool_t ret;
1476
1477 socket = desc->desc.s;
1478 /* We could also use the apr_socket_data_get/set() functions
1479 * to have per socket user data .. see APR docs */
1480 channel = desc->client_data;
1481
1482 /* We need to create a copy of the local sockaddr so that the
1483 recvfrom call has a place holder to put the remote information.
1484 Getting the remote sockaddr might not work since a SOCK_DGRAM
1485 type socket is connectionless. */
1486 apr_pool_create(&p, global_context);
1487 status = apr_socket_addr_get(&remotesa, APR_LOCAL, socket);
1488 #ifdef SFLOW
1489 /* remember this before it gets overwritten */
1490 localport = remotesa->port;
1491 #endif
1492 status = apr_sockaddr_info_get(&remotesa, NULL, remotesa->family, remotesa->port, 0, p);
1493
1494 /* Grab the data */
1495 status = apr_socket_recvfrom(remotesa, socket, 0, buf, &len);
1496 if(status != APR_SUCCESS)
1497 {
1498 apr_pool_destroy(p);
1499 return;
1500 }
1501
1502 /* This function is in ./lib/apr_net.c and not APR. The
1503 * APR counterpart is apr_sockaddr_ip_get() but we don't
1504 * want to malloc memory evertime we call this */
1505 apr_sockaddr_ip_buffer_get(remoteip, 256, remotesa);
1506
1507 /* Check the ACL */
1508 if(Ganglia_acl_action( channel->acl, remotesa) != GANGLIA_ACCESS_ALLOW)
1509 {
1510 apr_pool_destroy(p);
1511 return;
1512 }
1513
1514 ganglia_scoreboard_inc(PKTS_RECVD_ALL);
1515
1516 #ifdef SFLOW
1517 if(localport == sflow_udp_port) {
1518 if(process_sflow_datagram(remotesa, buf, len, now, &errorMsg)) {
1519 ganglia_scoreboard_inc(PKTS_RECVD_VALUE);
1520 }
1521 else {
1522 if(errorMsg) {
1523 debug_msg("sFlow error: %s", errorMsg);
1524 }
1525 ganglia_scoreboard_inc(PKTS_RECVD_FAILED);
1526 }
1527 apr_pool_destroy(p);
1528 return;
1529 }
1530 #endif
1531
1532 /* Create the XDR receive stream */
1533 xdrmem_create(&x, buf, max_udp_message_len, XDR_DECODE);
1534
1535 /* Flush the data... */
1536 memset( &fmsg, 0, sizeof(Ganglia_metadata_msg));
1537 memset( &vmsg, 0, sizeof(Ganglia_value_msg));
1538
1539 /* Figure out what kind of message it we got */
1540 xdr_Ganglia_msg_formats(&x, &id);
1541 xdr_setpos (&x, 0);
1542
1543 /* Read the gangliaMessage from the stream */
1544 /* Save the message from this particular host */
1545 switch (id)
1546 {
1547 case gmetadata_request:
1548 ganglia_scoreboard_inc(PKTS_RECVD_REQUEST);
1549 ret = xdr_Ganglia_metadata_msg(&x, &fmsg);
1550 if (ret)
1551 hostdata = Ganglia_host_get(remoteip, remotesa, &(fmsg.Ganglia_metadata_msg_u.grequest.metric_id));
1552 sanitize_metric_name(fmsg.Ganglia_metadata_msg_u.grequest.metric_id.name, fmsg.Ganglia_metadata_msg_u.grequest.metric_id.spoof);
1553 if(!ret || !hostdata)
1554 {
1555 ganglia_scoreboard_inc(PKTS_RECVD_FAILED);
1556 /* Processing of this message is finished ... */
1557 xdr_free((xdrproc_t)xdr_Ganglia_metadata_msg, (char *)&fmsg);
1558 break;
1559 }
1560 debug_msg("Processing a metric metadata request message from %s", hostdata->hostname);
1561 Ganglia_metadata_request(hostdata, &fmsg);
1562 xdr_free((xdrproc_t)xdr_Ganglia_metadata_msg, (char *)&fmsg);
1563 break;
1564 case gmetadata_full:
1565 ganglia_scoreboard_inc(PKTS_RECVD_METADATA);
1566 ret = xdr_Ganglia_metadata_msg(&x, &fmsg);
1567 if (ret)
1568 hostdata = Ganglia_host_get(remoteip, remotesa, &(fmsg.Ganglia_metadata_msg_u.gfull.metric_id));
1569 sanitize_metric_name(fmsg.Ganglia_metadata_msg_u.gfull.metric_id.name, fmsg.Ganglia_metadata_msg_u.gfull.metric_id.spoof);
1570 if(!ret || !hostdata)
1571 {
1572 ganglia_scoreboard_inc(PKTS_RECVD_FAILED);
1573 /* Processing of this message is finished ... */
1574 xdr_free((xdrproc_t)xdr_Ganglia_metadata_msg, (char *)&fmsg);
1575 break;
1576 }
1577 debug_msg("Processing a metric metadata message from %s", hostdata->hostname);
1578 Ganglia_metadata_save( hostdata, &fmsg );
1579 xdr_free((xdrproc_t)xdr_Ganglia_metadata_msg, (char *)&fmsg);
1580 break;
1581 case gmetric_ushort:
1582 case gmetric_short:
1583 case gmetric_int:
1584 case gmetric_uint:
1585 case gmetric_string:
1586 case gmetric_float:
1587 case gmetric_double:
1588 ganglia_scoreboard_inc(PKTS_RECVD_VALUE);
1589 ret = xdr_Ganglia_value_msg(&x, &vmsg);
1590 if (ret)
1591 hostdata = Ganglia_host_get(remoteip, remotesa, &(vmsg.Ganglia_value_msg_u.gstr.metric_id));
1592 sanitize_metric_name(vmsg.Ganglia_value_msg_u.gstr.metric_id.name, vmsg.Ganglia_value_msg_u.gstr.metric_id.spoof);
1593 if(!ret || !hostdata)
1594 {
1595 ganglia_scoreboard_inc(PKTS_RECVD_FAILED);
1596 /* Processing of this message is finished ... */
1597 xdr_free((xdrproc_t)xdr_Ganglia_value_msg, (char *)&vmsg);
1598 break;
1599 }
1600 debug_msg("Processing a metric value message from %s", hostdata->hostname);
1601 Ganglia_value_save(hostdata, &vmsg);
1602 Ganglia_update_vidals(hostdata, &vmsg);
1603 Ganglia_metadata_check(hostdata, &vmsg);
1604 xdr_free((xdrproc_t)xdr_Ganglia_value_msg, (char *)&vmsg);
1605 break;
1606 default:
1607 ganglia_scoreboard_inc(PKTS_RECVD_IGNORED);
1608 break;
1609 }
1610
1611 apr_pool_destroy(p);
1612
1613 return;
1614 }
1615
1616 static z_stream *
zstream_new()1617 zstream_new()
1618 {
1619 int err;
1620
1621 z_stream *strm = calloc(1, sizeof(z_stream));
1622 if (strm == 0)
1623 {
1624 return NULL;
1625 }
1626
1627 /* Yes, 15 + 16 are 2 special magic values documented in zlib.h */
1628 err = deflateInit2(strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
1629 if (err != Z_OK)
1630 {
1631 free( strm );
1632 return NULL;
1633 }
1634
1635 return strm;
1636 }
1637
1638 static apr_status_t
socket_flush(apr_socket_t * client,int gzip_output)1639 socket_flush( apr_socket_t *client, int gzip_output )
1640 {
1641 char outputbuffer[2048];
1642 const int outputlen = sizeof(outputbuffer);
1643 int ret;
1644 int status;
1645 apr_size_t wlen;
1646 z_stream *strm;
1647
1648 if ( !gzip_output )
1649 {
1650 return APR_SUCCESS;
1651 }
1652
1653 if (APR_SUCCESS == apr_socket_data_get((void**)&strm, GZIP_KEY, client))
1654 {
1655 while( 1 )
1656 {
1657 strm->next_out = (Bytef *)outputbuffer;
1658 strm->avail_out = outputlen;
1659
1660 ret = deflate( strm, Z_FINISH );
1661 if (ret != Z_OK && ret != Z_STREAM_END)
1662 {
1663 return APR_ENOMEM;
1664 }
1665
1666 wlen = outputlen - strm->avail_out;
1667 status = socket_send_raw( client, outputbuffer, &wlen );
1668 if(status != APR_SUCCESS)
1669 return status;
1670
1671 if(ret == Z_STREAM_END)
1672 return APR_SUCCESS;
1673 }
1674 }
1675 return APR_SUCCESS;
1676 }
1677
1678 static void
zstream_destroy(z_stream * strm)1679 zstream_destroy( z_stream *strm )
1680 {
1681 if (strm)
1682 {
1683 deflateEnd(strm);
1684 free (strm);
1685 }
1686 }
1687
1688 static apr_status_t
print_xml_header(apr_socket_t * client)1689 print_xml_header( apr_socket_t *client )
1690 {
1691 apr_status_t status;
1692 apr_size_t len = strlen(DTD);
1693 char gangliaxml[128];
1694 char clusterxml[1024];
1695 static int clusterinit = 0;
1696 static char *name = NULL;
1697 static char *owner = NULL;
1698 static char *latlong = NULL;
1699 static char *url = NULL;
1700 apr_time_t now = apr_time_now();
1701
1702 status = socket_send( client, DTD, &len );
1703 if(status != APR_SUCCESS)
1704 return status;
1705
1706 len = apr_snprintf( gangliaxml, 128, "<GANGLIA_XML VERSION=\"%s\" SOURCE=\"gmond\">\n",
1707 VERSION);
1708 status = socket_send( client, gangliaxml, &len);
1709 if(status != APR_SUCCESS)
1710 return status;
1711
1712 if(!clusterinit)
1713 {
1714 /* We only run this on the first connection we process */
1715 cfg_t *cluster = cfg_getsec(config_file, "cluster");
1716 if(cluster)
1717 {
1718 name = cfg_getstr( cluster, "name" );
1719 owner = cfg_getstr( cluster, "owner" );
1720 latlong = cfg_getstr( cluster, "latlong" );
1721 url = cfg_getstr( cluster, "url" );
1722 if(name || owner || latlong || url)
1723 {
1724 cluster_tag =1;
1725 }
1726 }
1727 clusterinit = 1;
1728 }
1729
1730 if(cluster_tag)
1731 {
1732 len = apr_snprintf( clusterxml, 1024,
1733 "<CLUSTER NAME=\"%s\" LOCALTIME=\"%d\" OWNER=\"%s\" LATLONG=\"%s\" URL=\"%s\">\n",
1734 name?name:"unspecified",
1735 (int)(now / APR_USEC_PER_SEC),
1736 owner?owner:"unspecified",
1737 latlong?latlong:"unspecified",
1738 url?url:"unspecified");
1739
1740 return socket_send( client, clusterxml, &len);
1741 }
1742
1743 return APR_SUCCESS;
1744 }
1745
1746 static apr_status_t
print_xml_footer(apr_socket_t * client)1747 print_xml_footer( apr_socket_t *client )
1748 {
1749 apr_status_t status;
1750 apr_size_t len;
1751 if(cluster_tag)
1752 {
1753 len = 11;
1754 status = socket_send(client, "</CLUSTER>\n", &len);
1755 if(status != APR_SUCCESS)
1756 {
1757 return status;
1758 }
1759 }
1760 len = 15;
1761 return socket_send( client, "</GANGLIA_XML>\n", &len);
1762 }
1763
1764 static apr_status_t
print_host_start(apr_socket_t * client,Ganglia_host * hostinfo)1765 print_host_start( apr_socket_t *client, Ganglia_host *hostinfo)
1766 {
1767 apr_size_t len;
1768 char hostxml[1024]; /* for <HOST></HOST> */
1769 apr_time_t now = apr_time_now();
1770 int tn = (now - hostinfo->last_heard_from) / APR_USEC_PER_SEC;
1771
1772 len = apr_snprintf(hostxml, 1024,
1773 "<HOST NAME=\"%s\" IP=\"%s\" TAGS=\"%s\" REPORTED=\"%d\" TN=\"%d\" TMAX=\"%d\" DMAX=\"%d\" LOCATION=\"%s\" GMOND_STARTED=\"%d\">\n",
1774 hostinfo->hostname,
1775 hostinfo->ip,
1776 tags ? tags : "",
1777 (int)(hostinfo->last_heard_from / APR_USEC_PER_SEC),
1778 tn,
1779 host_tmax,
1780 host_dmax,
1781 hostinfo->location? hostinfo->location: "unspecified",
1782 hostinfo->gmond_started);
1783
1784 return socket_send(client, hostxml, &len);
1785 }
1786
1787 /* NOT THREAD SAFE */
1788 static char *
host_metric_type(Ganglia_value_types type)1789 host_metric_type( Ganglia_value_types type)
1790 {
1791 switch(type)
1792 {
1793 case GANGLIA_VALUE_UNKNOWN:
1794 return "unknown";
1795 case GANGLIA_VALUE_STRING:
1796 return "string";
1797 case GANGLIA_VALUE_UNSIGNED_SHORT:
1798 return "uint16";
1799 case GANGLIA_VALUE_SHORT:
1800 return "int16";
1801 case GANGLIA_VALUE_UNSIGNED_INT:
1802 return "uint32";
1803 case GANGLIA_VALUE_INT:
1804 return "int32";
1805 case GANGLIA_VALUE_FLOAT:
1806 return "float";
1807 case GANGLIA_VALUE_DOUBLE:
1808 return "double";
1809 }
1810 return "undef";
1811 }
1812
1813 /* NOT THREAD SAFE */
1814 static char *
host_metric_value(Ganglia_25metric * metric,Ganglia_value_msg * message)1815 host_metric_value( Ganglia_25metric *metric, Ganglia_value_msg *message )
1816 {
1817 static char value[1024];
1818 if(!metric||!message)
1819 {
1820 return "unknown";
1821 }
1822
1823 switch(metric->type)
1824 {
1825 case GANGLIA_VALUE_UNKNOWN:
1826 return "unknown";
1827 case GANGLIA_VALUE_STRING:
1828 return message->Ganglia_value_msg_u.gstr.str;
1829 case GANGLIA_VALUE_UNSIGNED_SHORT:
1830 apr_snprintf(value, 1024, metric->fmt, message->Ganglia_value_msg_u.gu_short.us);
1831 return value;
1832 case GANGLIA_VALUE_SHORT:
1833 /* For right now.. there are no metrics which are signed shorts... use u_short */
1834 apr_snprintf(value, 1024, metric->fmt, message->Ganglia_value_msg_u.gs_short.ss);
1835 return value;
1836 case GANGLIA_VALUE_UNSIGNED_INT:
1837 apr_snprintf(value, 1024, metric->fmt, message->Ganglia_value_msg_u.gu_int.ui);
1838 return value;
1839 case GANGLIA_VALUE_INT:
1840 /* For right now.. there are no metric which are signed ints... use u_int */
1841 apr_snprintf(value, 1024, metric->fmt, message->Ganglia_value_msg_u.gs_int.si);
1842 return value;
1843 case GANGLIA_VALUE_FLOAT:
1844 apr_snprintf(value, 1024, metric->fmt, message->Ganglia_value_msg_u.gf.f);
1845 return value;
1846 case GANGLIA_VALUE_DOUBLE:
1847 apr_snprintf(value, 1024, metric->fmt, message->Ganglia_value_msg_u.gd.d);
1848 return value;
1849 }
1850
1851 return "unknown";
1852 }
1853
/*
 * Copy src into dst, escaping it for use inside XML attribute values.
 *
 *   dst          output buffer, always NUL-terminated when max_dst_len > 0
 *   src          NUL-terminated input (NULL is treated as empty)
 *   max_dst_len  total size of dst in bytes, terminator included
 *
 * The characters & < > " ' are replaced by the five predefined XML
 * entities (amp, lt, gt, quot, apos).  Bytes below 32 or above 126 are
 * written as decimal character references of the byte value, with at
 * least two digits.  Copying stops before the first character whose
 * replacement would not fit completely, so the output never ends in the
 * middle of an entity.
 *
 * Returns the number of bytes written, not counting the terminator.
 */
static int
xml_escape(char *dst, const char *src, size_t max_dst_len)
{
  const unsigned char *s;
  size_t used = 0;   /* invariant: used < max_dst_len */

  if (dst == NULL || max_dst_len == 0)
  {
    return 0;
  }
  dst[0] = '\0';
  if (src == NULL)
  {
    return 0;
  }

  /* Walk the input as unsigned bytes so values above 127 stay positive */
  for (s = (const unsigned char *)src; *s != '\0'; s++)
  {
    char scratch[8];   /* holds one plain char or "&#255;" plus NUL */
    const char *piece;
    size_t piece_len;

    /* The entity literals are written as two adjacent string literals
     * (the compiler joins them) so that tooling which HTML-decodes this
     * file cannot silently turn an entity back into a bare character. */
    switch (*s)
    {
      case '&':
        piece = "&" "amp;";
        break;
      case '<':
        piece = "&" "lt;";
        break;
      case '>':
        piece = "&" "gt;";
        break;
      case '"':
        piece = "&" "quot;";
        break;
      case '\'':
        piece = "&" "apos;";
        break;
      default:
        if (*s < 32 || *s > 126)
        {
          snprintf(scratch, sizeof(scratch), "&" "#%02u;", (unsigned int)*s);
        }
        else
        {
          scratch[0] = (char)*s;
          scratch[1] = '\0';
        }
        piece = scratch;
        break;
    }
    piece_len = strlen(piece);

    /* Room is needed for the piece and the terminator that follows it */
    if (piece_len + 1 > max_dst_len - used)
    {
      break;
    }

    memcpy(dst + used, piece, piece_len);
    used += piece_len;
    dst[used] = '\0';
  }

  return (int)used;
}
1910
1911 static char *
gmetric_value_to_str(Ganglia_value_msg * message)1912 gmetric_value_to_str(Ganglia_value_msg *message)
1913 {
1914 static char value[1024];
1915 if(!message)
1916 {
1917 return "unknown";
1918 }
1919
1920 switch(message->id)
1921 {
1922 case gmetric_string:
1923 xml_escape(value, message->Ganglia_value_msg_u.gstr.str, 1024);
1924 return value;
1925 case gmetric_ushort:
1926 apr_snprintf(value, 1024, message->Ganglia_value_msg_u.gu_short.fmt, message->Ganglia_value_msg_u.gu_short.us);
1927 return value;
1928 case gmetric_short:
1929 /* For right now.. there are no metrics which are signed shorts... use u_short */
1930 apr_snprintf(value, 1024, message->Ganglia_value_msg_u.gs_short.fmt, message->Ganglia_value_msg_u.gs_short.ss);
1931 return value;
1932 case gmetric_uint:
1933 apr_snprintf(value, 1024, message->Ganglia_value_msg_u.gu_int.fmt, message->Ganglia_value_msg_u.gu_int.ui);
1934 return value;
1935 case gmetric_int:
1936 /* For right now.. there are no metric which are signed ints... use u_int */
1937 apr_snprintf(value, 1024, message->Ganglia_value_msg_u.gs_int.fmt, message->Ganglia_value_msg_u.gs_int.si);
1938 return value;
1939 case gmetric_float:
1940 apr_snprintf(value, 1024, message->Ganglia_value_msg_u.gf.fmt, message->Ganglia_value_msg_u.gf.f);
1941 return value;
1942 case gmetric_double:
1943 apr_snprintf(value, 1024, message->Ganglia_value_msg_u.gd.fmt, message->Ganglia_value_msg_u.gd.d);
1944 return value;
1945 case gmetadata_full:
1946 case gmetadata_request:
1947 default:
1948 return "unknown";
1949 }
1950 }
1951
1952 static apr_status_t
print_host_metric(apr_socket_t * client,Ganglia_metadata * data,Ganglia_metadata * val,apr_time_t now)1953 print_host_metric( apr_socket_t *client, Ganglia_metadata *data, Ganglia_metadata *val, apr_time_t now )
1954 {
1955 char metricxml[1024];
1956 apr_size_t len;
1957 apr_status_t ret;
1958 char *metricName=NULL, *realName=NULL;
1959
1960 if (!data || !val)
1961 return APR_SUCCESS;
1962
1963 get_metric_names (&(data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric_id), &metricName, &realName);
1964
1965 if (!metricName || (!strcasecmp(metricName, "heartbeat") || !strcasecmp(metricName, "location")))
1966 {
1967 if (metricName) free(metricName);
1968 if (realName) free(realName);
1969 return APR_SUCCESS;
1970 }
1971
1972 len = apr_snprintf(metricxml, 1024,
1973 "<METRIC NAME=\"%s\" VAL=\"%s\" TYPE=\"%s\" UNITS=\"%s\" TN=\"%d\" TMAX=\"%d\" DMAX=\"%d\" SLOPE=\"%s\">\n",
1974 metricName,
1975 gmetric_value_to_str(&(val->message_u.v_message)),
1976 data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.type,
1977 data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.units,
1978 (int)((now - val->last_heard_from) / APR_USEC_PER_SEC),
1979 data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.tmax,
1980 data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.dmax,
1981 slope_to_cstr(data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.slope));
1982
1983 if (metricName) free(metricName);
1984 if (realName) free(realName);
1985
1986 ret = socket_send(client, metricxml, &len);
1987 if ((ret == APR_SUCCESS) && allow_extra_data)
1988 {
1989 int extra_len = data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_len;
1990 len = apr_snprintf(metricxml, 1024, "<EXTRA_DATA>\n");
1991 socket_send(client, metricxml, &len);
1992 for (; extra_len > 0; extra_len--)
1993 {
1994 len = apr_snprintf(metricxml, 1024, "<EXTRA_ELEMENT NAME=\"%s\" VAL=\"%s\"/>\n",
1995 data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[extra_len-1].name,
1996 data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[extra_len-1].data);
1997 socket_send(client, metricxml, &len);
1998 }
1999 len = apr_snprintf(metricxml, 1024, "</EXTRA_DATA>\n");
2000 socket_send(client, metricxml, &len);
2001 }
2002 /* Send the closing tag */
2003 len = apr_snprintf(metricxml, 1024, "</METRIC>\n");
2004
2005 return socket_send(client, metricxml, &len);
2006 }
2007
2008 static apr_status_t
print_host_end(apr_socket_t * client)2009 print_host_end( apr_socket_t *client)
2010 {
2011 apr_size_t len = 8;
2012 return socket_send(client, "</HOST>\n", &len);
2013 }
2014
2015 static void
process_tcp_accept_channel(const apr_pollfd_t * desc,apr_time_t now)2016 process_tcp_accept_channel(const apr_pollfd_t *desc, apr_time_t now)
2017 {
2018 apr_status_t status;
2019 apr_hash_index_t *hi, *metric_hi;
2020 void *val;
2021 apr_socket_t *client, *server;
2022 apr_sockaddr_t *remotesa = NULL;
2023 char remoteip[256];
2024 apr_pool_t *client_context = NULL;
2025 Ganglia_channel *channel;
2026
2027 server = desc->desc.s;
2028 /* We could also use the apr_socket_data_get/set() functions
2029 * to have per socket user data .. see APR docs */
2030 channel = desc->client_data;
2031
2032 /* Create a context for the client connection */
2033 apr_pool_create(&client_context, global_context);
2034
2035 /* Accept the connection */
2036 status = apr_socket_accept(&client, server, client_context);
2037 if(status != APR_SUCCESS)
2038 {
2039 goto close_accept_socket;
2040 }
2041
2042 /* Set the timeout for writing to the client */
2043 apr_socket_timeout_set( client, channel->timeout);
2044
2045 apr_socket_addr_get(&remotesa, APR_REMOTE, client);
2046 /* This function is in ./lib/apr_net.c and not APR. The
2047 * APR counterpart is apr_sockaddr_ip_get() but we don't
2048 * want to malloc memory evertime we call this */
2049 apr_sockaddr_ip_buffer_get(remoteip, 256, remotesa);
2050
2051 /* Check the ACL */
2052 if(Ganglia_acl_action( channel->acl, remotesa ) != GANGLIA_ACCESS_ALLOW)
2053 goto close_accept_socket;
2054
2055 if ( channel->gzip_output )
2056 {
2057 z_stream *strm = zstream_new();
2058 if (strm == NULL)
2059 {
2060 debug_msg("failed to allocate gzip stream");
2061 goto close_accept_socket;
2062 }
2063 apr_status_t r = apr_socket_data_set(client, strm, GZIP_KEY, &zstream_destroy);
2064 if (r != APR_SUCCESS)
2065 {
2066 debug_msg("failed to set socket user data");
2067 zstream_destroy(strm);
2068 goto close_accept_socket;
2069 }
2070 }
2071
2072 /* Print the DTD, GANGLIA_XML and CLUSTER tags */
2073 status = print_xml_header(client);
2074 if(status != APR_SUCCESS)
2075 goto close_accept_socket;
2076
2077 /* Walk the host hash */
2078 apr_thread_mutex_lock(hosts_mutex);
2079 for(hi = apr_hash_first(client_context, hosts);
2080 hi;
2081 hi = apr_hash_next(hi))
2082 {
2083 apr_hash_this(hi, NULL, NULL, &val);
2084 status = print_host_start(client, (Ganglia_host *)val);
2085 if(status != APR_SUCCESS)
2086 {
2087 /* Release the mutex and close down the accepted socket */
2088 apr_thread_mutex_unlock(hosts_mutex);
2089 apr_socket_shutdown(client, APR_SHUTDOWN_READ);
2090 apr_socket_close(client);
2091 apr_pool_destroy(client_context);
2092 return;
2093 }
2094
2095 /* Send the metric info for this particular host */
2096 apr_thread_mutex_lock(((Ganglia_host *)val)->mutex);
2097 for(metric_hi = apr_hash_first(client_context, ((Ganglia_host *)val)->metrics);
2098 metric_hi; metric_hi = apr_hash_next(metric_hi))
2099 {
2100 void *metric, *mval;
2101 apr_hash_this(metric_hi, NULL, NULL, &metric);
2102
2103 mval = apr_hash_get(((Ganglia_host *)val)->gmetrics, ((Ganglia_metadata*)metric)->name, APR_HASH_KEY_STRING);
2104
2105 /* Print each of the metrics for a host ... */
2106 if(print_host_metric(client, metric, mval, now) != APR_SUCCESS)
2107 {
2108 /* Release the mutex and close down the accepted socket */
2109 apr_thread_mutex_unlock(((Ganglia_host *)val)->mutex);
2110 apr_thread_mutex_unlock(hosts_mutex);
2111 apr_socket_shutdown(client, APR_SHUTDOWN_READ);
2112 apr_socket_close(client);
2113 apr_pool_destroy(client_context);
2114 return;
2115 }
2116 }
2117 apr_thread_mutex_unlock(((Ganglia_host *)val)->mutex);
2118
2119 /* Close the host tag */
2120 status = print_host_end(client);
2121 if(status != APR_SUCCESS)
2122 {
2123 /* Release the mutex and close down the accepted socket */
2124 apr_thread_mutex_unlock(hosts_mutex);
2125 apr_socket_shutdown(client, APR_SHUTDOWN_READ);
2126 apr_socket_close(client);
2127 apr_pool_destroy(client_context);
2128 return;
2129 }
2130 }
2131 apr_thread_mutex_unlock(hosts_mutex);
2132
2133 /* Close the CLUSTER and GANGLIA_XML tags */
2134 print_xml_footer(client);
2135
2136 status = socket_flush( client, channel->gzip_output );
2137 if (status != APR_SUCCESS)
2138 {
2139 debug_msg("failed to finish compressing stream; returned '%d'",status);
2140 goto close_accept_socket;
2141 }
2142
2143 /* Close down the accepted socket */
2144 close_accept_socket:
2145 apr_socket_shutdown(client, APR_SHUTDOWN_READ);
2146 apr_socket_close(client);
2147 apr_pool_destroy(client_context);
2148 }
2149
2150
2151 static void
poll_udp_listen_channels(apr_interval_time_t timeout,apr_time_t now)2152 poll_udp_listen_channels( apr_interval_time_t timeout, apr_time_t now)
2153 {
2154 apr_status_t status;
2155 const apr_pollfd_t *descs = NULL;
2156 apr_int32_t num = 0;
2157 apr_int32_t i;
2158
2159 /* Poll for incoming UDP data */
2160 status = apr_pollset_poll(udp_listen_channels, timeout, &num, &descs);
2161 if (status != APR_SUCCESS && status != APR_TIMEUP) {
2162 char buff[128];
2163 debug_msg("apr_pollset_poll returned unexpected status %d = %s\n",
2164 status, apr_strerror(status, buff, 128));
2165 return;
2166 }
2167
2168 for(i = 0; i< num ; i++)
2169 {
2170 Ganglia_channel *channel = descs[i].client_data;
2171 switch( channel->type )
2172 {
2173 case UDP_RECV_CHANNEL:
2174 process_udp_recv_channel(descs+i, now);
2175 udp_last_heard = apr_time_now();
2176 break;
2177 default:
2178 continue;
2179 }
2180 }
2181 }
2182
2183 static void
poll_tcp_listen_channels(apr_interval_time_t timeout,apr_time_t now)2184 poll_tcp_listen_channels( apr_interval_time_t timeout, apr_time_t now)
2185 {
2186 apr_status_t status;
2187 const apr_pollfd_t *descs = NULL;
2188 apr_int32_t num = 0;
2189 apr_int32_t i;
2190
2191 /* Poll for incoming TCP requests */
2192 status = apr_pollset_poll(tcp_listen_channels, timeout, &num, &descs);
2193 if (status != APR_SUCCESS && status != APR_TIMEUP) {
2194 char buff[128];
2195 debug_msg("apr_pollset_poll returned unexpected status %d = %s\n",
2196 status, apr_strerror(status, buff, 128));
2197 return;
2198 }
2199
2200 for(i = 0; i< num ; i++)
2201 {
2202 Ganglia_channel *channel = descs[i].client_data;
2203 switch( channel->type )
2204 {
2205 case TCP_ACCEPT_CHANNEL:
2206 debug_msg("[tcp] Request for XML data received.");
2207 process_tcp_accept_channel(descs+i, now);
2208 debug_msg("[tcp] Request for XML data completed.");
2209 break;
2210 default:
2211 continue;
2212 }
2213 }
2214 }
2215
/*
 * TCP counterpart of the UDP send routine.  It currently sends nothing
 * and always reports zero errors.
 */
static int
tcp_send_message(char *buf, int len)
{
  /* Placeholder: both arguments are intentionally unused. */
  (void)buf;
  (void)len;

  return 0;
}
2222
2223 static int
send_message(char * buf,int len)2224 send_message( char *buf, int len )
2225 {
2226 return Ganglia_udp_send_message(udp_send_channels, buf, len ) + tcp_send_message( buf, len );
2227 }
2228
2229 static Ganglia_metric_callback *
Ganglia_metric_cb_define(char * name,metric_func cb,int index,mmodule * modp)2230 Ganglia_metric_cb_define(char *name, metric_func cb, int index, mmodule *modp)
2231 {
2232 Ganglia_metric_callback *metric = apr_pcalloc( global_context, sizeof(Ganglia_metric_callback));
2233 if(!metric)
2234 return NULL;
2235
2236 metric->name = apr_pstrdup( global_context, name );
2237 if(!metric->name)
2238 return NULL;
2239
2240 /* index is used to determine which metric to gather for multi-metric
2241 callback functions. This is to support metric modules or handlers
2242 that have the ability to gather more than one metric. */
2243 if (index == CB_NOINDEX)
2244 metric->cb = (metric_func_void)cb;
2245 else
2246 metric->cbindexed = cb;
2247
2248 metric->modp = modp;
2249 metric->multi_metric_index = index;
2250
2251 apr_hash_set( metric_callbacks, metric->name, APR_HASH_KEY_STRING, metric);
2252 return metric;
2253 }
2254
2255 g_val_t
gexec_func(void)2256 gexec_func ( void )
2257 {
2258 g_val_t val;
2259 if( gexec_on )
2260 snprintf(val.str, MAX_G_STRING_SIZE, "%s", "ON");
2261 else
2262 snprintf(val.str, MAX_G_STRING_SIZE, "%s", "OFF");
2263 return val;
2264 }
2265
2266 g_val_t
heartbeat_func(void)2267 heartbeat_func( void )
2268 {
2269 g_val_t val;
2270 val.uint32 = started / APR_USEC_PER_SEC;
2271 return val;
2272 }
2273
2274 g_val_t
location_func(void)2275 location_func(void)
2276 {
2277 g_val_t val;
2278 if(!host_location)
2279 {
2280 cfg_t *host = cfg_getsec(config_file, "host");
2281 host_location = cfg_getstr( host, "location");
2282 }
2283 snprintf(val.str, MAX_G_STRING_SIZE, "%s", host_location);
2284 return val;
2285 }
2286
modular_metric_cleanup(void * param)2287 static apr_status_t modular_metric_cleanup(void *param)
2288 {
2289 mmodule *modp = (mmodule*)param;
2290 if (modp->cleanup) {
2291 modp->cleanup();
2292 }
2293 return APR_SUCCESS;
2294 }
2295
2296 static void
load_metric_modules(void)2297 load_metric_modules( void )
2298 {
2299 cfg_t *tmp;
2300 int j;
2301
2302 tmp = cfg_getsec( config_file, "modules");
2303 for (j = 0; j < cfg_size(tmp, "module"); j++)
2304 {
2305 apr_dso_handle_t *modHandle = NULL;
2306 apr_dso_handle_sym_t modSym;
2307 mmodule *modp;
2308 char *modPath=NULL, *modName=NULL, *modparams=NULL, *modLanguage=NULL;
2309 apr_array_header_t *modParams_list = NULL;
2310 int k, modEnabled;
2311 apr_status_t merge_ret;
2312
2313 cfg_t *module = cfg_getnsec(tmp, "module", j);
2314
2315 /* Check the module language to make sure that
2316 the module is loaded correctly or should be
2317 delegated to an alternate module interface
2318 */
2319 modLanguage = cfg_getstr(module, "language");
2320 if (modLanguage && strcasecmp(modLanguage, "C/C++"))
2321 continue;
2322
2323 /* Check to make sure that the module is enabled.
2324 */
2325 modEnabled = cfg_getbool(module, "enabled");
2326 if (!modEnabled)
2327 continue;
2328
2329 modPath = cfg_getstr(module, "path");
2330 if(modPath && *modPath != '/' && *modPath != '.')
2331 {
2332 if (module_dir)
2333 merge_ret = apr_filepath_merge(&modPath, module_dir,
2334 modPath,
2335 APR_FILEPATH_NOTRELATIVE | APR_FILEPATH_NATIVE,
2336 global_context);
2337 else
2338 merge_ret = apr_filepath_merge(&modPath, GANGLIA_MODULE_DIR,
2339 modPath,
2340 APR_FILEPATH_NOTRELATIVE | APR_FILEPATH_NATIVE,
2341 global_context);
2342
2343 if (merge_ret != APR_SUCCESS)
2344 modPath = cfg_getstr(module, "path");
2345 }
2346 modName = cfg_getstr(module, "name");
2347 modparams = cfg_getstr(module, "params");
2348 modParams_list = apr_array_make(global_context, 2, sizeof(mmparam));
2349
2350 for (k = 0; k < cfg_size(module, "param"); k++)
2351 {
2352 cfg_t *param;
2353 mmparam *node = apr_array_push(modParams_list);
2354
2355 param = cfg_getnsec(module, "param", k);
2356 node->name = apr_pstrdup(global_context, param->title);
2357 node->value = apr_pstrdup(global_context, cfg_getstr(param, "value"));
2358 }
2359
2360 /*
2361 * Load the file into the gmond address space
2362 */
2363 if (apr_dso_load(&modHandle, modPath, global_context) != APR_SUCCESS)
2364 {
2365 char my_error[256];
2366
2367 err_msg("Cannot load %s metric module: %s", modPath,
2368 apr_dso_error(modHandle, my_error, sizeof(my_error)));
2369 if (!modPath)
2370 err_msg("No load path specified for module: %s or incorrect module language designation [%s].\n",
2371 modName, modLanguage);
2372 continue;
2373 }
2374 debug_msg("loaded module: %s", modName);
2375
2376 /*
2377 * Retrieve the pointer to the module structure through the module name.
2378 */
2379 if (apr_dso_sym(&modSym, modHandle, modName) != APR_SUCCESS)
2380 {
2381 char my_error[256];
2382
2383 err_msg("Cannot locate internal module structure '%s' in file %s: %s\nPossibly an incorrect module language designation [%s].\n",
2384 modName, modPath, apr_dso_error(modHandle, my_error, sizeof(my_error)), modLanguage);
2385 continue;
2386 }
2387
2388 modp = (mmodule*) modSym;
2389 modp->dynamic_load_handle = (apr_dso_handle_t *)modHandle;
2390 modp->module_name = apr_pstrdup (global_context, modName);
2391 modp->module_params = apr_pstrdup (global_context, modparams);
2392 modp->module_params_list = modParams_list;
2393 modp->config_file = config_file;
2394
2395 /*
2396 * Make sure the found module structure is really a module structure
2397 *
2398 */
2399 if (modp->magic != MMODULE_MAGIC_COOKIE) {
2400 err_msg("Internal module structure '%s' in file %s is not compatible -"
2401 "perhaps this is not a metric module.\n",
2402 modName, modPath);
2403 continue;
2404 }
2405
2406 /* Validate that the module was built against a compatible module interface API. */
2407 if (modp->version != MMODULE_MAGIC_NUMBER_MAJOR) {
2408 err_msg("Module \"%s\" is not compatible with this "
2409 "version of Gmond (found %d, need %d).",
2410 modName, modp->version, MMODULE_MAGIC_NUMBER_MAJOR);
2411 continue;
2412 }
2413
2414 if (metric_modules != NULL) {
2415 modp->next = metric_modules;
2416 }
2417 metric_modules = modp;
2418 }
2419 return;
2420 }
2421
2422 /* This function imports the metrics from libmetrics right now but in the future
2423 * we could easily do this via DSO. */
2424 static void
setup_metric_callbacks(void)2425 setup_metric_callbacks( void )
2426 {
2427 mmodule *modp = metric_modules;
2428 Ganglia_metric_callback *metric_cb;
2429
2430 /* Create the metric_callbacks hash */
2431 metric_callbacks = apr_hash_make( global_context );
2432
2433 while (modp) {
2434 const Ganglia_25metric* metric_info;
2435 int i;
2436
2437 if (modp->init && modp->init(global_context)) {
2438 err_msg("Module %s failed to initialize.\n", modp->module_name);
2439 }
2440 else
2441 {
2442 apr_pool_cleanup_register(global_context, modp,
2443 modular_metric_cleanup,
2444 apr_pool_cleanup_null);
2445
2446 metric_info = modp->metrics_info;
2447 for (i = 0; metric_info[i].name != NULL; i++)
2448 {
2449 metric_cb = Ganglia_metric_cb_define(metric_info[i].name, modp->handler, i, modp);
2450 if (metric_cb)
2451 metric_cb->info = (Ganglia_25metric*)&(metric_info[i]);
2452 }
2453 }
2454 modp = modp->next;
2455 }
2456 }
2457
2458 void
setup_metric_info_impl(Ganglia_metric_callback * metric_cb,int group_once,const char * name,char * title,float value_threshold,int is_dynamic)2459 setup_metric_info_impl(Ganglia_metric_callback *metric_cb, int group_once, const char *name, char *title, float value_threshold, int is_dynamic)
2460 {
2461 Ganglia_25metric *metric_info = NULL;
2462
2463 if (metric_cb->info)
2464 {
2465 metric_info = apr_pcalloc( global_context, sizeof(Ganglia_25metric));
2466 memcpy (metric_info, metric_cb->info, sizeof(Ganglia_25metric));
2467 metric_info->key = modular_metric;
2468 }
2469
2470 if(metric_info)
2471 {
2472 /* Build the message */
2473 switch(metric_info->type)
2474 {
2475 case GANGLIA_VALUE_UNKNOWN:
2476 /* The 2.5.x protocol doesn't allow for unknown values. :( Do nothing. */
2477 return;
2478 case GANGLIA_VALUE_STRING:
2479 metric_info->key = gmetric_string;
2480 metric_cb->msg.Ganglia_value_msg_u.gstr.fmt = apr_pstrdup(global_context, metric_info->fmt);
2481 break;
2482 case GANGLIA_VALUE_UNSIGNED_SHORT:
2483 metric_info->key = gmetric_ushort;
2484 metric_cb->msg.Ganglia_value_msg_u.gu_short.fmt = apr_pstrdup(global_context, metric_info->fmt);
2485 break;
2486 case GANGLIA_VALUE_SHORT:
2487 metric_info->key = gmetric_short;
2488 metric_cb->msg.Ganglia_value_msg_u.gs_short.fmt = apr_pstrdup(global_context, metric_info->fmt);
2489 break;
2490 case GANGLIA_VALUE_UNSIGNED_INT:
2491 metric_info->key = gmetric_uint;
2492 metric_cb->msg.Ganglia_value_msg_u.gu_int.fmt = apr_pstrdup(global_context, metric_info->fmt);
2493 break;
2494 case GANGLIA_VALUE_INT:
2495 metric_info->key = gmetric_int;
2496 metric_cb->msg.Ganglia_value_msg_u.gs_int.fmt = apr_pstrdup(global_context, metric_info->fmt);
2497 break;
2498 case GANGLIA_VALUE_FLOAT:
2499 metric_info->key = gmetric_float;
2500 metric_cb->msg.Ganglia_value_msg_u.gf.fmt = apr_pstrdup(global_context, metric_info->fmt);
2501 break;
2502 case GANGLIA_VALUE_DOUBLE:
2503 metric_info->key = gmetric_double;
2504 metric_cb->msg.Ganglia_value_msg_u.gd.fmt = apr_pstrdup(global_context, metric_info->fmt);
2505 break;
2506 default:
2507 metric_info->key = gmetric_uint;
2508 }
2509
2510 /* This sets the key for this particular metric.
2511 * The value is set by the callback function later */
2512 metric_cb->msg.id = metric_info->key;
2513
2514 metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.host = apr_pstrdup(global_context, myname);
2515 metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.name = apr_pstrdup(global_context, metric_info->name);
2516
2517 /* Replace the host metric name with the spoof data if it exists in the metadata */
2518 if (metric_info->metadata)
2519 {
2520 const char *spfhost_val, *spfname_val;
2521
2522 spfhost_val = apr_table_get((apr_table_t *)metric_info->metadata, SPOOF_HOST);
2523 if (spfhost_val)
2524 {
2525 metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.host = apr_pstrdup(global_context, spfhost_val);
2526 metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.spoof = TRUE;
2527 spfhost_val = strchr(spfhost_val,':');
2528 if(spfhost_val)
2529 spfhost_val++;
2530 else
2531 spfhost_val = metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.host;
2532 }
2533 else
2534 spfhost_val = myname;
2535 spfname_val = apr_table_get((apr_table_t *)metric_info->metadata, SPOOF_NAME);
2536 if (spfname_val)
2537 {
2538 char *spoofedname = apr_pstrcat(global_context, spfname_val, ":", name, NULL);
2539 char *spoofedkey = apr_pstrcat(global_context, spfname_val, ":", name, ":", spfhost_val, NULL);
2540
2541 metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.name = spoofedname;
2542 metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.spoof = TRUE;
2543
2544 /* Reinsert the same metric_callback structure pointer under the spoofed name.
2545 This will put the same metric info in the hash table twice but under
2546 the spoofed name. */
2547 apr_hash_set( metric_callbacks, spoofedkey, APR_HASH_KEY_STRING, metric_cb);
2548 }
2549 }
2550
2551 /* Save the location of information about this particular metric */
2552 metric_cb->info = metric_info;
2553
2554 /* Set the value threshold for this particular metric */
2555 metric_cb->value_threshold = value_threshold;
2556
2557 /* Fill in the title or short descriptive name of the metric if
2558 * one had been given in the configuration file. Otherwise just
2559 * copy the metric name as the title. */
2560 if (title)
2561 {
2562 metric_cb->title = apr_pstrdup(global_context, title);
2563 }
2564 else
2565 {
2566 metric_cb->title = apr_pstrdup(global_context, metric_info->name);
2567 }
2568
2569 /* If this metric will only be collected once, run it now at setup... */
2570 if(group_once)
2571 {
2572 if (metric_cb->multi_metric_index == CB_NOINDEX)
2573 metric_cb->now = metric_cb->cb();
2574 else
2575 metric_cb->now = metric_cb->cbindexed(metric_cb->multi_metric_index);
2576 }
2577 else
2578 {
2579 /* ... otherwise set it to zero */
2580 memset( &(metric_cb->now), 0, sizeof(g_val_t));
2581 }
2582 memset( &(metric_cb->last), 0, sizeof(g_val_t));
2583 }
2584
2585 return;
2586 }
2587
2588 void
setup_metric_info(Ganglia_metric_callback * metric_cb,int group_once,cfg_t * metric_cfg,int is_dynamic)2589 setup_metric_info(Ganglia_metric_callback *metric_cb, int group_once, cfg_t *metric_cfg, int is_dynamic)
2590 {
2591 char *name = cfg_getstr ( metric_cfg, "name");
2592 char *title = cfg_getstr ( metric_cfg, "title");
2593 float value_threshold = cfg_getfloat( metric_cfg, "value_threshold");
2594
2595 setup_metric_info_impl(metric_cb, group_once, name, title, value_threshold, is_dynamic);
2596 }
2597
2598 #define PCRE_MAX_SUBPATTERNS 9
2599 #define PCRE_OVECCOUNT ((1 + PCRE_MAX_SUBPATTERNS) * 3)
2600
2601 double
setup_collection_groups(void)2602 setup_collection_groups( void )
2603 {
2604 int i, num_collection_groups = cfg_size( config_file, "collection_group" );
2605 double bytes_per_sec = 0;
2606
2607 /* Create the collection group array */
2608 collection_groups = apr_array_make( global_context, num_collection_groups,
2609 sizeof(Ganglia_collection_group *));
2610
2611 for(i = 0; i < num_collection_groups; i++)
2612 {
2613 int j, num_metrics;
2614 cfg_t *group_conf;
2615 Ganglia_collection_group *group = apr_pcalloc( global_context,
2616 sizeof(Ganglia_collection_group));
2617 if(!group)
2618 {
2619 err_msg("Unable to malloc memory for collection group. Exiting.\n");
2620 exit(EXIT_FAILURE);
2621 }
2622
2623 group_conf = cfg_getnsec( config_file, "collection_group", i);
2624 group->once = cfg_getbool( group_conf, "collect_once");
2625 group->collect_every = cfg_getint( group_conf, "collect_every");
2626 group->time_threshold = cfg_getint( group_conf, "time_threshold");
2627
2628 if(group->once)
2629 {
2630 /* TODO: this isn't pretty but simplifies the code( next collect in a year)
2631 since we will collect the value in this function */
2632 group->next_collect = apr_time_now() + (31536000 * APR_USEC_PER_SEC);
2633 }
2634 else
2635 {
2636 group->next_collect = 0;
2637 }
2638
2639 group->next_send = 0;
2640
2641 num_metrics = cfg_size( group_conf, "metric" );
2642 group->metric_array = apr_array_make(global_context, num_metrics,
2643 sizeof(Ganglia_metric_callback *));
2644 for(j=0; j< num_metrics; j++)
2645 {
2646 cfg_t *metric = cfg_getnsec( group_conf, "metric", j );
2647 char *name = cfg_getstr ( metric, "name");
2648 #ifdef HAVE_LIBPCRE
2649 char *name_match = cfg_getstr ( metric, "name_match");
2650
2651 if(name_match != NULL)
2652 {
2653 pcre *pcre_re;
2654 const char *pcre_err_ptr;
2655 int pcre_err_offset;
2656 int pcre_ovector[PCRE_OVECCOUNT];
2657 int pcre_rc;
2658
2659 apr_hash_index_t *hi;
2660 apr_pool_t *p;
2661 void *val;
2662 const char *key;
2663 int found = 0;
2664
2665 if((pcre_re = pcre_compile(name_match, 0, &pcre_err_ptr, &pcre_err_offset, NULL)) == NULL)
2666 {
2667 err_msg ("pcre_compile failed on %s\n", name_match);
2668 exit (1);
2669 }
2670
2671
2672 /* Create a sub-pool for this channel */
2673 if(apr_pool_create(&p, global_context) != APR_SUCCESS)
2674 {
2675 err_msg("pool creation failed\n");
2676 exit(EXIT_FAILURE);
2677 }
2678
2679 for(hi = apr_hash_first(p, metric_callbacks);
2680 hi;
2681 hi = apr_hash_next(hi))
2682 {
2683 Ganglia_metric_callback *cb;
2684
2685 apr_hash_this(hi, (const void**)&key, NULL, &val);
2686 if((pcre_rc = pcre_exec(pcre_re, NULL, key, strlen(key), 0, 0, pcre_ovector, PCRE_OVECCOUNT)) < 1)
2687 {
2688 switch(pcre_rc)
2689 {
2690 case PCRE_ERROR_NOMATCH:
2691 break;
2692 case 0:
2693 /* output vector not big enough */
2694 default:
2695 /* unexpected error */
2696 err_msg ("unexpected pcre_exec error\n");
2697 exit (1);
2698 }
2699 }
2700 else
2701 {
2702 char *title_r = NULL;
2703 char *title_tmpl = cfg_getstr ( metric, "title");
2704 float value_threshold = cfg_getfloat( metric, "value_threshold");
2705
2706 if(title_tmpl != NULL)
2707 {
2708 struct iovec *ptrs;
2709 int i, k = 0, j = 0;
2710
2711 if((ptrs = apr_pcalloc(p, strlen(title_tmpl) * sizeof(struct iovec))) == NULL)
2712 {
2713 err_msg("apr_pcalloc failed\n");
2714 exit(EXIT_FAILURE);
2715 }
2716 for (i = 0; title_tmpl[i] != 0; i++)
2717 {
2718 char c = title_tmpl[i];
2719 if(c == '\\')
2720 {
2721 if(i > k)
2722 {
2723 ptrs[j].iov_base = apr_pstrndup(p, title_tmpl+k, i-k);
2724 ptrs[j++].iov_len = i-k;
2725 }
2726 i++;
2727 k = i+1;
2728 c = title_tmpl[i];
2729 if(c != 0)
2730 {
2731 int pos1, pos2, index = (c - '0');
2732 if(index < 1 || index > PCRE_MAX_SUBPATTERNS)
2733 {
2734 err_msg("title [%s] contains invalid reference to subpattern\n", title_tmpl);
2735 exit(EXIT_FAILURE);
2736 }
2737 pos1 = pcre_ovector[index * 2];
2738 pos2 = pcre_ovector[index * 2 + 1];
2739 ptrs[j].iov_base = apr_pstrndup(p, key + pos1, pos2-pos1);
2740 ptrs[j++].iov_len = pos2-pos1;
2741 }
2742 else
2743 i--;
2744 }
2745 }
2746
2747 if(i-k > 0)
2748 {
2749 ptrs[j].iov_base = apr_pstrndup(p, title_tmpl+k, i-k);
2750 ptrs[j++].iov_len = i-k;
2751 }
2752
2753 title_r = apr_pstrcatv(p, ptrs, j, NULL);
2754 }
2755
2756 cb = val;
2757
2758 /* setup the metric instance */
2759 setup_metric_info_impl (cb, group->once, key, title_r, value_threshold, 1);
2760
2761 if (cb->info) {
2762 bytes_per_sec += ( (double)(cb->info->msg_size) / (double)group->time_threshold );
2763 }
2764
2765 /* Push this metric onto the metric_array for this group */
2766 *(Ganglia_metric_callback **)apr_array_push(group->metric_array) = cb;
2767 found = 1;
2768 }
2769 }
2770 apr_pool_destroy(p);
2771
2772 if (!found)
2773 err_msg("Unable to find any metric information for '%s'. Possible that a module has not been loaded.\n", name_match);
2774
2775 }
2776 else
2777 #endif
2778 {
2779 Ganglia_metric_callback *metric_cb = (Ganglia_metric_callback *)
2780 apr_hash_get( metric_callbacks, name, APR_HASH_KEY_STRING );
2781
2782 if(!metric_cb)
2783 {
2784 /*
2785 * If a metric callback was not found and the metric is dynamic,
2786 * then search through the callback sequentially.
2787 */
2788
2789 apr_hash_index_t *hi;
2790 apr_pool_t *p;
2791 apr_ssize_t klen = strlen(name);
2792 void *val;
2793 const char *key;
2794 int found = 0;
2795
2796 /* Create a sub-pool for this channel */
2797 apr_pool_create(&p, global_context);
2798
2799 for(hi = apr_hash_first(p, metric_callbacks);
2800 hi;
2801 hi = apr_hash_next(hi))
2802 {
2803 Ganglia_metric_callback *cb;
2804
2805 apr_hash_this(hi, (const void**)&key, NULL, &val);
2806
2807 if (strncasecmp(key, name, klen) == 0) {
2808 cb = val;
2809 setup_metric_info (cb, group->once, metric, 1);
2810
2811 if (cb->info) {
2812 bytes_per_sec += ( (double)(cb->info->msg_size) / (double)group->time_threshold );
2813 }
2814
2815 /* Push this metric onto the metric_array for this group */
2816 *(Ganglia_metric_callback **)apr_array_push(group->metric_array) = cb;
2817 found = 1;
2818 }
2819 }
2820 apr_pool_destroy(p);
2821
2822 if (!found)
2823 err_msg("Unable to find the metric information for '%s'. Possible that the module has not been loaded.\n", name);
2824 }
2825 else
2826 {
2827 setup_metric_info (metric_cb, group->once, metric, 0);
2828
2829 if (metric_cb->info) {
2830 bytes_per_sec += ( (double)(metric_cb->info->msg_size) / (double)group->time_threshold );
2831 }
2832
2833 /* Push this metric onto the metric_array for this group */
2834 *(Ganglia_metric_callback **)apr_array_push(group->metric_array) = metric_cb;
2835 }
2836 }
2837 }
2838
2839 /* Save the collection group the collection group array */
2840 *(Ganglia_collection_group **)apr_array_push(collection_groups) = group;
2841 }
2842
2843 return bytes_per_sec;
2844 }
2845
2846 void
Ganglia_collection_group_collect(Ganglia_collection_group * group,apr_time_t now)2847 Ganglia_collection_group_collect( Ganglia_collection_group *group, apr_time_t now)
2848 {
2849 int i;
2850
2851 /* Collect data for all the metrics in the groups metric array */
2852 for(i=0; i< group->metric_array->nelts; i++)
2853 {
2854 Ganglia_metric_callback *cb = ((Ganglia_metric_callback **)(group->metric_array->elts))[i];
2855
2856 debug_msg("\tmetric '%s' being collected now", cb->name);
2857 cb->last = cb->now;
2858 if (cb->multi_metric_index == CB_NOINDEX)
2859 cb->now = cb->cb();
2860 else
2861 cb->now = cb->cbindexed(cb->multi_metric_index);
2862
2863 /* Check the value threshold. If passed.. set this group to send immediately. */
2864 if( cb->value_threshold >= 0.0 )
2865 {
2866 debug_msg("\tmetric '%s' has value_threshold %f", cb->name, cb->value_threshold);
2867 switch(cb->info->type)
2868 {
2869 case GANGLIA_VALUE_UNKNOWN:
2870 case GANGLIA_VALUE_STRING:
2871 /* do nothing for non-numeric data */
2872 break;
2873 case GANGLIA_VALUE_UNSIGNED_SHORT:
2874 if( abs( cb->last.uint16 - cb->now.uint16 ) >= cb->value_threshold )
2875 group->next_send = 0; /* send immediately */
2876 break;
2877 case GANGLIA_VALUE_SHORT:
2878 if( abs( cb->last.int16 - cb->now.int16 ) >= cb->value_threshold )
2879 group->next_send = 0; /* send immediately */
2880 break;
2881 case GANGLIA_VALUE_UNSIGNED_INT:
2882 if( abs( cb->last.uint32 - cb->now.uint32 ) >= cb->value_threshold )
2883 group->next_send = 0; /* send immediately */
2884 break;
2885 case GANGLIA_VALUE_INT:
2886 if( abs( cb->last.int32 - cb->now.int32 ) >= cb->value_threshold )
2887 group->next_send = 0; /* send immediately */
2888 break;
2889 case GANGLIA_VALUE_FLOAT:
2890 if( fabsf( cb->last.f - cb->now.f ) >= cb->value_threshold )
2891 group->next_send = 0; /* send immediately */
2892 break;
2893 case GANGLIA_VALUE_DOUBLE:
2894 if( fabs( cb->last.d - cb->now.d ) >= cb->value_threshold )
2895 group->next_send = 0; /* send immediately */
2896 break;
2897 default:
2898 break;
2899 }
2900 }
2901 /* If the metadata_last_set has been set to 0 then a request
2902 * to resend the metadata has been received. Send the group
2903 * immediately */
2904 if (cb->metadata_last_sent == 0)
2905 {
2906 group->next_send = 0;
2907 }
2908 }
2909
2910 /* Set the next time this group should be collected */
2911 group->next_collect = now + (group->collect_every * APR_USEC_PER_SEC);
2912 }
2913
2914 void
Ganglia_collection_group_send(Ganglia_collection_group * group,apr_time_t now)2915 Ganglia_collection_group_send( Ganglia_collection_group *group, apr_time_t now)
2916 {
2917 int i;
2918
2919 /* This group needs to be sent */
2920 for(i=0; i< group->metric_array->nelts; i++)
2921 {
2922 XDR x;
2923 int len, errors;
2924 char metricmsg[max_udp_message_len];
2925 Ganglia_metric_callback *cb = ((Ganglia_metric_callback **)(group->metric_array->elts))[i];
2926
2927 /* Build the message */
2928 switch(cb->info->type)
2929 {
2930 case GANGLIA_VALUE_UNKNOWN:
2931 /* The 2.5.x protocol doesn't allow for unknown values. :( Do nothing. */
2932 continue;
2933 case GANGLIA_VALUE_STRING:
2934 cb->msg.Ganglia_value_msg_u.gstr.str = cb->now.str;
2935 break;
2936 case GANGLIA_VALUE_UNSIGNED_SHORT:
2937 cb->msg.Ganglia_value_msg_u.gu_short.us = cb->now.uint16;
2938 break;
2939 case GANGLIA_VALUE_SHORT:
2940 cb->msg.Ganglia_value_msg_u.gs_short.ss = cb->now.int16;
2941 break;
2942 case GANGLIA_VALUE_UNSIGNED_INT:
2943 cb->msg.Ganglia_value_msg_u.gu_int.ui = cb->now.uint32;
2944 break;
2945 case GANGLIA_VALUE_INT:
2946 cb->msg.Ganglia_value_msg_u.gs_int.si = cb->now.int32;
2947 break;
2948 case GANGLIA_VALUE_FLOAT:
2949 cb->msg.Ganglia_value_msg_u.gf.f = cb->now.f;
2950 break;
2951 case GANGLIA_VALUE_DOUBLE:
2952 cb->msg.Ganglia_value_msg_u.gd.d = cb->now.d;
2953 break;
2954 default:
2955 continue;
2956 }
2957
2958 /* Send the full metadata packet if the specified interval has elapsed or a
2959 * request has been received to resend the metadata. In this case the
2960 * metadata_last_set field will be 0. No need to send the full data
2961 * with every value update.
2962 */
2963 if (!cb->metadata_last_sent || (send_metadata_interval &&
2964 (cb->metadata_last_sent < (now - apr_time_make(send_metadata_interval,0)))))
2965 {
2966 Ganglia_metric gmetric = Ganglia_metric_create((Ganglia_pool)global_context);
2967 char *name, *val, *type;
2968 apr_pool_t *gm_pool = (apr_pool_t*)gmetric->pool;
2969
2970 if(!gmetric)
2971 {
2972 /* no memory */
2973 return;
2974 }
2975
2976 name = cb->msg.Ganglia_value_msg_u.gstr.metric_id.name;
2977 if (override_hostname != NULL)
2978 {
2979 cb->msg.Ganglia_value_msg_u.gstr.metric_id.host = apr_pstrcat(gm_pool, (char *)( override_ip != NULL ? override_ip : override_hostname ), ":", (char *) override_hostname, NULL);
2980 cb->msg.Ganglia_value_msg_u.gstr.metric_id.spoof = TRUE;
2981 }
2982 val = apr_pstrdup(gm_pool, host_metric_value(cb->info, &(cb->msg)));
2983 type = apr_pstrdup(gm_pool, host_metric_type(cb->info->type));
2984
2985 errors = Ganglia_metric_set(gmetric, name, val, type,
2986 cb->info->units, cstr_to_slope( cb->info->slope),
2987 cb->info->tmax, 0);
2988
2989 if (errors)
2990 {
2991 err_msg("Error %d setting the modular data for %s\n", errors, cb->name);
2992 }
2993 else
2994 {
2995 Ganglia_metadata_add(gmetric, "TITLE", cb->title);
2996 Ganglia_metadata_add(gmetric, "DESC", cb->info->desc);
2997
2998 /* Add the rest of the metadata here by interating through
2999 * the metadata table of the metric_info structure */
3000 if (cb->info->metadata)
3001 {
3002 int i;
3003 const apr_array_header_t *arr = apr_table_elts((apr_table_t*)cb->info->metadata);
3004 const apr_table_entry_t *elts = (const apr_table_entry_t *)arr->elts;
3005
3006 /* add all of the metadata to the packet */
3007 for (i = 0; i < arr->nelts; ++i)
3008 {
3009 if (elts[i].key == NULL)
3010 continue;
3011 Ganglia_metadata_add(gmetric, elts[i].key, elts[i].val);
3012 }
3013 }
3014
3015 debug_msg("\tsending metadata for metric: %s", cb->name);
3016
3017 ganglia_scoreboard_inc(PKTS_SENT_METADATA);
3018 ganglia_scoreboard_inc(PKTS_SENT_ALL);
3019 if (override_hostname != NULL)
3020 {
3021 errors = Ganglia_metadata_send_real(gmetric, udp_send_channels, cb->msg.Ganglia_value_msg_u.gstr.metric_id.host);
3022 }
3023 else
3024 {
3025 errors = Ganglia_metadata_send(gmetric, udp_send_channels);
3026 }
3027 if (errors)
3028 {
3029 err_msg("Error %d sending the modular data for %s\n", errors, cb->name);
3030 debug_msg("\tsent message '%s' with %d errors", cb->name, errors);
3031 ganglia_scoreboard_inc(PKTS_SENT_FAILED);
3032 }
3033 else
3034 {
3035 cb->metadata_last_sent = now; /* mark the metadata as sent */
3036 }
3037 }
3038
3039 Ganglia_metric_destroy(gmetric);
3040 }
3041
3042 /* Send the updated value packet ever time it is collected */
3043 xdrmem_create(&x, metricmsg, max_udp_message_len, XDR_ENCODE);
3044 xdr_Ganglia_value_msg(&x, &(cb->msg));
3045 len = xdr_getpos(&x);
3046 errors = send_message( metricmsg, len );
3047 debug_msg("\tsent message '%s' of length %d with %d errors", cb->name, len, errors);
3048 ganglia_scoreboard_inc(PKTS_SENT_VALUE);
3049 ganglia_scoreboard_inc(PKTS_SENT_ALL);
3050
3051 if(!errors)
3052 {
3053 /* If the message send ok. Schedule the next time threshold. */
3054 group->next_send = now + (group->time_threshold * APR_USEC_PER_SEC);
3055 }
3056 else
3057 ganglia_scoreboard_inc(PKTS_SENT_FAILED);
3058 }
3059 }
3060
3061 /* TODO: It might be necessary in the future to use a heap for the collection groups.
3062 * Running through an array should suffice for now */
3063 apr_time_t
process_collection_groups(apr_time_t now)3064 process_collection_groups( apr_time_t now )
3065 {
3066 int i;
3067 apr_time_t next = 0;
3068
3069 /* Run through each collection group and collect any data that needs collecting... */
3070 for(i=0; i< collection_groups->nelts; i++)
3071 {
3072 Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
3073 if(group->next_collect <= now)
3074 {
3075 Ganglia_collection_group_collect(group, now);
3076 }
3077 }
3078
3079 /* Run through each collection group and send any data that needs sending... */
3080 for(i=0; i< collection_groups->nelts; i++)
3081 {
3082 Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
3083 if( group->next_send <= now )
3084 {
3085 Ganglia_collection_group_send(group, now);
3086 }
3087 }
3088
3089 /* Run through each collection group and find when our next event (collect|send) occurs */
3090 for(i=0; i< collection_groups->nelts; i++)
3091 {
3092 apr_time_t min;
3093 Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
3094 min = group->next_send < group->next_collect? group->next_send : group->next_collect;
3095 if(!next)
3096 {
3097 next = min;
3098 }
3099 else
3100 {
3101 if(min < next)
3102 {
3103 next = min;
3104 }
3105 }
3106 }
3107
3108 /* make sure we don't schedule for the past */
3109 return next < now ? now + 1 * APR_USEC_PER_SEC: next;
3110 }
3111
3112 static void
print_metric_list(void)3113 print_metric_list( void )
3114 {
3115 apr_hash_index_t *hi;
3116 void *val;
3117 char modular_desc[1024];
3118
3119 for(hi = apr_hash_first(global_context, metric_callbacks);
3120 hi;
3121 hi = apr_hash_next(hi))
3122 {
3123 Ganglia_metric_callback *cb;
3124 Ganglia_25metric *metric_info;
3125 char *desc = NULL;
3126
3127 apr_hash_this(hi, NULL, NULL, &val);
3128 cb = val;
3129 metric_info = NULL;
3130
3131 if (cb->modp)
3132 {
3133 int i;
3134
3135 metric_info = (Ganglia_25metric *)cb->modp->metrics_info;
3136 for (i = 0; metric_info[i].name != NULL; i++)
3137 {
3138 if (strcasecmp(cb->name, metric_info[i].name) == 0)
3139 {
3140 snprintf (modular_desc, sizeof(modular_desc),
3141 "%s (module %s)",
3142 metric_info[i].desc,
3143 cb->modp->module_name);
3144
3145 desc = (char*)modular_desc;
3146 break;
3147 }
3148 }
3149 }
3150
3151 if (desc == NULL)
3152 {
3153 desc = "<no description available>";
3154 }
3155
3156 fprintf(stdout, "%-15s\t%s\n", cb->name, desc);
3157 }
3158 }
3159
3160 static void
cleanup_data(apr_pool_t * pool,apr_time_t now)3161 cleanup_data( apr_pool_t *pool, apr_time_t now )
3162 {
3163 apr_hash_index_t *hi, *metric_hi;
3164
3165 /* Walk the host hash */
3166 apr_thread_mutex_lock(hosts_mutex);
3167 for(hi = apr_hash_first(pool, hosts);
3168 hi;
3169 hi = apr_hash_next(hi))
3170 {
3171 void *val;
3172 Ganglia_host *host;
3173 apr_hash_this(hi, NULL, NULL, &val);
3174 host = val;
3175
3176 if( host_dmax && (now - host->last_heard_from) > (host_dmax * APR_USEC_PER_SEC) )
3177 {
3178 /* this host is older than dmax... delete it */
3179 debug_msg("deleting old host '%s' from host hash'", host->hostname);
3180 /* remove it from the hash */
3181 apr_hash_set( hosts, host->ip, APR_HASH_KEY_STRING, NULL);
3182 /* free all its memory */
3183 if(host->location)
3184 {
3185 free(host->location);
3186 }
3187 apr_pool_destroy( host->pool);
3188 }
3189 else
3190 {
3191 /* this host isn't being deleted but it might have some stale gmetric data */
3192 apr_thread_mutex_lock(host->mutex);
3193 for( metric_hi = apr_hash_first( pool, host->metrics );
3194 metric_hi;
3195 metric_hi = apr_hash_next( metric_hi ))
3196 {
3197 void *val;
3198 Ganglia_metadata *metric;
3199 int dmax;
3200
3201 apr_hash_this( metric_hi, NULL, NULL, &val );
3202 metric = val;
3203
3204 if(!metric || metric->message_u.f_message.id != gmetadata_full)
3205 continue; /* this shouldn't happen */
3206
3207 dmax = metric->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.dmax;
3208 if( dmax && (now - metric->last_heard_from) > (dmax * APR_USEC_PER_SEC) )
3209 {
3210 Ganglia_metadata *value_metric = (Ganglia_metadata *)apr_hash_get( host->gmetrics,
3211 metric->name,
3212 APR_HASH_KEY_STRING);
3213 /* this is a stale gmetric */
3214 debug_msg("deleting old metric '%s' from host '%s'", metric->name, host->hostname);
3215
3216 /* remove the metric from the metric and values hash */
3217 apr_hash_set( host->metrics, metric->name, APR_HASH_KEY_STRING, NULL);
3218 apr_hash_set( host->gmetrics, metric->name, APR_HASH_KEY_STRING, NULL);
3219 /* destroy any memory that was allocated for this gmetric */
3220 apr_pool_destroy( metric->pool );
3221 if (value_metric)
3222 {
3223 apr_pool_destroy( value_metric->pool );
3224 }
3225 }
3226 }
3227 apr_thread_mutex_unlock(host->mutex);
3228 }
3229 }
3230 apr_thread_mutex_unlock(hosts_mutex);
3231
3232 apr_pool_clear( pool );
3233 }
3234
initialize_scoreboard()3235 void initialize_scoreboard()
3236 {
3237 ganglia_scoreboard_init(global_context);
3238 ganglia_scoreboard_add(PKTS_RECVD_ALL, GSB_READ_RESET);
3239 ganglia_scoreboard_add(PKTS_RECVD_FAILED, GSB_READ_RESET);
3240 ganglia_scoreboard_add(PKTS_RECVD_IGNORED, GSB_READ_RESET);
3241 ganglia_scoreboard_add(PKTS_RECVD_METADATA, GSB_READ_RESET);
3242 ganglia_scoreboard_add(PKTS_RECVD_VALUE, GSB_READ_RESET);
3243 ganglia_scoreboard_add(PKTS_RECVD_REQUEST, GSB_READ_RESET);
3244 ganglia_scoreboard_add(PKTS_SENT_ALL, GSB_READ_RESET);
3245 ganglia_scoreboard_add(PKTS_SENT_FAILED, GSB_READ_RESET);
3246 ganglia_scoreboard_add(PKTS_SENT_METADATA, GSB_READ_RESET);
3247 ganglia_scoreboard_add(PKTS_SENT_VALUE, GSB_READ_RESET);
3248 ganglia_scoreboard_add(PKTS_SENT_REQUEST, GSB_READ_RESET);
3249 }
3250
/* Non-zero once the daemon has been asked to stop; the processing loops
 * in this file run only while it is zero. */
int done = 0;

/* Non-zero when the configuration should be reloaded after the main loop
 * has finished. */
int reload_required = 0;

/*
 * Request a configuration reload: stop the running loops and flag that a
 * reload has to follow.
 */
void set_reload_required()
{
  done = 1;
  reload_required = 1;
}
3259
sig_handler(int i)3260 void sig_handler(int i)
3261 {
3262 switch(i)
3263 {
3264 case SIGHUP:
3265 set_reload_required();
3266 break;
3267 default:
3268 done = 1;
3269 }
3270 }
3271
tcp_listener(apr_thread_t * thd,void * data)3272 static void* APR_THREAD_FUNC tcp_listener(apr_thread_t *thd, void *data)
3273 {
3274 apr_time_t now;
3275 apr_interval_time_t wait = 100 * 1000; // 100ms
3276
3277 if (tcp_listen_channels == NULL)
3278 return NULL;
3279
3280 debug_msg("[tcp] Starting TCP listener thread...");
3281 for(;!done;)
3282 {
3283 now = apr_time_now();
3284 /* Pull in incoming data */
3285 poll_tcp_listen_channels(wait, now);
3286 }
3287 apr_thread_exit(thd, APR_SUCCESS);
3288
3289 return NULL;
3290 }
3291
3292 int
main(int argc,char * argv[])3293 main ( int argc, char *argv[] )
3294 {
3295 apr_time_t now, next_collection, last_cleanup;
3296 apr_pool_t *cleanup_context;
3297
3298 gmond_argv = argv;
3299
3300 if (cmdline_parser (argc, argv, &args_info) != 0)
3301 exit(EXIT_FAILURE);
3302
3303 if(args_info.convert_given)
3304 {
3305 exit (print_ganglia_25_config( args_info.convert_arg ));
3306 }
3307
3308 /* Create the global context */
3309 global_context = (apr_pool_t*)Ganglia_pool_create(NULL);
3310
3311 /* Create the cleanup context from the global context */
3312 cleanup_context = (apr_pool_t*)Ganglia_pool_create((Ganglia_pool)global_context);
3313
3314 /* Mark the time this gmond started */
3315 started = apr_time_now();
3316
3317 /* Builds a default configuration based on platform */
3318 build_default_gmond_configuration((Ganglia_pool)global_context);
3319
3320 if(args_info.default_config_flag)
3321 {
3322 fprintf(stdout, "%s", default_gmond_configuration);
3323 fflush( stdout );
3324 exit(EXIT_SUCCESS);
3325 }
3326
3327 process_configuration_file();
3328
3329 #ifdef SFLOW
3330 sflow_udp_port = init_sflow(config_file);
3331 #endif
3332
3333 /* Should over-ride any value from the configuration file */
3334 if(args_info.location_given)
3335 {
3336 host_location = args_info.location_arg;
3337 }
3338
3339 load_metric_modules();
3340
3341 if(args_info.metrics_flag)
3342 {
3343 initialize_scoreboard();
3344 setup_metric_callbacks();
3345 print_metric_list();
3346 fflush( stdout );
3347 exit(EXIT_SUCCESS);
3348 }
3349
3350 if(args_info.bandwidth_flag)
3351 {
3352 double bytes_per_sec;
3353 setup_metric_callbacks();
3354 bytes_per_sec = setup_collection_groups();
3355 fprintf(stdout, "%f bytes/sec\n", bytes_per_sec);
3356 exit(EXIT_SUCCESS);
3357 }
3358
3359 daemonize_if_necessary( argv );
3360 if (args_info.pid_file_given)
3361 {
3362 update_pidfile (args_info.pid_file_arg);
3363 }
3364
3365 /* Collect my hostname */
3366 apr_gethostname( myname, APRMAXHOSTLEN+1, global_context);
3367
3368 apr_signal( SIGPIPE, SIG_IGN );
3369 apr_signal( SIGINT, sig_handler );
3370 apr_signal( SIGHUP, sig_handler );
3371 apr_signal( SIGTERM, sig_handler );
3372
3373 initialize_scoreboard();
3374
3375 /* This must occur before we setuid_if_necessary() particularly on freebsd
3376 * where we need to be root to access /dev/mem to initialize metric collection */
3377 setup_metric_callbacks();
3378
3379 setuid_if_necessary();
3380
3381 process_deaf_mute_mode();
3382 process_allow_extra_data_mode();
3383
3384 if(!deaf)
3385 {
3386 setup_listen_channels_pollset();
3387 }
3388
3389 /* even if mute, a send channel may be needed to send a request for metadata */
3390 udp_send_channels = Ganglia_udp_send_channels_create((Ganglia_pool)global_context,
3391 (Ganglia_gmond_config)config_file);
3392 if(!udp_send_channels)
3393 {
3394 /* if there are no send channels defined, we are equivalent to mute */
3395 mute = 1;
3396 }
3397 if(!mute)
3398 {
3399 setup_collection_groups();
3400 }
3401
3402 /* Create the host hash table */
3403 hosts = apr_hash_make( global_context );
3404
3405 /* Create the hosts mutex */
3406 if (apr_thread_mutex_create(&hosts_mutex, APR_THREAD_MUTEX_DEFAULT, global_context) != APR_SUCCESS)
3407 {
3408 err_msg("Failed to create thread mutex. Exiting.\n");
3409 exit(EXIT_FAILURE);
3410 }
3411
3412 /* Initialize time variables */
3413 udp_last_heard = last_cleanup = next_collection = now = apr_time_now();
3414
3415 /* Create TCP listener thread */
3416 if(!deaf)
3417 {
3418 apr_thread_t *thread;
3419 if (apr_thread_create(&thread, NULL, tcp_listener, NULL, global_context) != APR_SUCCESS)
3420 {
3421 err_msg("Failed to create TCP listener thread. Exiting.\n");
3422 exit(EXIT_FAILURE);
3423 }
3424 }
3425
3426 /* Loop */
3427 for(;!done;)
3428 {
3429 /* Make sure we never wait for negative seconds (shouldn't happen) */
3430 apr_interval_time_t wait = next_collection >= now ? next_collection - now : 1;
3431
3432 if (udp_listen_channels != NULL)
3433 {
3434 /* Pull in incoming data */
3435 poll_udp_listen_channels(wait, now);
3436 }
3437 else
3438 {
3439 /* Sleep until next collection */
3440 apr_sleep( wait );
3441 }
3442
3443 /* only continue if it's time to process our collection groups */
3444 now = apr_time_now();
3445 if(now < next_collection)
3446 continue;
3447
3448 if(!deaf)
3449 {
3450 /* if we went deaf, re-subscribe to the multicast channel */
3451 if ((now - udp_last_heard) > 60 * APR_USEC_PER_SEC)
3452 {
3453 /* FIXME: maybe this should be done for the affected
3454 channel only? */
3455 reset_mcast_channels();
3456 /* reset the timer */
3457 udp_last_heard = now;
3458 }
3459
3460 /* cleanup the data if the cleanup threshold has been met */
3461 if( (now - last_cleanup) > apr_time_make(cleanup_threshold,0))
3462 {
3463 cleanup_data( cleanup_context, now );
3464 last_cleanup = now;
3465 }
3466 }
3467
3468 if(!mute)
3469 {
3470 /* collect data from collection_groups */
3471 next_collection = process_collection_groups( now );
3472 }
3473 else
3474 {
3475 /* we're mute. nothing to collect and send. */
3476 next_collection = now + 60 * APR_USEC_PER_SEC;
3477 }
3478 }
3479
3480 apr_pool_destroy(global_context);
3481
3482 if(reload_required == 1)
3483 reload_ganglia_configuration();
3484
3485 return 0;
3486 }
3487