1 #ifdef HAVE_CONFIG_H
2 #include "config.h"
3 #endif
4 #include <ganglia.h> /* for the libgmond messaging */
5 #include <gm_metric.h>
6 
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <syslog.h>
11 #include <math.h>
12 #ifdef SOLARIS
13 #define fabsf(f) ((float)fabs(f))
14 #endif
15 #ifdef _AIX
16 #ifndef _AIX52
17 /* _AIX52 is defined on all versions of AIX >= 5.2
18    fabsf doesn't exist on versions prior to 5.2 */
19 #define fabsf(f) ((float)fabs(f))
20 #endif
21 #endif
22 #ifdef LINUX
23 #include <sys/utsname.h>
24 #endif
25 #include <zlib.h>
26 
27 #include <apr.h>
28 #include <apr_strings.h>
29 #include <apr_hash.h>
30 #include <apr_time.h>
31 #include <apr_pools.h>
32 #include <apr_poll.h>
33 #include <apr_network_io.h>
34 #include <apr_signal.h>
35 #include <apr_thread_proc.h>
36 #include <apr_tables.h>
37 #include <apr_dso.h>
38 #include <apr_version.h>
39 
40 #ifdef HAVE_LIBPCRE
41 #if defined (HAVE_PCRE_PCRE_H)
42 #include <pcre/pcre.h>
43 #else
44 #include <pcre.h>
45 #endif
46 #endif
47 
48 #include "cmdline.h"   /* generated by cmdline.sh which runs gengetopt */
49 #include "become_a_nobody.h"
50 #include "apr_net.h"   /* our private network functions based on apr */
51 #include "dtd.h"       /* the DTD definition for our XML */
52 #include "g25_config.h" /* for converting old file formats to new */
53 #include "update_pidfile.h"
54 #include "gm_scoreboard.h"
55 #include "ganglia_priv.h"
56 
57 /* Specifies a single value metric callback */
58 #define CB_NOINDEX -1
59 
60 /* If a bind fails, and retry_bind is true, this is the interval to sleep
61    before retry.  Specified in seconds */
62 #define RETRY_BIND_DELAY 60
63 
64 /* The key in the apr_socket_t struct where our gzipped data is stored */
65 #define GZIP_KEY "gzip"
66 
67 /* When this gmond was started */
68 apr_time_t started;
69 /* My name */
70 char myname[APRMAXHOSTLEN+1];
71 /* The commandline options */
72 struct gengetopt_args_info args_info;
73 /* The configuration file */
74 cfg_t *config_file;
75 /* The debug level (in debug_msg.c) */
76 static int debug_level;
77 /* The global context */
78 apr_pool_t *global_context = NULL;
79 /* Deaf mode boolean */
80 int deaf;
81 /* Mute mode boolean */
82 int mute;
83 /* Allow extra data boolean */
84 int allow_extra_data;
85 /* last time we received any data */
86 apr_time_t udp_last_heard;
87 /* Cluster tag boolean */
88 int cluster_tag = 0;
89 /* This host's location */
90 char *host_location = NULL;
91 /* This host name, spoofed */
92 char *override_hostname = NULL;
93 /* This host ip, spoofed */
94 char *override_ip = NULL;
95 /* Tags */
96 char *tags = NULL;
/* Boolean. Will this host receive gexec requests? */
98 int gexec_on = 0;
99 /* This is tweakable by globals{max_udp_msg_len=...} */
100 int max_udp_message_len = 1472;
101 /* The default configuration for gmond.  Found in conf.c. */
102 extern char *default_gmond_configuration;
103 /* The number of seconds to hold "dead" hosts in the hosts hash */
104 int host_dmax = 0;
105 /* The number of seconds to wait for a message before considering it down */
106 int host_tmax = 20;
107 /* The amount of time between cleanups */
108 int cleanup_threshold = 300;
/* Time interval before sending another metadata packet */
110 int send_metadata_interval = 0;
111 /* The directory where DSO modules are located */
112 char *module_dir = NULL;
113 
114 /* The array for outgoing UDP message channels */
115 Ganglia_udp_send_channels udp_send_channels = NULL;
116 
117 /* TODO: The array for outgoing TCP message channels (later) */
118 apr_array_header_t *tcp_send_array = NULL;
119 
/* The verdict an access-control entry (or an ACL's default) resolves to.
   Ganglia_acl_action() returns one of these values. */
enum Ganglia_action_types {
  GANGLIA_ACCESS_DENY = 0,
  GANGLIA_ACCESS_ALLOW = 1
};
typedef enum Ganglia_action_types Ganglia_action_types;
125 
/* One entry of an access control list: a subnet plus the action that
   applies to addresses falling inside that subnet */
struct Ganglia_access {
  apr_ipsubnet_t *ipsub;        /* subnet built by apr_ipsubnet_create() */
  Ganglia_action_types action;  /* allow or deny addresses in ipsub */
};
typedef struct Ganglia_access Ganglia_access;
132 
/* An access control list: an ordered array of Ganglia_access pointers and
   the action used when no entry matches (see Ganglia_acl_action) */
struct Ganglia_acl {
  apr_array_header_t *access_array;     /* elements are Ganglia_access * */
  Ganglia_action_types default_action;  /* fallback when nothing matches */
};
typedef struct Ganglia_acl Ganglia_acl;
138 
/* These are the kinds of listening channel gmond sets up */
enum Ganglia_channel_types {
  TCP_ACCEPT_CHANNEL,
  UDP_RECV_CHANNEL
};
typedef enum Ganglia_channel_types Ganglia_channel_types;
145 
/* Per-socket data attached to each pollset entry as client_data */
struct Ganglia_channel {
  Ganglia_channel_types type;  /* TCP accept or UDP receive */
  Ganglia_acl *acl;            /* NULL when the channel has no ACL */
  int timeout;                 /* 0 for UDP; from config "timeout" for TCP */
  int gzip_output;             /* from config "gzip_output" (TCP channels) */
};
typedef struct Ganglia_channel Ganglia_channel;
153 
154 /* Two separate pollsets hold the tcp_accept and udp_recv channels */
155 apr_pollset_t *udp_listen_channels = NULL;
156 apr_pollset_t *tcp_listen_channels = NULL;
157 
158 /* These are the TCP listen channels */
159 apr_socket_t **tcp_sockets = NULL;
160 /* These are the UDP sockets */
161 apr_socket_t **udp_recv_sockets = NULL;
162 
163 /* The hash to hold the hosts (key = host IP) */
164 apr_hash_t *hosts = NULL;
165 apr_thread_mutex_t *hosts_mutex = NULL;
166 /* The "hosts" hash contains values of type "hostdata" */
167 
168 #ifdef SFLOW
169 #include "sflow.h"
170 uint16_t sflow_udp_port = SFLOW_IANA_REGISTERED_PORT;
171 #endif
172 
173 #include "gmond_internal.h"
174 
/* This is the structure of the data saved to each host->metric hash */
struct Ganglia_metadata {
  /* The pool used for allocating memory */
  apr_pool_t *pool;
  /* The name of the metric */
  char *name;
  /* The ganglia message: either the metadata (f_message) or the value
     (v_message) form */
  union {
      /* The ganglia message */
      Ganglia_metadata_msg f_message;
      Ganglia_value_msg v_message;
  } message_u;
  /* Last heard from */
  apr_time_t last_heard_from;
};
typedef struct Ganglia_metadata Ganglia_metadata;
190 
191 /* The hash to hold the metrics available on this platform */
192 apr_hash_t *metric_callbacks = NULL;
193 
/* The "metrics" hash contains values of type "Ganglia_metric_callback" */
/* This is where libmetrics meets gmond */
struct Ganglia_metric_callback {
   char *name;          /* metric name */
   float value_threshold;/* the value threshold */
   char *title;         /* Alternate metric name or short description */
   Ganglia_25metric *info;/* the information about this metric */
   metric_func_void cb; /* callback function (deprecated) */
   metric_func cbindexed; /* multi-metric callback function */
   g_val_t now;         /* the current value */
   g_val_t last;        /* the last value */
   Ganglia_value_msg msg;     /* the message to send */
   mmodule *modp;       /* dynamic module info struct */
   int multi_metric_index; /* index identifying which metric is wanted
                              (presumably CB_NOINDEX for single-value
                              callbacks -- confirm against callers) */
   apr_time_t metadata_last_sent; /* when the metadata was last sent */
};
typedef struct Ganglia_metric_callback Ganglia_metric_callback;
211 
/* This is the structure of a collection group: a set of metrics that are
   collected and sent on a shared schedule */
struct Ganglia_collection_group {
  apr_time_t next_collect;  /* When to collect next */
  apr_time_t next_send;     /* When to send next (tmax) */
  int once;                 /* NOTE(review): presumably "collect once only" -- confirm */
  int collect_every;        /* NOTE(review): presumably the collection period -- confirm units */
  int time_threshold;       /* NOTE(review): presumably the max interval between sends -- confirm */
  apr_array_header_t *metric_array; /* the metrics belonging to this group */
};
typedef struct Ganglia_collection_group Ganglia_collection_group;
222 
223 /* This is the array of collection groups that we are processing... */
224 apr_array_header_t *collection_groups = NULL;
225 
226 mmodule *metric_modules = NULL;
227 extern int daemon_proc; /* defined in error.c */
228 
229 char **gmond_argv;
230 extern char **environ;
231 
232 /* apr_socket_send can't assure all characters in buf been sent. */
233 static apr_status_t
socket_send_raw(apr_socket_t * sock,const char * buf,apr_size_t * len)234 socket_send_raw(apr_socket_t *sock, const char *buf, apr_size_t *len)
235 {
236   apr_size_t total = *len;
237   apr_size_t thisTime = total;
238   const char* p = buf;
239   apr_status_t ret;
240   for(ret=apr_socket_send(sock, p, &thisTime); ret == APR_SUCCESS;
241           ret=apr_socket_send(sock, p, &thisTime))
242     {
243 
244       if(thisTime < total)
245         {
246           total -= thisTime;
247           p += thisTime;
248           thisTime = total;
249         }
250       else
251           break;
252     }
253   return ret;
254 }
255 
256 /* wrap socket_send_raw with gzip deflate if enabled. */
257 static apr_status_t
socket_send(apr_socket_t * sock,const char * buf,apr_size_t * len)258 socket_send(apr_socket_t *sock, const char *buf, apr_size_t *len)
259 {
260   char outputbuffer[2048];
261   const int outputlen = sizeof(outputbuffer);
262   apr_size_t wlen;
263   apr_status_t ret;
264   z_stream *strm;
265   int z_ret;
266 
267   ret = apr_socket_data_get((void**)&strm, GZIP_KEY, sock);
268   if (ret != APR_SUCCESS)
269   {
270     return ret;
271   }
272 
273   if (!strm)
274   {
275     ret = socket_send_raw( sock, buf, len );
276   }
277   else
278   {
279     strm->next_in = (Bytef *)buf;
280     strm->avail_in = *len;
281 
282     while( strm->avail_in )
283     {
284       strm->next_out = (Bytef *)outputbuffer;
285       strm->avail_out = outputlen;
286 
287       z_ret = deflate( strm, 0 );
288       if (z_ret != Z_OK)
289       {
290         return APR_ENOMEM;
291       }
292 
293       wlen = outputlen - strm->avail_out;
294       if( wlen )
295       {
296         ret = socket_send_raw( sock, outputbuffer, &wlen );
297         if(ret != APR_SUCCESS)
298         {
299           return ret;
300         }
301       }
302     }
303   }
304 
305   return ret;
306 }
307 
308 /* Reload the Ganglia configuration */
309 void
reload_ganglia_configuration(void)310 reload_ganglia_configuration(void)
311 {
312   int i = 0;
313   char *gmond_bin = gmond_argv[0];
314 
315   if(udp_listen_channels != NULL)
316     apr_pollset_destroy(udp_listen_channels);
317   if(tcp_listen_channels != NULL)
318     apr_pollset_destroy(tcp_listen_channels);
319   if(tcp_sockets != NULL)
320     for(i = 0; tcp_sockets[i] != 0; i++)
321       apr_socket_close(tcp_sockets[i]);
322   if(udp_recv_sockets != NULL)
323     for(i = 0; udp_recv_sockets[i] != 0; i++)
324       apr_socket_close(udp_recv_sockets[i]);
325   debug_msg("reloading %s", gmond_bin);
326 #ifndef CYGWIN
327   /* To do: over-ride some config opts:
328      - tell new process not to re-daemonize
329      - tell new process not to setuid
330      Any paths in gmond_argv must be absolute or relative to / */
331   execve(gmond_bin, gmond_argv, environ);
332 #else
333   /* Exit and let Windows service manager restart the process, as
334      neither Cygwin nor apr provide a perfect equivalent to execve */
335   exit(EXIT_SUCCESS);
336 #endif
337   err_msg("execve failed to reload %s: %s", gmond_bin, strerror(errno));
338   exit(EXIT_FAILURE);
339 }
340 
341 /* this is just a temporary function */
342 void
process_configuration_file(void)343 process_configuration_file(void)
344 {
345   cfg_t *tmp;
346 
347   /* this is a global for now */
348   config_file = (cfg_t*)Ganglia_gmond_config_create( args_info.conf_arg, !args_info.conf_given );
349 
350   /* Initialize a few variables */
351   tmp = cfg_getsec( config_file, "globals");
352   /* Get the maximum UDP message size */
353   max_udp_message_len = cfg_getint( tmp, "max_udp_msg_len");
354   /* Get the gexec status requested */
355   gexec_on            = cfg_getbool(tmp, "gexec");
356   /* Get the host dmax ... */
357   host_dmax           = cfg_getint( tmp, "host_dmax");
358   /* Get the host tmax ... */
359   host_tmax           = cfg_getint( tmp, "host_tmax");
360   /* Get the cleanup threshold */
361   cleanup_threshold   = cfg_getint( tmp, "cleanup_threshold");
362   /* Get the send meta data packet interval */
363   send_metadata_interval = cfg_getint( tmp, "send_metadata_interval");
364   /* Get the DSO module dir */
365   module_dir = cfg_getstr(tmp, "module_dir");
366   /* Acquire spoof name/ip, if they are specified */
367   override_hostname = cfg_getstr(tmp, "override_hostname");
368   override_ip = cfg_getstr(tmp, "override_ip");
369 
370   /* Any tags for this host */
371   tags = cfg_getstr(tmp, "tags");
372 
373   /* Commandline for debug_level trumps configuration file behaviour ... */
374   if (args_info.debug_given)
375     {
376       debug_level = args_info.debug_arg;
377     }
378   else
379     {
380       debug_level = cfg_getint ( tmp, "debug_level");
381     }
382   set_debug_msg_level(debug_level);
383 
384 }
385 
386 static void
daemonize_if_necessary(char * argv[])387 daemonize_if_necessary( char *argv[] )
388 {
389   int should_daemonize;
390   cfg_t *tmp;
391   tmp = cfg_getsec( config_file, "globals");
392   should_daemonize = cfg_getbool( tmp, "daemonize");
393 
394   /* Daemonize if needed */
395   if(!args_info.foreground_flag && should_daemonize && !debug_level)
396     {
397       char *cwd;
398 
399       apr_filepath_get(&cwd, 0, global_context);
400       apr_proc_detach(1);
401       apr_filepath_set(cwd, global_context);
402 
403       /* enable errmsg logging to syslog */
404       daemon_proc = 1;
405       openlog (argv[0], LOG_PID, LOG_DAEMON);
406     }
407 }
408 
409 static void
setuid_if_necessary(void)410 setuid_if_necessary( void )
411 {
412   cfg_t *tmp;
413   int setuid;
414 #ifndef CYGWIN
415   char *user;
416 #endif
417 
418   tmp    = cfg_getsec( config_file, "globals");
419   setuid = cfg_getbool( tmp, "setuid" );
420   if(setuid)
421     {
422 #ifdef CYGWIN
423       fprintf(stderr,"Windows does not support setuid.\n");
424 #else
425       user = cfg_getstr(tmp, "user" );
426       become_a_nobody(user);
427 #endif
428     }
429 }
430 
431 static void
process_deaf_mute_mode(void)432 process_deaf_mute_mode( void )
433 {
434   cfg_t *tmp = cfg_getsec( config_file, "globals");
435   deaf =       cfg_getbool( tmp, "deaf");
436   mute =       cfg_getbool( tmp, "mute");
437   if(deaf && mute)
438     {
439       err_msg("Configured to run both deaf and mute. Nothing to do. Exiting.\n");
440       exit(EXIT_FAILURE);
441     }
442 }
443 
444 static void
process_allow_extra_data_mode(void)445 process_allow_extra_data_mode( void )
446 {
447   cfg_t *tmp = cfg_getsec( config_file, "globals");
448   allow_extra_data = cfg_getbool( tmp, "allow_extra_data");
449 }
450 
451 static Ganglia_acl *
Ganglia_acl_create(cfg_t * channel,apr_pool_t * pool)452 Ganglia_acl_create ( cfg_t *channel, apr_pool_t *pool )
453 {
454   apr_status_t status;
455   Ganglia_acl *acl = NULL;
456   cfg_t *acl_config;
457   char *default_action;
458   int num_access = 0;
459   int i;
460 
461   if(!channel || !pool)
462     {
463       return acl;
464     }
465 
466 
467   acl_config = cfg_getsec(channel, "acl");
468   if(!acl_config)
469     {
470       return acl;
471     }
472 
473   /* Find out the number of access entries */
474   num_access = cfg_size( acl_config, "access" );
475   if(!num_access)
476     {
477       return acl;
478     }
479 
480   /* Create a new ACL from the pool */
481   acl = apr_pcalloc( pool, sizeof(Ganglia_acl));
482   if(!acl)
483     {
484       err_msg("Unable to allocate memory for ACL. Exiting.\n");
485       exit(EXIT_FAILURE);
486     }
487 
488   default_action = cfg_getstr( acl_config, "default");
489   if(!apr_strnatcasecmp( default_action, "deny"))
490     {
491       acl->default_action = GANGLIA_ACCESS_DENY;
492     }
493   else if(!apr_strnatcasecmp( default_action, "allow"))
494     {
495       acl->default_action = GANGLIA_ACCESS_ALLOW;
496     }
497   else
498     {
499       err_msg("Invalid default ACL '%s'. Exiting.\n", default_action);
500       exit(EXIT_FAILURE);
501     }
502 
503   /* Create an array to hold each of the access instructions */
504   acl->access_array  = apr_array_make( pool, num_access, sizeof(Ganglia_acl *));
505   if(!acl->access_array)
506     {
507       err_msg("Unable to malloc access array. Exiting.\n");
508       exit(EXIT_FAILURE);
509     }
510   for(i=0; i< num_access; i++)
511     {
512       cfg_t *access_config   = cfg_getnsec( acl_config, "access", i);
513       Ganglia_access *access = apr_pcalloc( pool, sizeof(Ganglia_access));
514       char *ip, *mask, *action;
515 
516       if(!access_config)
517         {
518           /* This shouldn't happen unless maybe acl is empty and
519            * the safest thing to do it exit */
520           err_msg("Unable to process ACLs. Exiting.\n");
521           exit(EXIT_FAILURE);
522         }
523 
524       ip     = cfg_getstr( access_config, "ip");
525       mask   = cfg_getstr( access_config, "mask");
526       action = cfg_getstr( access_config, "action");
527       if(!ip && !mask && !action)
528         {
529           err_msg("An access record requires an ip, mask and action. Exiting.\n");
530           exit(EXIT_FAILURE);
531         }
532 
533       /* Process the action first */
534       if(!apr_strnatcasecmp( action, "deny" ))
535         {
536           access->action = GANGLIA_ACCESS_DENY;
537         }
538       else if(!apr_strnatcasecmp( action, "allow"))
539         {
540           access->action = GANGLIA_ACCESS_ALLOW;
541         }
542       else
543         {
544           err_msg("ACL access entry has action '%s'. Must be deny|allow. Exiting.\n", action);
545           exit(EXIT_FAILURE);
546         }
547 
548       /* Create the subnet */
549       access->ipsub = NULL;
550       status = apr_ipsubnet_create( &(access->ipsub), ip, mask, pool);
551       if(status != APR_SUCCESS)
552         {
553           err_msg("ACL access entry has invalid ip('%s')/mask('%s'). Exiting.\n", ip, mask);
554           exit(EXIT_FAILURE);
555         }
556 
557       /* Save this access entry to the acl */
558       *(Ganglia_access **)apr_array_push( acl->access_array ) = access;
559     }
560   return acl;
561 }
562 
563 
564 static int
Ganglia_acl_action(Ganglia_acl * acl,apr_sockaddr_t * addr)565 Ganglia_acl_action( Ganglia_acl *acl, apr_sockaddr_t *addr )
566 {
567   int i;
568 
569   if(!acl)
570     {
571       /* If no ACL is specified, we assume there is no access control */
572       return GANGLIA_ACCESS_ALLOW;
573     }
574 
575   for(i=0; i< acl->access_array->nelts; i++)
576     {
577       Ganglia_access *access = ((Ganglia_access **)(acl->access_array->elts))[i];
578       if(!apr_ipsubnet_test( access->ipsub, addr ))
579         {
580           /* no action will occur because addr is not in this subnet */
581           continue;
582         }
583       else
584         {
585           return access->action;
586         }
587     }
588 
589   /* No matches in the access list so we return the default */
590   return acl->default_action;
591 }
592 
593 static int32_t
get_sock_family(char * family)594 get_sock_family( char *family )
595 {
596   if( strchr( family, '4' ))
597     {
598       return APR_INET;
599     }
600   else if( strchr( family, '6'))
601     {
602 #if APR_INET6
603       return APR_INET6;
604 #else
605       err_msg("IPv6 is not supported on this host. Exiting.\n");
606       exit(EXIT_FAILURE);
607 #endif
608     }
609 
610   err_msg("Unknown family '%s'. Try inet4|inet6. Exiting.\n", family);
611   exit(EXIT_FAILURE);
612   /* shouldn't get here */
613   return APR_UNSPEC;
614 }
615 
616 static void
reset_mcast_channels(void)617 reset_mcast_channels( void )
618 {
619   int i;
620   int num_udp_recv_channels   = cfg_size( config_file, "udp_recv_channel");
621 
622   for(i = 0; i< num_udp_recv_channels; i++)
623     {
624       cfg_t *udp_recv_channel;
625       char *mcast_join, *mcast_if;
626       int port;
627       apr_socket_t *socket = NULL;
628 
629       udp_recv_channel = cfg_getnsec( config_file, "udp_recv_channel", i);
630       mcast_join     = cfg_getstr( udp_recv_channel, "mcast_join" );
631       mcast_if       = cfg_getstr( udp_recv_channel, "mcast_if" );
632       port           = cfg_getint( udp_recv_channel, "port");
633 
634       if ( mcast_join )
635         {
636           socket = udp_recv_sockets[i];
637           join_mcast(global_context, socket, mcast_join, port, mcast_if);
638         }
639     }
640 }
641 
/* Create every configured listening socket and register it in a pollset.
 *
 * For each "udp_recv_channel" a (multicast or plain) UDP server socket is
 * created, optionally given a receive buffer size, made non-blocking and
 * added to udp_listen_channels.  For each "tcp_accept_channel" a TCP server
 * socket is created and added to tcp_listen_channels.  Every socket gets a
 * Ganglia_channel (type, timeout, gzip flag, ACL) as pollset client_data
 * and is also recorded in the NULL-terminated udp_recv_sockets /
 * tcp_sockets arrays.
 *
 * If no channel of either kind is configured, gmond is marked deaf and
 * nothing else is done.  Almost every failure here is fatal: it is logged
 * and the process exits.  The exception is a failed UDP/multicast bind with
 * "retry_bind" enabled, which is retried every RETRY_BIND_DELAY seconds
 * without limit. */
static void
setup_listen_channels_pollset( void )
{
  apr_status_t status;
  int i;
  int num_udp_recv_channels   = cfg_size( config_file, "udp_recv_channel");
  int num_tcp_accept_channels = cfg_size( config_file, "tcp_accept_channel");
  int total_listen_channels   = num_udp_recv_channels + num_tcp_accept_channels;
  Ganglia_channel *channel;
  int pollset_opts = 0;

  /* check if gmond was really meant to be deaf */
  if (total_listen_channels == 0)
    {
      deaf = 1;
      return;
    }

  /* Create my incoming pollset */
#ifdef LINUX
  /* Request a thread-safe pollset when the kernel release string compares
     >= "2.6".  NOTE(review): this is a lexical strcmp, not a numeric
     version comparison -- a release such as "10.x" would sort below "2.6". */
  struct utsname _name;
  if(uname(&_name) >= 0) {
    if(strcmp(_name.release, "2.6") >= 0)
      pollset_opts = APR_POLLSET_THREADSAFE;
  }
#endif
  if (num_udp_recv_channels > 0) {
    if((status = apr_pollset_create(&udp_listen_channels, num_udp_recv_channels, global_context, pollset_opts)) != APR_SUCCESS)
      {
        char apr_err[512];
        apr_strerror(status, apr_err, 511);
        err_msg("apr_pollset_create failed: %s", apr_err);
        exit(EXIT_FAILURE);
      }
  }

  if (num_tcp_accept_channels > 0) {
    if((status = apr_pollset_create(&tcp_listen_channels, num_tcp_accept_channels, global_context, pollset_opts)) != APR_SUCCESS)
      {
        char apr_err[512];
        apr_strerror(status, apr_err, 511);
        err_msg("apr_pollset_create failed: %s", apr_err);
        exit(EXIT_FAILURE);
      }
  }

  /* One extra zeroed slot so the array is NULL-terminated */
  if((udp_recv_sockets = (apr_socket_t **)apr_pcalloc(global_context, sizeof(apr_socket_t *) * (num_udp_recv_channels + 1))) == NULL)
    err_quit("unable to allocate UDP listening sockets");

  /* Process all the udp_recv_channels */
  for(i = 0; i< num_udp_recv_channels; i++)
    {
      cfg_t *udp_recv_channel;
      char *mcast_join, *mcast_if, *bindaddr, *family;
      int port, retry_bind, buffer;
      apr_socket_t *socket = NULL;
      apr_pollfd_t socket_pollfd;
      apr_pool_t *pool = NULL;
      int32_t sock_family = APR_INET;
      apr_int32_t rx_buf_sz;
      socklen_t _optlen;

      udp_recv_channel = cfg_getnsec( config_file, "udp_recv_channel", i);
      mcast_join     = cfg_getstr( udp_recv_channel, "mcast_join" );
      mcast_if       = cfg_getstr( udp_recv_channel, "mcast_if" );
      port           = cfg_getint( udp_recv_channel, "port");
      bindaddr       = cfg_getstr( udp_recv_channel, "bind");
      family         = cfg_getstr( udp_recv_channel, "family");
      retry_bind     = cfg_getbool( udp_recv_channel, "retry_bind");
      buffer         = cfg_getint( udp_recv_channel, "buffer");

      debug_msg("udp_recv_channel mcast_join=%s mcast_if=%s port=%d bind=%s buffer=%d",
                mcast_join? mcast_join:"NULL",
                mcast_if? mcast_if:"NULL", port,
                bindaddr? bindaddr: "NULL", buffer);


      /* Create a sub-pool for this channel */
      apr_pool_create(&pool, global_context);

      sock_family = get_sock_family(family);

      if( mcast_join )
        {
          /* Listen on the specified multicast channel */
          socket = create_mcast_server(pool, sock_family, mcast_join, port, bindaddr, mcast_if );

          /* Keep retrying while creation fails and retry_bind is set;
             without retry_bind the first failure is fatal */
          while(!socket)
            {
              if(retry_bind == cfg_false)
                {
                  err_msg("Error creating multicast server mcast_join=%s port=%d mcast_if=%s family='%s'. Try setting retry_bind.  Exiting.\n",
                  mcast_join? mcast_join: "NULL", port, mcast_if? mcast_if:"NULL",family);
                  exit(EXIT_FAILURE);
                }
              err_msg("Error creating multicast server mcast_join=%s port=%d mcast_if=%s family='%s'.  Will try again...\n",
                  mcast_join? mcast_join: "NULL", port, mcast_if? mcast_if:"NULL",family);
              apr_sleep(APR_USEC_PER_SEC * RETRY_BIND_DELAY);
              socket = create_mcast_server(pool, sock_family, mcast_join, port, bindaddr, mcast_if );
            }
        }
      else
        {
          /* Create a UDP server */
          socket = create_udp_server( pool, sock_family, port, bindaddr );
          /* Same retry policy as the multicast case above */
          while(!socket)
            {
              if(retry_bind == cfg_false)
                {
                  err_msg("Error creating UDP server on port %d bind=%s.  Try setting retry_bind.  Exiting.\n",
                    port, bindaddr? bindaddr: "unspecified");
                  exit(EXIT_FAILURE);
                }
              err_msg("Error creating UDP server on port %d bind=%s.  Will try again...\n",
                  port, bindaddr? bindaddr: "unspecified");
              apr_sleep(APR_USEC_PER_SEC * RETRY_BIND_DELAY);
              socket = create_udp_server( pool, sock_family, port, bindaddr );
            }
        }

      /* A non-zero "buffer" requests an explicit receive buffer size; the
         request is then verified and any mismatch is fatal */
      if(buffer)
        {
          debug_msg("setting UDP socket receive buffer to: %d\n", (apr_int32_t) buffer);
          if(apr_socket_opt_set(socket, APR_SO_RCVBUF, (apr_int32_t) buffer) == APR_SUCCESS)
            {
              debug_msg("APR reports success setting APR_SO_RCVBUF to: %d\n", (apr_int32_t)buffer );

              /* RB: Let's check if it actually worked to be sure */
              _optlen = sizeof(rx_buf_sz);
              if(getsockopt(get_apr_os_socket(socket), SOL_SOCKET, SO_RCVBUF,
                              &rx_buf_sz, &_optlen) == 0)
                {
                  debug_msg("socket created, SO_RCVBUF = %d\n", rx_buf_sz);

                  /* NOTE(review): this test is redundant -- we are already
                     inside the outer if(buffer) */
                  if(buffer)
                    {
                      /* RB: getsockopt() returns double SO_RCVBUF since kernel reserves overhead space */
                      /* NOTE(review): the exact-doubling expectation looks
                         Linux-specific -- confirm behaviour on other
                         platforms */
                      if(rx_buf_sz!=(buffer*2))
                        {
                          err_msg("Error setting UDP receive buffer for port %d bind=%s to size: %d.\n",
                            port, bindaddr? bindaddr: "unspecified", (apr_int32_t) buffer);
                          err_msg("Reported buffer size by OS: %d : does not match config setting %d.\n",
                            (int) (rx_buf_sz/2), (int) buffer);
                          err_msg("NOTE: only supported on systems that have Apache Portable Runtime library version 0.9.4 or higher.\n");
                          err_msg("Check Operating System (kernel) limits, change or disable buffer size. Exiting.\n");
                          exit(EXIT_FAILURE);
                        }
                      else
                        { /* RB: Eureka */
                          debug_msg("Actual receive buffer size reported by OS matches config setting. Success.");
                        }
                    }
                }
              else
                {
                  err_msg("Unable to verify UDP receive buffer for port %d bind=%s to size: %d. Check Operating System (limits) or change buffer size. Exiting.\n",
                           port, bindaddr? bindaddr: "unspecified", buffer);
                  exit(EXIT_FAILURE);
                }
            }
          else
            {
              err_msg("Error setting UDP receive buffer for port %d bind=%s to size: %d. Check Operating System limits or change buffer size. Exiting.\n",
                port, bindaddr? bindaddr: "unspecified", (apr_int32_t) buffer);
              err_msg("This is currently only supported on systems that have Apache Portable Runtime library version 0.9.4 or higher.\n");
              err_msg("Check Operating System (kernel) limits, change or disable buffer size. Exiting.\n");
              exit(EXIT_FAILURE);
            }
        }

      /* Find out about the RX socket buffer
         This is logged to help people troubleshoot
         Some users have observed messages about errors when sending
         or receiving metric packets, and a small buffer size
         could be an issue */
      /* RB: Just log this for debugging purposes now */
      _optlen = sizeof(rx_buf_sz);
      if(getsockopt(get_apr_os_socket(socket), SOL_SOCKET, SO_RCVBUF,
                      &rx_buf_sz, &_optlen) == 0)
        {
          debug_msg("socket created, SO_RCVBUF = %d\n", rx_buf_sz);
        }
      else
        {
          debug_msg("getsockopt SO_RCVBUF failed\n");
        }

      /* Build the socket poll file descriptor structure */
      socket_pollfd.desc_type   = APR_POLL_SOCKET;
      socket_pollfd.reqevents   = APR_POLLIN;
      socket_pollfd.desc.s      = socket;

      /* Remember the socket at the channel's index; reset_mcast_channels()
         and reload_ganglia_configuration() read this array */
      udp_recv_sockets[i] = socket;

      channel = apr_pcalloc( pool, sizeof(Ganglia_channel));
      if(!channel)
        {
          err_msg("Unable to malloc memory for channel.  Exiting. \n");
          exit(EXIT_FAILURE);
        }

      /* Mark this channel as a udp_recv_channel */
      channel->type = UDP_RECV_CHANNEL;

      /* Make sure this socket never blocks */
      channel->timeout = 0;
      apr_socket_timeout_set( socket, channel->timeout);

      /* Save the ACL information */
      channel->acl = Ganglia_acl_create ( udp_recv_channel, pool );

      /* Save the pointer to this socket specific data */
      socket_pollfd.client_data = channel;

      /* Add the socket to the pollset */
      status = apr_pollset_add(udp_listen_channels, &socket_pollfd);
      if(status != APR_SUCCESS)
        {
          err_msg("Failed to add socket to pollset. Exiting.\n");
          exit(EXIT_FAILURE);
        }
    }

  /* One extra zeroed slot so the array is NULL-terminated */
  if ((tcp_sockets = (apr_socket_t **)apr_pcalloc(global_context, sizeof(apr_socket_t *) * (num_tcp_accept_channels + 1))) == NULL)
    err_quit("Unable to allocate TCP listening sockets");

  /* Process all the tcp_accept_channels */
  for(i=0; i< num_tcp_accept_channels; i++)
    {
      cfg_t *tcp_accept_channel = cfg_getnsec( config_file, "tcp_accept_channel", i);
      char *bindaddr, *interface, *family;
      int port, timeout, gzip_output;
      apr_socket_t *socket = NULL;
      apr_pollfd_t socket_pollfd;
      apr_pool_t *pool = NULL;
      int32_t sock_family;

      port           = cfg_getint( tcp_accept_channel, "port");
      bindaddr       = cfg_getstr( tcp_accept_channel, "bind");
      interface      = cfg_getstr( tcp_accept_channel, "interface");
      timeout        = cfg_getint( tcp_accept_channel, "timeout");
      family         = cfg_getstr( tcp_accept_channel, "family");
      gzip_output     = cfg_getbool( tcp_accept_channel, "gzip_output");

      debug_msg("tcp_accept_channel bind=%s port=%d gzip_output=%d",
                bindaddr? bindaddr: "NULL", port, gzip_output);

      /* Create a subpool context */
      apr_pool_create(&pool, global_context);

      sock_family = get_sock_family(family);

      /* Create the socket for the channel, blocking w/timeout */
      socket = create_tcp_server(pool, sock_family, port, bindaddr,
                                 interface, 1, gzip_output);
      /* Unlike the UDP channels there is no retry here: failure is fatal */
      if(!socket)
        {
          err_msg("Unable to create tcp_accept_channel. Exiting.\n");
          exit(EXIT_FAILURE);
        }

      tcp_sockets[i] = socket;

      /* Build the socket poll file descriptor structure */
      socket_pollfd.desc_type   = APR_POLL_SOCKET;
      socket_pollfd.reqevents   = APR_POLLIN;
      socket_pollfd.desc.s      = socket;

      channel = apr_pcalloc( pool, sizeof(Ganglia_channel));
      if(!channel)
        {
          err_msg("Unable to malloc data for channel. Exiting.\n");
          exit(EXIT_FAILURE);
        }

      channel->type = TCP_ACCEPT_CHANNEL;

      /* Save the timeout for this socket */
      channel->timeout = timeout;

      /* Record whether this channel was configured for gzip output */
      channel->gzip_output = gzip_output;

      /* Save the ACL information */
      channel->acl = Ganglia_acl_create( tcp_accept_channel, pool );

      /* Save the pointer to this channel data */
      socket_pollfd.client_data = channel;

      /* Add the socket to the pollset */
      status = apr_pollset_add(tcp_listen_channels, &socket_pollfd);
      if(status != APR_SUCCESS)
         {
            err_msg("Failed to add socket to pollset. Exiting.\n");
            exit(EXIT_FAILURE);
         }
    }
}
940 
/*
 * Replace, in place, every character of metric_name that is not permitted in
 * a metric name with '_'.
 *
 * Permitted characters are ASCII letters, ASCII digits, '_', '-' and '.'.
 * A ':' is additionally permitted when is_spoof_msg is non-zero, because
 * spoofed names carry a ':' separator (get_metric_names() splits on it).
 *
 * metric_name may be NULL or empty; in both cases nothing is done.
 *
 * Note: the previous condition "(*p == ':' && !is_spoof_msg)" was and-ed with
 * the other tests, which meant that only ':' in non-spoof names was ever
 * replaced and all other disallowed characters were left untouched.
 */
void sanitize_metric_name(char *metric_name, int is_spoof_msg)
{
    char *p;

    if (metric_name == NULL) return;

    /* Walk up to the terminator; no need to re-measure the string each turn */
    for (p = metric_name; *p != '\0'; p++) {
        if (   (*p >= 'A' && *p <= 'Z')
            || (*p >= 'a' && *p <= 'z')
            || (*p >= '0' && *p <= '9')
            || (*p == '_')
            || (*p == '-')
            || (*p == '.')
           ) {
            continue;
        }
        /* ':' separates the two halves of a spoofed name, keep it there */
        if (*p == ':' && is_spoof_msg) {
            continue;
        }
        *p = '_';
    }
}
962 
963 
964 static void
get_metric_names(Ganglia_metric_id * metric_id,char ** metricName,char ** realName)965 get_metric_names (Ganglia_metric_id *metric_id, char **metricName, char **realName)
966 {
967     char *firstName=NULL, *secondName=NULL, *buff=NULL;
968     int name_len;
969 
970     *metricName = *realName = NULL;
971     firstName = metric_id->name;
972 
973     if (metric_id->spoof)
974       {
975         name_len = strlen(firstName);
976         buff = malloc(name_len + 1);
977         strncpy(buff, firstName, name_len + 1);
978         firstName = buff;
979         secondName = strchr(buff + 1, ':');
980         if(secondName)
981           {
982             *secondName = 0;
983             secondName++;
984           }
985       }
986 
987     if (firstName) {
988         *metricName = strdup(firstName);
989         if (secondName) {
990             *realName = strdup(secondName);
991         }
992     }
993 
994     if (buff)
995         free(buff);
996     return;
997 }
998 
999 Ganglia_host *
Ganglia_host_get(char * remIP,apr_sockaddr_t * sa,Ganglia_metric_id * metric_id)1000 Ganglia_host_get( char *remIP, apr_sockaddr_t *sa, Ganglia_metric_id *metric_id)
1001 {
1002   apr_status_t status;
1003   Ganglia_host *hostdata;
1004   apr_pool_t *pool;
1005   char *hostname = NULL;
1006   char *remoteip = remIP;
1007   char *buff = NULL;
1008 
1009   if(!remoteip || !sa)
1010     {
1011       return NULL;
1012     }
1013 
1014   /* split out the spoofed host name and ip address so that it can
1015    * be used to get the spoofed host. */
1016   if(metric_id && metric_id->spoof)
1017     {
1018       char *spoofName;
1019       char *spoofIP;
1020       int spoof_info_len;
1021 
1022       spoof_info_len = strlen(metric_id->host);
1023       buff = malloc(spoof_info_len+1);
1024       strncpy(buff, metric_id->host, spoof_info_len + 1);
1025       spoofIP = buff;
1026       if( !(spoofName = strchr(buff+1,':')) ){
1027           err_msg("Incorrect format for spoof argument. exiting.\n");
1028           if (spoofIP) debug_msg("spoofIP: %s \n",spoofIP);
1029           if (buff) debug_msg("buff: %s \n",buff);
1030           if (buff) free(buff);
1031           return NULL;
1032       }
1033       *spoofName = 0;
1034       spoofName++;
1035       if(!(*spoofName)){
1036           err_msg("Incorrect format for spoof argument. exiting.\n");
1037           if (buff) free(buff);
1038           return NULL;
1039       }
1040       debug_msg(" spoofName: %s    spoofIP: %s \n",spoofName,spoofIP);
1041 
1042       hostname = spoofName;
1043       remoteip = spoofIP;
1044     }
1045 
1046   apr_thread_mutex_lock(hosts_mutex);
1047   hostdata =  (Ganglia_host *)apr_hash_get( hosts, remoteip, APR_HASH_KEY_STRING );
1048   apr_thread_mutex_unlock(hosts_mutex);
1049   if(!hostdata)
1050     {
1051       /* Lookup the hostname or use the proxy information if available */
1052       if( !hostname )
1053         {
1054           /* We'll use the resolver to find the hostname */
1055           status = apr_getnameinfo(&hostname, sa, 0);
1056           if(status != APR_SUCCESS)
1057             {
1058               /* If hostname lookup fails.. set it to the ip */
1059               hostname = remoteip;
1060             }
1061         }
1062 
1063       /* This is the first time we've heard from this host.. create a new pool */
1064       status = apr_pool_create( &pool, global_context );
1065       if(status != APR_SUCCESS)
1066         {
1067           if (buff) free(buff);
1068           return NULL;
1069         }
1070 
1071       /* Malloc the hostdata_t from the new pool */
1072       hostdata = apr_pcalloc( pool, sizeof( Ganglia_host ));
1073       if(!hostdata)
1074         {
1075           if (buff) free(buff);
1076           apr_pool_destroy(pool);
1077           return NULL;
1078         }
1079 
1080       /* Save the pool address for later.. freeing this pool free everthing
1081        * for this particular host */
1082       hostdata->pool = pool;
1083 
1084       /* Save the hostname */
1085       hostdata->hostname = apr_pstrdup( pool, hostname );
1086 
1087       /* Dup the remoteip (it will be freed later) */
1088       hostdata->ip =  apr_pstrdup( pool, remoteip);
1089 
1090       /* We don't know the location yet */
1091       hostdata->location = NULL;
1092 
1093       /* Set the timestamps */
1094       hostdata->first_heard_from = hostdata->last_heard_from = apr_time_now();
1095 
1096       /* Create the hostdata mutex */
1097       if (apr_thread_mutex_create(&hostdata->mutex, APR_THREAD_MUTEX_DEFAULT, pool) != APR_SUCCESS)
1098         {
1099           if (buff) free(buff);
1100           apr_pool_destroy(pool);
1101           return NULL;
1102         }
1103 
1104       /* Create a hash for the metric data */
1105       hostdata->metrics = apr_hash_make( pool );
1106       if(!hostdata->metrics)
1107         {
1108           if (buff) free(buff);
1109           apr_pool_destroy(pool);
1110           return NULL;
1111         }
1112 
1113       /* Create a hash for the gmetric data */
1114       hostdata->gmetrics = apr_hash_make( pool );
1115       if(!hostdata->gmetrics)
1116         {
1117           if (buff) free(buff);
1118           apr_pool_destroy(pool);
1119           return NULL;
1120         }
1121 
1122       /* Save this host data to the "hosts" hash */
1123       apr_thread_mutex_lock(hosts_mutex);
1124       apr_hash_set( hosts, hostdata->ip, APR_HASH_KEY_STRING, hostdata);
1125       apr_thread_mutex_unlock(hosts_mutex);
1126     }
1127   else
1128     {
1129       /* We already have this host in our "hosts" hash update timestamp */
1130       hostdata->last_heard_from = apr_time_now();
1131     }
1132 
1133   if (buff) free(buff);
1134   return hostdata;
1135 }
1136 
1137 void
Ganglia_update_vidals(Ganglia_host * host,Ganglia_value_msg * vmsg)1138 Ganglia_update_vidals( Ganglia_host *host, Ganglia_value_msg *vmsg)
1139 {
1140     char *metricName=NULL, *realName=NULL;
1141 
1142     if (!vmsg)
1143         return;
1144 
1145     metricName = vmsg->Ganglia_value_msg_u.gstr.metric_id.name;
1146     get_metric_names (&(vmsg->Ganglia_value_msg_u.gstr.metric_id), &metricName, &realName);
1147 
1148     if(!strcasecmp("location", metricName))
1149       {
1150         /* We have to manage this memory here because.. returning NULL
1151          * will not cause Ganglia_message_save to be run.  Maybe this
1152          * could be done better later i.e should these metrics be
1153          * in the host->metrics list instead of the host structure? */
1154         if(host->location)
1155           {
1156             /* Free old location */
1157             free(host->location);
1158           }
1159         /* Save new location */
1160         host->location = strdup(vmsg->Ganglia_value_msg_u.gstr.str);
1161         debug_msg("Got a location message %s\n", host->location);
1162         /* Processing is finished */
1163       }
1164     else if(!strcasecmp("heartbeat", metricName))
1165       {
1166         /* nothing more needs to be done. we handled the timestamps above. */
1167         host->gmond_started = vmsg->Ganglia_value_msg_u.gu_int.ui;
1168         debug_msg("Got a heartbeat message %d\n", host->gmond_started);
1169         /* Processing is finished */
1170       }
1171     else if(vmsg->Ganglia_value_msg_u.gstr.metric_id.spoof)
1172       {
1173         /* nothing more needs to be done. we handled the timestamps above. */
1174         debug_msg("Got a spoof message %s from %s\n", vmsg->Ganglia_value_msg_u.gstr.metric_id.name,
1175                   vmsg->Ganglia_value_msg_u.gstr.metric_id.host);
1176         /* Processing is finished */
1177       }
1178 
1179     if (metricName) free(metricName);
1180     if (realName) free(realName);
1181     return;
1182 }
1183 
1184 void
Ganglia_metadata_check(Ganglia_host * host,Ganglia_value_msg * vmsg)1185 Ganglia_metadata_check(Ganglia_host *host, Ganglia_value_msg *vmsg )
1186 {
1187     char *metric_name = vmsg->Ganglia_value_msg_u.gstr.metric_id.name;
1188     int is_spoof_msg = vmsg->Ganglia_value_msg_u.gstr.metric_id.spoof;
1189     Ganglia_metadata *metric;
1190 
1191     apr_thread_mutex_lock(host->mutex);
1192     metric = (Ganglia_metadata *)apr_hash_get(host->metrics, metric_name, APR_HASH_KEY_STRING);
1193     apr_thread_mutex_unlock(host->mutex);
1194 
1195     if(!metric)
1196       {
1197         int len;
1198         XDR x;
1199         char msgbuf[GANGLIA_MAX_MESSAGE_LEN];
1200         char hostbuf[512];
1201         Ganglia_metadata_msg msg;
1202 
1203         msg.id = gmetadata_request;
1204         if (is_spoof_msg)
1205             apr_snprintf(hostbuf, 512, "%s:%s", host->ip, host->hostname);
1206         else
1207             apr_snprintf(hostbuf, 512, "%s", host->hostname);
1208         msg.Ganglia_metadata_msg_u.grequest.metric_id.host = hostbuf;
1209         msg.Ganglia_metadata_msg_u.grequest.metric_id.name = metric_name;
1210         msg.Ganglia_metadata_msg_u.grequest.metric_id.spoof = is_spoof_msg;
1211 
1212         debug_msg("sending metadata request flag for metric: %s host: %s", metric_name, host->hostname);
1213         ganglia_scoreboard_inc(PKTS_SENT_REQUEST);
1214         ganglia_scoreboard_inc(PKTS_SENT_ALL);
1215 
1216         /* Send the message */
1217         xdrmem_create(&x, msgbuf, GANGLIA_MAX_MESSAGE_LEN, XDR_ENCODE);
1218         if(!xdr_Ganglia_metadata_msg(&x, &msg))
1219           {
1220             return;
1221           }
1222         len = xdr_getpos(&x);
1223         /* Send the encoded data along...*/
1224         Ganglia_udp_send_message( udp_send_channels, msgbuf, len);
1225       }
1226 
1227     return;
1228 }
1229 
#if 0
/* Compiled out.  Releases a metric record by destroying the pool stored in
 * it; a NULL metric is ignored. */
static void
Ganglia_metadata_free( Ganglia_metadata *metric )
{
  if(metric)
    {
      apr_pool_destroy( metric->pool );
    }
}
#endif
1239 
1240 void
Ganglia_metadata_save(Ganglia_host * host,Ganglia_metadata_msg * message)1241 Ganglia_metadata_save( Ganglia_host *host, Ganglia_metadata_msg *message )
1242 {
1243     Ganglia_metadata *metric;
1244 
1245     if(!host || !message)
1246         return;
1247 
1248     /* Search for the Ganglia_metadata in the Ganglia_host */
1249     sanitize_metric_name(message->Ganglia_metadata_msg_u.gfull.metric_id.name, message->Ganglia_metadata_msg_u.gfull.metric_id.spoof);
1250 
1251     apr_thread_mutex_lock(host->mutex);
1252     metric = (Ganglia_metadata *)apr_hash_get(host->metrics,
1253                                               message->Ganglia_metadata_msg_u.gfull.metric_id.name,
1254                                               APR_HASH_KEY_STRING);
1255     apr_thread_mutex_unlock(host->mutex);
1256 
1257     if(metric)
1258       {
1259         apr_pool_clear(metric->pool);
1260       }
1261     else
1262       {
1263         apr_status_t status;
1264 
1265         /* This is a new metric sent from this host... allocate space for this data */
1266 
1267         /* Allocate a new metric from this context */
1268         metric = apr_pcalloc(host->pool, sizeof(Ganglia_metadata));
1269         if(!metric)
1270             return;
1271 
1272         /* Create the context for this metric */
1273         status = apr_pool_create(&(metric->pool), host->pool);
1274         if(status != APR_SUCCESS)
1275             return;
1276 
1277         debug_msg("***Allocating metadata packet for host--%s-- and metric --%s-- ****\n", host->hostname, message->Ganglia_metadata_msg_u.gfull.metric_id.name);
1278     }
1279 
1280     if(metric)
1281       {
1282         Ganglia_metadata_msg *fmessage = &(metric->message_u.f_message);
1283         u_int i,mlen = message->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_len;
1284 
1285         metric->name = apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric_id.name);
1286         fmessage->id = message->id;
1287         fmessage->Ganglia_metadata_msg_u.gfull.metric_id.host =
1288             apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric_id.host);
1289         fmessage->Ganglia_metadata_msg_u.gfull.metric_id.name =
1290             apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric_id.name);
1291         fmessage->Ganglia_metadata_msg_u.gfull.metric_id.spoof =
1292             message->Ganglia_metadata_msg_u.gfull.metric_id.spoof;
1293         fmessage->Ganglia_metadata_msg_u.gfull.metric.type =
1294             apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric.type);
1295         fmessage->Ganglia_metadata_msg_u.gfull.metric.name =
1296             apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric.name);
1297         fmessage->Ganglia_metadata_msg_u.gfull.metric.units =
1298             apr_pstrdup(metric->pool, message->Ganglia_metadata_msg_u.gfull.metric.units);
1299         fmessage->Ganglia_metadata_msg_u.gfull.metric.slope =
1300             message->Ganglia_metadata_msg_u.gfull.metric.slope;
1301         fmessage->Ganglia_metadata_msg_u.gfull.metric.tmax =
1302             message->Ganglia_metadata_msg_u.gfull.metric.tmax;
1303         fmessage->Ganglia_metadata_msg_u.gfull.metric.dmax =
1304             message->Ganglia_metadata_msg_u.gfull.metric.dmax;
1305         fmessage->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_len = mlen;
1306 
1307         fmessage->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val =
1308             apr_pcalloc(metric->pool, sizeof(Ganglia_extra_data)*mlen);
1309         for (i = 0; i < mlen; i++)
1310           {
1311             fmessage->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[i].name =
1312                 apr_pstrdup(metric->pool,
1313                             message->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[i].name);
1314             fmessage->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[i].data =
1315                 apr_pstrdup(metric->pool,
1316                             message->Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[i].data);
1317           }
1318 
1319         metric->last_heard_from = apr_time_now();
1320 
1321         /* Save the full metric */
1322         apr_thread_mutex_lock(host->mutex);
1323         apr_hash_set(host->metrics, metric->name, APR_HASH_KEY_STRING, metric);
1324         apr_thread_mutex_unlock(host->mutex);
1325         debug_msg("saving metadata for metric: %s host: %s", metric->name, host->hostname);
1326       }
1327 }
1328 
1329 static void
Ganglia_metadata_request(Ganglia_host * host,Ganglia_metadata_msg * message)1330 Ganglia_metadata_request( Ganglia_host *host, Ganglia_metadata_msg *message )
1331 {
1332   char *name = message->Ganglia_metadata_msg_u.grequest.metric_id.name;
1333   Ganglia_metric_callback *metric_cb;
1334   int is_spoof_msg = message->Ganglia_metadata_msg_u.grequest.metric_id.spoof;
1335   char srch_name[512];
1336 
1337   if(is_spoof_msg)
1338     {
1339       apr_snprintf(srch_name, 512, "%s:%s", name, host->hostname);
1340       metric_cb =  (Ganglia_metric_callback *)apr_hash_get( metric_callbacks, srch_name, APR_HASH_KEY_STRING );
1341     }
1342   else
1343     {
1344       metric_cb =  (Ganglia_metric_callback *)apr_hash_get( metric_callbacks, name, APR_HASH_KEY_STRING );
1345     }
1346 
1347   if(!host || !message)
1348     return;
1349 
1350   if (metric_cb)
1351       metric_cb->metadata_last_sent = 0;
1352   debug_msg("setting metadata request flag for metric: %s host: %s", name, host->hostname);
1353 }
1354 
1355 void
Ganglia_value_save(Ganglia_host * host,Ganglia_value_msg * message)1356 Ganglia_value_save( Ganglia_host *host, Ganglia_value_msg *message )
1357 {
1358   Ganglia_metadata *metric;
1359 
1360   if(!host || !message)
1361     return;
1362 
1363   /* Search for the Ganglia_metric in the Ganglia_host */
1364   apr_thread_mutex_lock(host->mutex);
1365   metric = (Ganglia_metadata *)apr_hash_get(host->gmetrics,
1366                                             message->Ganglia_value_msg_u.gstr.metric_id.name,
1367                                             APR_HASH_KEY_STRING);
1368   apr_thread_mutex_unlock(host->mutex);
1369 
1370   if(metric)
1371     {
1372       apr_pool_clear(metric->pool);
1373     }
1374   else
1375     {
1376       apr_status_t status;
1377 
1378       /* This is a new metric sent from this host... allocate space for this data */
1379 
1380       /* Allocate a new metric from this context */
1381       metric = apr_pcalloc(host->pool, sizeof(Ganglia_metadata));
1382       if(!metric)
1383         {
1384           /* no memory */
1385           return;
1386         }
1387 
1388       /* Create the context for this metric */
1389       status = apr_pool_create(&(metric->pool), host->pool);
1390       if(status != APR_SUCCESS)
1391           return;
1392 
1393       debug_msg("***Allocating value packet for host--%s-- and metric --%s-- ****\n", message->Ganglia_value_msg_u.gstr.metric_id.host, message->Ganglia_value_msg_u.gstr.metric_id.name );
1394     }
1395 
1396 
1397   if(metric)
1398     {
1399       Ganglia_value_msg *vmessage = &(metric->message_u.v_message);
1400 
1401       metric->name = apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.metric_id.name );
1402       vmessage->id = message->id;
1403       vmessage->Ganglia_value_msg_u.gstr.metric_id.host =
1404           apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.metric_id.host);
1405       vmessage->Ganglia_value_msg_u.gstr.metric_id.name =
1406           apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.metric_id.name);
1407       vmessage->Ganglia_value_msg_u.gstr.metric_id.spoof =
1408           message->Ganglia_value_msg_u.gstr.metric_id.spoof;
1409       vmessage->Ganglia_value_msg_u.gstr.fmt =
1410           apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.fmt);
1411 
1412       switch(message->id)
1413         {
1414         case gmetric_string:
1415           vmessage->Ganglia_value_msg_u.gstr.str =
1416               apr_pstrdup(metric->pool, message->Ganglia_value_msg_u.gstr.str);
1417           break;
1418         case gmetric_ushort:
1419           vmessage->Ganglia_value_msg_u.gu_short.us =
1420               message->Ganglia_value_msg_u.gu_short.us;
1421           break;
1422         case gmetric_short:
1423           vmessage->Ganglia_value_msg_u.gs_short.ss =
1424               message->Ganglia_value_msg_u.gs_short.ss;
1425           break;
1426         case gmetric_uint:
1427           vmessage->Ganglia_value_msg_u.gu_int.ui =
1428               message->Ganglia_value_msg_u.gu_int.ui;
1429           break;
1430         case gmetric_int:
1431           vmessage->Ganglia_value_msg_u.gs_int.si =
1432               message->Ganglia_value_msg_u.gs_int.si;
1433           break;
1434         case gmetric_float:
1435           vmessage->Ganglia_value_msg_u.gf.f =
1436               message->Ganglia_value_msg_u.gf.f;
1437           break;
1438         case gmetric_double:
1439           vmessage->Ganglia_value_msg_u.gd.d =
1440               message->Ganglia_value_msg_u.gd.d;
1441           break;
1442         default:
1443           break;
1444         }
1445 
1446       metric->last_heard_from = apr_time_now();
1447 
1448       /* Save the last update metric */
1449       apr_thread_mutex_lock(host->mutex);
1450       apr_hash_set(host->gmetrics, metric->name, APR_HASH_KEY_STRING, metric);
1451       apr_thread_mutex_unlock(host->mutex);
1452     }
1453 }
1454 
1455 static void
process_udp_recv_channel(const apr_pollfd_t * desc,apr_time_t now)1456 process_udp_recv_channel(const apr_pollfd_t *desc, apr_time_t now)
1457 {
1458   apr_status_t status;
1459   apr_socket_t *socket;
1460   apr_sockaddr_t *remotesa = NULL;
1461 #ifdef SFLOW
1462   uint16_t localport;
1463   char *errorMsg = NULL;
1464 #endif
1465   char  remoteip[256];
1466   char buf[max_udp_message_len];
1467   apr_size_t len = max_udp_message_len;
1468   Ganglia_channel *channel;
1469   XDR x;
1470   Ganglia_metadata_msg fmsg;
1471   Ganglia_value_msg vmsg;
1472   Ganglia_host *hostdata = NULL;
1473   apr_pool_t *p = NULL;
1474   Ganglia_msg_formats id;
1475   bool_t ret;
1476 
1477   socket         = desc->desc.s;
1478   /* We could also use the apr_socket_data_get/set() functions
1479    * to have per socket user data .. see APR docs */
1480   channel       = desc->client_data;
1481 
1482   /* We need to create a copy of the local sockaddr so that the
1483      recvfrom call has a place holder to put the remote information.
1484      Getting the remote sockaddr might not work since a SOCK_DGRAM
1485      type socket is connectionless. */
1486   apr_pool_create(&p, global_context);
1487   status = apr_socket_addr_get(&remotesa, APR_LOCAL, socket);
1488 #ifdef SFLOW
1489   /* remember this before it gets overwritten */
1490   localport = remotesa->port;
1491 #endif
1492   status = apr_sockaddr_info_get(&remotesa, NULL, remotesa->family, remotesa->port, 0, p);
1493 
1494   /* Grab the data */
1495   status = apr_socket_recvfrom(remotesa, socket, 0, buf, &len);
1496   if(status != APR_SUCCESS)
1497     {
1498       apr_pool_destroy(p);
1499       return;
1500     }
1501 
1502   /* This function is in ./lib/apr_net.c and not APR. The
1503    * APR counterpart is apr_sockaddr_ip_get() but we don't
1504    * want to malloc memory evertime we call this */
1505   apr_sockaddr_ip_buffer_get(remoteip, 256, remotesa);
1506 
1507   /* Check the ACL */
1508   if(Ganglia_acl_action( channel->acl, remotesa) != GANGLIA_ACCESS_ALLOW)
1509     {
1510       apr_pool_destroy(p);
1511       return;
1512     }
1513 
1514   ganglia_scoreboard_inc(PKTS_RECVD_ALL);
1515 
1516 #ifdef SFLOW
1517   if(localport == sflow_udp_port) {
1518     if(process_sflow_datagram(remotesa, buf, len, now, &errorMsg)) {
1519       ganglia_scoreboard_inc(PKTS_RECVD_VALUE);
1520     }
1521     else {
1522       if(errorMsg) {
1523 	debug_msg("sFlow error: %s", errorMsg);
1524       }
1525       ganglia_scoreboard_inc(PKTS_RECVD_FAILED);
1526     }
1527     apr_pool_destroy(p);
1528     return;
1529   }
1530 #endif
1531 
1532   /* Create the XDR receive stream */
1533   xdrmem_create(&x, buf, max_udp_message_len, XDR_DECODE);
1534 
1535   /* Flush the data... */
1536   memset( &fmsg, 0, sizeof(Ganglia_metadata_msg));
1537   memset( &vmsg, 0, sizeof(Ganglia_value_msg));
1538 
1539   /* Figure out what kind of message it we got */
1540   xdr_Ganglia_msg_formats(&x, &id);
1541   xdr_setpos (&x, 0);
1542 
1543   /* Read the gangliaMessage from the stream */
1544   /* Save the message from this particular host */
1545   switch (id)
1546     {
1547     case gmetadata_request:
1548       ganglia_scoreboard_inc(PKTS_RECVD_REQUEST);
1549       ret = xdr_Ganglia_metadata_msg(&x, &fmsg);
1550       if (ret)
1551           hostdata = Ganglia_host_get(remoteip, remotesa, &(fmsg.Ganglia_metadata_msg_u.grequest.metric_id));
1552       sanitize_metric_name(fmsg.Ganglia_metadata_msg_u.grequest.metric_id.name, fmsg.Ganglia_metadata_msg_u.grequest.metric_id.spoof);
1553       if(!ret || !hostdata)
1554         {
1555           ganglia_scoreboard_inc(PKTS_RECVD_FAILED);
1556           /* Processing of this message is finished ... */
1557           xdr_free((xdrproc_t)xdr_Ganglia_metadata_msg, (char *)&fmsg);
1558           break;
1559         }
1560       debug_msg("Processing a metric metadata request message from %s", hostdata->hostname);
1561       Ganglia_metadata_request(hostdata, &fmsg);
1562       xdr_free((xdrproc_t)xdr_Ganglia_metadata_msg, (char *)&fmsg);
1563       break;
1564     case gmetadata_full:
1565       ganglia_scoreboard_inc(PKTS_RECVD_METADATA);
1566       ret = xdr_Ganglia_metadata_msg(&x, &fmsg);
1567       if (ret)
1568           hostdata = Ganglia_host_get(remoteip, remotesa, &(fmsg.Ganglia_metadata_msg_u.gfull.metric_id));
1569       sanitize_metric_name(fmsg.Ganglia_metadata_msg_u.gfull.metric_id.name, fmsg.Ganglia_metadata_msg_u.gfull.metric_id.spoof);
1570       if(!ret || !hostdata)
1571         {
1572           ganglia_scoreboard_inc(PKTS_RECVD_FAILED);
1573           /* Processing of this message is finished ... */
1574           xdr_free((xdrproc_t)xdr_Ganglia_metadata_msg, (char *)&fmsg);
1575           break;
1576         }
1577       debug_msg("Processing a metric metadata message from %s", hostdata->hostname);
1578       Ganglia_metadata_save( hostdata, &fmsg );
1579       xdr_free((xdrproc_t)xdr_Ganglia_metadata_msg, (char *)&fmsg);
1580       break;
1581     case gmetric_ushort:
1582     case gmetric_short:
1583     case gmetric_int:
1584     case gmetric_uint:
1585     case gmetric_string:
1586     case gmetric_float:
1587     case gmetric_double:
1588       ganglia_scoreboard_inc(PKTS_RECVD_VALUE);
1589       ret = xdr_Ganglia_value_msg(&x, &vmsg);
1590       if (ret)
1591           hostdata = Ganglia_host_get(remoteip, remotesa, &(vmsg.Ganglia_value_msg_u.gstr.metric_id));
1592       sanitize_metric_name(vmsg.Ganglia_value_msg_u.gstr.metric_id.name, vmsg.Ganglia_value_msg_u.gstr.metric_id.spoof);
1593       if(!ret || !hostdata)
1594         {
1595           ganglia_scoreboard_inc(PKTS_RECVD_FAILED);
1596           /* Processing of this message is finished ... */
1597           xdr_free((xdrproc_t)xdr_Ganglia_value_msg, (char *)&vmsg);
1598           break;
1599         }
1600       debug_msg("Processing a metric value message from %s", hostdata->hostname);
1601       Ganglia_value_save(hostdata, &vmsg);
1602       Ganglia_update_vidals(hostdata, &vmsg);
1603       Ganglia_metadata_check(hostdata, &vmsg);
1604       xdr_free((xdrproc_t)xdr_Ganglia_value_msg, (char *)&vmsg);
1605       break;
1606     default:
1607       ganglia_scoreboard_inc(PKTS_RECVD_IGNORED);
1608       break;
1609   }
1610 
1611   apr_pool_destroy(p);
1612 
1613   return;
1614 }
1615 
1616 static z_stream *
zstream_new()1617 zstream_new()
1618 {
1619   int err;
1620 
1621   z_stream *strm = calloc(1, sizeof(z_stream));
1622   if (strm == 0)
1623     {
1624       return NULL;
1625     }
1626 
1627   /* Yes, 15 + 16 are 2 special magic values documented in zlib.h */
1628   err = deflateInit2(strm, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 15 + 16, 8, Z_DEFAULT_STRATEGY);
1629   if (err != Z_OK)
1630     {
1631       free( strm );
1632       return NULL;
1633     }
1634 
1635   return strm;
1636 }
1637 
1638 static apr_status_t
socket_flush(apr_socket_t * client,int gzip_output)1639 socket_flush( apr_socket_t *client, int gzip_output )
1640 {
1641   char outputbuffer[2048];
1642   const int outputlen = sizeof(outputbuffer);
1643   int ret;
1644   int status;
1645   apr_size_t wlen;
1646   z_stream *strm;
1647 
1648   if ( !gzip_output )
1649     {
1650       return APR_SUCCESS;
1651     }
1652 
1653   if (APR_SUCCESS == apr_socket_data_get((void**)&strm, GZIP_KEY, client))
1654     {
1655       while( 1 )
1656 	{
1657 	  strm->next_out  = (Bytef *)outputbuffer;
1658 	  strm->avail_out = outputlen;
1659 
1660 	  ret = deflate( strm, Z_FINISH );
1661 	  if (ret != Z_OK && ret != Z_STREAM_END)
1662 	    {
1663 	      return APR_ENOMEM;
1664 	    }
1665 
1666 	  wlen = outputlen - strm->avail_out;
1667 	  status = socket_send_raw( client, outputbuffer, &wlen );
1668 	  if(status != APR_SUCCESS)
1669 	    return status;
1670 
1671 	  if(ret == Z_STREAM_END)
1672 	    return APR_SUCCESS;
1673 	}
1674     }
1675   return APR_SUCCESS;
1676 }
1677 
1678 static void
zstream_destroy(z_stream * strm)1679 zstream_destroy( z_stream *strm )
1680 {
1681   if (strm)
1682     {
1683       deflateEnd(strm);
1684       free (strm);
1685     }
1686 }
1687 
1688 static apr_status_t
print_xml_header(apr_socket_t * client)1689 print_xml_header( apr_socket_t *client )
1690 {
1691   apr_status_t status;
1692   apr_size_t len = strlen(DTD);
1693   char gangliaxml[128];
1694   char clusterxml[1024];
1695   static int clusterinit = 0;
1696   static char *name = NULL;
1697   static char *owner = NULL;
1698   static char *latlong = NULL;
1699   static char *url = NULL;
1700   apr_time_t now = apr_time_now();
1701 
1702   status = socket_send( client, DTD, &len );
1703   if(status != APR_SUCCESS)
1704     return status;
1705 
1706   len = apr_snprintf( gangliaxml, 128, "<GANGLIA_XML VERSION=\"%s\" SOURCE=\"gmond\">\n",
1707                       VERSION);
1708   status = socket_send( client, gangliaxml, &len);
1709   if(status != APR_SUCCESS)
1710     return status;
1711 
1712   if(!clusterinit)
1713     {
1714       /* We only run this on the first connection we process */
1715       cfg_t *cluster = cfg_getsec(config_file, "cluster");
1716       if(cluster)
1717         {
1718           name    = cfg_getstr( cluster, "name" );
1719           owner   = cfg_getstr( cluster, "owner" );
1720           latlong = cfg_getstr( cluster, "latlong" );
1721           url     = cfg_getstr( cluster, "url" );
1722           if(name || owner || latlong || url)
1723             {
1724               cluster_tag =1;
1725             }
1726         }
1727       clusterinit = 1;
1728     }
1729 
1730   if(cluster_tag)
1731     {
1732       len = apr_snprintf( clusterxml, 1024,
1733         "<CLUSTER NAME=\"%s\" LOCALTIME=\"%d\" OWNER=\"%s\" LATLONG=\"%s\" URL=\"%s\">\n",
1734                   name?name:"unspecified",
1735                   (int)(now / APR_USEC_PER_SEC),
1736                   owner?owner:"unspecified",
1737                   latlong?latlong:"unspecified",
1738                   url?url:"unspecified");
1739 
1740       return socket_send( client, clusterxml, &len);
1741     }
1742 
1743   return APR_SUCCESS;
1744 }
1745 
1746 static apr_status_t
print_xml_footer(apr_socket_t * client)1747 print_xml_footer( apr_socket_t *client )
1748 {
1749   apr_status_t status;
1750   apr_size_t len;
1751   if(cluster_tag)
1752     {
1753       len = 11;
1754       status = socket_send(client, "</CLUSTER>\n", &len);
1755       if(status != APR_SUCCESS)
1756         {
1757           return status;
1758         }
1759     }
1760   len = 15;
1761   return socket_send( client, "</GANGLIA_XML>\n", &len);
1762 }
1763 
1764 static apr_status_t
print_host_start(apr_socket_t * client,Ganglia_host * hostinfo)1765 print_host_start( apr_socket_t *client, Ganglia_host *hostinfo)
1766 {
1767   apr_size_t len;
1768   char hostxml[1024]; /* for <HOST></HOST> */
1769   apr_time_t now = apr_time_now();
1770   int tn = (now - hostinfo->last_heard_from) / APR_USEC_PER_SEC;
1771 
1772   len = apr_snprintf(hostxml, 1024,
1773            "<HOST NAME=\"%s\" IP=\"%s\" TAGS=\"%s\" REPORTED=\"%d\" TN=\"%d\" TMAX=\"%d\" DMAX=\"%d\" LOCATION=\"%s\" GMOND_STARTED=\"%d\">\n",
1774                      hostinfo->hostname,
1775                      hostinfo->ip,
1776                      tags ? tags : "",
1777                      (int)(hostinfo->last_heard_from / APR_USEC_PER_SEC),
1778                      tn,
1779                      host_tmax,
1780                      host_dmax,
1781                      hostinfo->location? hostinfo->location: "unspecified",
1782                      hostinfo->gmond_started);
1783 
1784   return socket_send(client, hostxml, &len);
1785 }
1786 
1787 /* NOT THREAD SAFE */
1788 static char *
host_metric_type(Ganglia_value_types type)1789 host_metric_type( Ganglia_value_types type)
1790 {
1791   switch(type)
1792     {
1793     case GANGLIA_VALUE_UNKNOWN:
1794       return "unknown";
1795     case GANGLIA_VALUE_STRING:
1796       return "string";
1797     case GANGLIA_VALUE_UNSIGNED_SHORT:
1798       return "uint16";
1799     case GANGLIA_VALUE_SHORT:
1800       return "int16";
1801     case GANGLIA_VALUE_UNSIGNED_INT:
1802       return "uint32";
1803     case GANGLIA_VALUE_INT:
1804       return "int32";
1805     case GANGLIA_VALUE_FLOAT:
1806       return "float";
1807     case GANGLIA_VALUE_DOUBLE:
1808       return "double";
1809     }
1810   return "undef";
1811 }
1812 
1813 /* NOT THREAD SAFE */
1814 static char *
host_metric_value(Ganglia_25metric * metric,Ganglia_value_msg * message)1815 host_metric_value( Ganglia_25metric *metric, Ganglia_value_msg *message )
1816 {
1817   static char value[1024];
1818   if(!metric||!message)
1819     {
1820       return "unknown";
1821     }
1822 
1823   switch(metric->type)
1824     {
1825     case GANGLIA_VALUE_UNKNOWN:
1826       return "unknown";
1827     case GANGLIA_VALUE_STRING:
1828       return message->Ganglia_value_msg_u.gstr.str;
1829     case GANGLIA_VALUE_UNSIGNED_SHORT:
1830       apr_snprintf(value, 1024, metric->fmt, message->Ganglia_value_msg_u.gu_short.us);
1831       return value;
1832     case GANGLIA_VALUE_SHORT:
1833       /* For right now.. there are no metrics which are signed shorts... use u_short */
1834       apr_snprintf(value, 1024, metric->fmt, message->Ganglia_value_msg_u.gs_short.ss);
1835       return value;
1836     case GANGLIA_VALUE_UNSIGNED_INT:
1837       apr_snprintf(value, 1024, metric->fmt, message->Ganglia_value_msg_u.gu_int.ui);
1838       return value;
1839     case GANGLIA_VALUE_INT:
1840       /* For right now.. there are no metric which are signed ints... use u_int */
1841       apr_snprintf(value, 1024, metric->fmt, message->Ganglia_value_msg_u.gs_int.si);
1842       return value;
1843     case GANGLIA_VALUE_FLOAT:
1844       apr_snprintf(value, 1024, metric->fmt, message->Ganglia_value_msg_u.gf.f);
1845       return value;
1846     case GANGLIA_VALUE_DOUBLE:
1847       apr_snprintf(value, 1024, metric->fmt, message->Ganglia_value_msg_u.gd.d);
1848       return value;
1849     }
1850 
1851   return "unknown";
1852 }
1853 
/*
 * Copy `src` into `dst`, escaping it for use inside XML.
 *
 *   & < > " '          -> &amp; &lt; &gt; &quot; &apos;
 *   bytes < 32 or > 126 -> decimal character reference "&#NN;" / "&#NNN;"
 *   everything else     -> copied unchanged
 *
 * dst          destination buffer
 * src          NUL-terminated input; NULL is treated as an empty string
 * max_dst_len  size of dst in bytes
 *
 * Output is truncated (always on a whole character/entity boundary) when
 * the buffer fills up, and dst is always NUL-terminated when
 * max_dst_len > 0.  A buffer too small to hold one entity plus the NUL
 * (fewer than 7 bytes) receives an empty string.
 *
 * Returns the number of bytes written, not counting the NUL.
 */
static int
xml_escape(char *dst, const char *src, size_t max_dst_len)
{
    /* Longest output for one input byte ("&quot;", "&apos;", "&#255;"
       are 6 bytes) plus the terminating NUL. */
    enum { XML_ESCAPE_RESERVE = 7 };
    const char *s;
    char *out = dst;
    size_t limit;

    if (dst == NULL || max_dst_len == 0)
        return 0;

    *out = '\0';

    /* Guard against size_t underflow in the subtraction below */
    if (src == NULL || max_dst_len < XML_ESCAPE_RESERVE)
        return 0;

    limit = max_dst_len - XML_ESCAPE_RESERVE;

    for (s = src; *s != '\0'; s++)
        {
        /* Go through unsigned char so bytes >= 0x80 do not sign-extend */
        unsigned int ch = (unsigned char)*s;
        const char *entity = NULL;

        /* Stop while there is still room for a full entity and the NUL */
        if ((size_t)(out - dst) > limit)
            break;

        switch (ch)
            {
            case '&':
              entity = "&amp;";
              break;
            case '<':
              entity = "&lt;";
              break;
            case '>':
              entity = "&gt;";
              break;
            case '"':
              entity = "&quot;";
              break;
            case '\'':
              entity = "&apos;";
              break;
            default:
              break;
            }

        if (entity != NULL)
            {
            while (*entity != '\0')
                *out++ = *entity++;
            }
        else if (ch < 32 || ch > 126)
            {
            /* At least two digits, so values below 100 keep the
               historical "&#09;" form; at most "&#255;" (6 bytes). */
            out += snprintf(out, XML_ESCAPE_RESERVE, "&#%02u;", ch);
            }
        else
            {
            *out++ = (char)ch;
            }
        }

    *out = '\0';
    return (int)(out - dst);
}
1910 
1911 static char *
gmetric_value_to_str(Ganglia_value_msg * message)1912 gmetric_value_to_str(Ganglia_value_msg *message)
1913 {
1914   static char value[1024];
1915   if(!message)
1916     {
1917       return "unknown";
1918     }
1919 
1920   switch(message->id)
1921     {
1922     case gmetric_string:
1923       xml_escape(value, message->Ganglia_value_msg_u.gstr.str, 1024);
1924       return value;
1925     case gmetric_ushort:
1926       apr_snprintf(value, 1024, message->Ganglia_value_msg_u.gu_short.fmt, message->Ganglia_value_msg_u.gu_short.us);
1927       return value;
1928     case gmetric_short:
1929       /* For right now.. there are no metrics which are signed shorts... use u_short */
1930       apr_snprintf(value, 1024, message->Ganglia_value_msg_u.gs_short.fmt, message->Ganglia_value_msg_u.gs_short.ss);
1931       return value;
1932     case gmetric_uint:
1933       apr_snprintf(value, 1024, message->Ganglia_value_msg_u.gu_int.fmt, message->Ganglia_value_msg_u.gu_int.ui);
1934       return value;
1935     case gmetric_int:
1936       /* For right now.. there are no metric which are signed ints... use u_int */
1937       apr_snprintf(value, 1024, message->Ganglia_value_msg_u.gs_int.fmt, message->Ganglia_value_msg_u.gs_int.si);
1938       return value;
1939     case gmetric_float:
1940       apr_snprintf(value, 1024, message->Ganglia_value_msg_u.gf.fmt, message->Ganglia_value_msg_u.gf.f);
1941       return value;
1942     case gmetric_double:
1943       apr_snprintf(value, 1024, message->Ganglia_value_msg_u.gd.fmt, message->Ganglia_value_msg_u.gd.d);
1944       return value;
1945     case gmetadata_full:
1946     case gmetadata_request:
1947     default:
1948       return "unknown";
1949     }
1950 }
1951 
1952 static apr_status_t
print_host_metric(apr_socket_t * client,Ganglia_metadata * data,Ganglia_metadata * val,apr_time_t now)1953 print_host_metric( apr_socket_t *client, Ganglia_metadata *data, Ganglia_metadata *val, apr_time_t now )
1954 {
1955   char metricxml[1024];
1956   apr_size_t len;
1957   apr_status_t ret;
1958   char *metricName=NULL, *realName=NULL;
1959 
1960   if (!data || !val)
1961       return APR_SUCCESS;
1962 
1963   get_metric_names (&(data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric_id), &metricName, &realName);
1964 
1965   if (!metricName || (!strcasecmp(metricName, "heartbeat") || !strcasecmp(metricName, "location")))
1966     {
1967       if (metricName) free(metricName);
1968       if (realName) free(realName);
1969       return APR_SUCCESS;
1970     }
1971 
1972   len = apr_snprintf(metricxml, 1024,
1973           "<METRIC NAME=\"%s\" VAL=\"%s\" TYPE=\"%s\" UNITS=\"%s\" TN=\"%d\" TMAX=\"%d\" DMAX=\"%d\" SLOPE=\"%s\">\n",
1974               metricName,
1975               gmetric_value_to_str(&(val->message_u.v_message)),
1976               data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.type,
1977               data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.units,
1978               (int)((now - val->last_heard_from) / APR_USEC_PER_SEC),
1979               data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.tmax,
1980               data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.dmax,
1981               slope_to_cstr(data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.slope));
1982 
1983   if (metricName) free(metricName);
1984   if (realName) free(realName);
1985 
1986   ret = socket_send(client, metricxml, &len);
1987   if ((ret == APR_SUCCESS) && allow_extra_data)
1988     {
1989       int extra_len = data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_len;
1990       len = apr_snprintf(metricxml, 1024, "<EXTRA_DATA>\n");
1991       socket_send(client, metricxml, &len);
1992       for (; extra_len > 0; extra_len--)
1993         {
1994           len = apr_snprintf(metricxml, 1024, "<EXTRA_ELEMENT NAME=\"%s\" VAL=\"%s\"/>\n",
1995                  data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[extra_len-1].name,
1996                  data->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.metadata.metadata_val[extra_len-1].data);
1997           socket_send(client, metricxml, &len);
1998         }
1999         len = apr_snprintf(metricxml, 1024, "</EXTRA_DATA>\n");
2000         socket_send(client, metricxml, &len);
2001     }
2002   /* Send the closing tag */
2003   len = apr_snprintf(metricxml, 1024, "</METRIC>\n");
2004 
2005   return socket_send(client, metricxml, &len);
2006 }
2007 
2008 static apr_status_t
print_host_end(apr_socket_t * client)2009 print_host_end( apr_socket_t *client)
2010 {
2011   apr_size_t len = 8;
2012   return socket_send(client, "</HOST>\n", &len);
2013 }
2014 
2015 static void
process_tcp_accept_channel(const apr_pollfd_t * desc,apr_time_t now)2016 process_tcp_accept_channel(const apr_pollfd_t *desc, apr_time_t now)
2017 {
2018   apr_status_t status;
2019   apr_hash_index_t *hi, *metric_hi;
2020   void *val;
2021   apr_socket_t *client, *server;
2022   apr_sockaddr_t *remotesa = NULL;
2023   char  remoteip[256];
2024   apr_pool_t *client_context = NULL;
2025   Ganglia_channel *channel;
2026 
2027   server         = desc->desc.s;
2028   /* We could also use the apr_socket_data_get/set() functions
2029    * to have per socket user data .. see APR docs */
2030   channel        = desc->client_data;
2031 
2032   /* Create a context for the client connection */
2033   apr_pool_create(&client_context, global_context);
2034 
2035   /* Accept the connection */
2036   status = apr_socket_accept(&client, server, client_context);
2037   if(status != APR_SUCCESS)
2038     {
2039       goto close_accept_socket;
2040     }
2041 
2042   /* Set the timeout for writing to the client */
2043   apr_socket_timeout_set( client, channel->timeout);
2044 
2045   apr_socket_addr_get(&remotesa, APR_REMOTE, client);
2046   /* This function is in ./lib/apr_net.c and not APR. The
2047    * APR counterpart is apr_sockaddr_ip_get() but we don't
2048    * want to malloc memory evertime we call this */
2049   apr_sockaddr_ip_buffer_get(remoteip, 256, remotesa);
2050 
2051   /* Check the ACL */
2052   if(Ganglia_acl_action( channel->acl, remotesa ) != GANGLIA_ACCESS_ALLOW)
2053     goto close_accept_socket;
2054 
2055   if ( channel->gzip_output )
2056     {
2057       z_stream *strm = zstream_new();
2058       if (strm == NULL)
2059 	{
2060 	  debug_msg("failed to allocate gzip stream");
2061 	  goto close_accept_socket;
2062 	}
2063       apr_status_t r = apr_socket_data_set(client, strm, GZIP_KEY, &zstream_destroy);
2064       if (r != APR_SUCCESS)
2065 	{
2066 	  debug_msg("failed to set socket user data");
2067 	  zstream_destroy(strm);
2068 	  goto close_accept_socket;
2069 	}
2070     }
2071 
2072   /* Print the DTD, GANGLIA_XML and CLUSTER tags */
2073   status = print_xml_header(client);
2074   if(status != APR_SUCCESS)
2075     goto close_accept_socket;
2076 
2077   /* Walk the host hash */
2078   apr_thread_mutex_lock(hosts_mutex);
2079   for(hi = apr_hash_first(client_context, hosts);
2080       hi;
2081       hi = apr_hash_next(hi))
2082     {
2083       apr_hash_this(hi, NULL, NULL, &val);
2084       status = print_host_start(client, (Ganglia_host *)val);
2085       if(status != APR_SUCCESS)
2086         {
2087           /* Release the mutex and close down the accepted socket */
2088           apr_thread_mutex_unlock(hosts_mutex);
2089           apr_socket_shutdown(client, APR_SHUTDOWN_READ);
2090           apr_socket_close(client);
2091           apr_pool_destroy(client_context);
2092           return;
2093         }
2094 
2095       /* Send the metric info for this particular host */
2096       apr_thread_mutex_lock(((Ganglia_host *)val)->mutex);
2097       for(metric_hi = apr_hash_first(client_context, ((Ganglia_host *)val)->metrics);
2098           metric_hi; metric_hi = apr_hash_next(metric_hi))
2099         {
2100           void *metric, *mval;
2101           apr_hash_this(metric_hi, NULL, NULL, &metric);
2102 
2103           mval = apr_hash_get(((Ganglia_host *)val)->gmetrics, ((Ganglia_metadata*)metric)->name, APR_HASH_KEY_STRING);
2104 
2105           /* Print each of the metrics for a host ... */
2106           if(print_host_metric(client, metric, mval, now) != APR_SUCCESS)
2107             {
2108               /* Release the mutex and close down the accepted socket */
2109               apr_thread_mutex_unlock(((Ganglia_host *)val)->mutex);
2110               apr_thread_mutex_unlock(hosts_mutex);
2111               apr_socket_shutdown(client, APR_SHUTDOWN_READ);
2112               apr_socket_close(client);
2113               apr_pool_destroy(client_context);
2114               return;
2115             }
2116         }
2117       apr_thread_mutex_unlock(((Ganglia_host *)val)->mutex);
2118 
2119       /* Close the host tag */
2120       status = print_host_end(client);
2121       if(status != APR_SUCCESS)
2122         {
2123           /* Release the mutex and close down the accepted socket */
2124           apr_thread_mutex_unlock(hosts_mutex);
2125           apr_socket_shutdown(client, APR_SHUTDOWN_READ);
2126           apr_socket_close(client);
2127           apr_pool_destroy(client_context);
2128           return;
2129         }
2130     }
2131   apr_thread_mutex_unlock(hosts_mutex);
2132 
2133   /* Close the CLUSTER and GANGLIA_XML tags */
2134   print_xml_footer(client);
2135 
2136   status = socket_flush( client, channel->gzip_output );
2137   if (status != APR_SUCCESS)
2138     {
2139       debug_msg("failed to finish compressing stream; returned '%d'",status);
2140       goto close_accept_socket;
2141     }
2142 
2143   /* Close down the accepted socket */
2144 close_accept_socket:
2145   apr_socket_shutdown(client, APR_SHUTDOWN_READ);
2146   apr_socket_close(client);
2147   apr_pool_destroy(client_context);
2148 }
2149 
2150 
2151 static void
poll_udp_listen_channels(apr_interval_time_t timeout,apr_time_t now)2152 poll_udp_listen_channels( apr_interval_time_t timeout, apr_time_t now)
2153 {
2154   apr_status_t status;
2155   const apr_pollfd_t *descs = NULL;
2156   apr_int32_t num = 0;
2157   apr_int32_t i;
2158 
2159   /* Poll for incoming UDP data */
2160   status = apr_pollset_poll(udp_listen_channels, timeout, &num, &descs);
2161   if (status != APR_SUCCESS && status != APR_TIMEUP) {
2162       char buff[128];
2163       debug_msg("apr_pollset_poll returned unexpected status %d = %s\n",
2164           status, apr_strerror(status, buff, 128));
2165       return;
2166   }
2167 
2168   for(i = 0; i< num ; i++)
2169     {
2170       Ganglia_channel *channel = descs[i].client_data;
2171       switch( channel->type )
2172         {
2173         case UDP_RECV_CHANNEL:
2174           process_udp_recv_channel(descs+i, now);
2175           udp_last_heard = apr_time_now();
2176           break;
2177         default:
2178           continue;
2179         }
2180     }
2181 }
2182 
2183 static void
poll_tcp_listen_channels(apr_interval_time_t timeout,apr_time_t now)2184 poll_tcp_listen_channels( apr_interval_time_t timeout, apr_time_t now)
2185 {
2186   apr_status_t status;
2187   const apr_pollfd_t *descs = NULL;
2188   apr_int32_t num = 0;
2189   apr_int32_t i;
2190 
2191   /* Poll for incoming TCP requests */
2192   status = apr_pollset_poll(tcp_listen_channels, timeout, &num, &descs);
2193   if (status != APR_SUCCESS && status != APR_TIMEUP) {
2194       char buff[128];
2195       debug_msg("apr_pollset_poll returned unexpected status %d = %s\n",
2196           status, apr_strerror(status, buff, 128));
2197       return;
2198   }
2199 
2200   for(i = 0; i< num ; i++)
2201     {
2202       Ganglia_channel *channel = descs[i].client_data;
2203       switch( channel->type )
2204         {
2205         case TCP_ACCEPT_CHANNEL:
2206           debug_msg("[tcp] Request for XML data received.");
2207           process_tcp_accept_channel(descs+i, now);
2208           debug_msg("[tcp] Request for XML data completed.");
2209           break;
2210         default:
2211           continue;
2212         }
2213     }
2214 }
2215 
/*
 * TCP counterpart of the UDP send routine.  Currently a stub: both
 * arguments are ignored and 0 is returned.
 */
static int
tcp_send_message( char *buf, int len )
{
  (void)buf;
  (void)len;

  return 0;
}
2222 
2223 static int
send_message(char * buf,int len)2224 send_message( char *buf, int len )
2225 {
2226   return Ganglia_udp_send_message(udp_send_channels, buf, len ) + tcp_send_message( buf, len );
2227 }
2228 
2229 static Ganglia_metric_callback *
Ganglia_metric_cb_define(char * name,metric_func cb,int index,mmodule * modp)2230 Ganglia_metric_cb_define(char *name, metric_func cb, int index, mmodule *modp)
2231 {
2232   Ganglia_metric_callback *metric = apr_pcalloc( global_context, sizeof(Ganglia_metric_callback));
2233   if(!metric)
2234     return NULL;
2235 
2236   metric->name = apr_pstrdup( global_context, name );
2237   if(!metric->name)
2238     return NULL;
2239 
2240   /* index is used to determine which metric to gather for multi-metric
2241      callback functions.  This is to support metric modules or handlers
2242      that have the ability to gather more than one metric. */
2243   if (index == CB_NOINDEX)
2244       metric->cb = (metric_func_void)cb;
2245   else
2246       metric->cbindexed = cb;
2247 
2248   metric->modp = modp;
2249   metric->multi_metric_index = index;
2250 
2251   apr_hash_set( metric_callbacks, metric->name, APR_HASH_KEY_STRING, metric);
2252   return metric;
2253 }
2254 
2255 g_val_t
gexec_func(void)2256 gexec_func ( void )
2257 {
2258    g_val_t val;
2259    if( gexec_on )
2260       snprintf(val.str, MAX_G_STRING_SIZE, "%s", "ON");
2261    else
2262       snprintf(val.str, MAX_G_STRING_SIZE, "%s", "OFF");
2263    return val;
2264 }
2265 
2266 g_val_t
heartbeat_func(void)2267 heartbeat_func( void )
2268 {
2269    g_val_t val;
2270    val.uint32 = started / APR_USEC_PER_SEC;
2271    return val;
2272 }
2273 
2274 g_val_t
location_func(void)2275 location_func(void)
2276 {
2277    g_val_t val;
2278    if(!host_location)
2279      {
2280        cfg_t *host = cfg_getsec(config_file, "host");
2281        host_location = cfg_getstr( host, "location");
2282      }
2283    snprintf(val.str, MAX_G_STRING_SIZE, "%s", host_location);
2284    return val;
2285 }
2286 
modular_metric_cleanup(void * param)2287 static apr_status_t modular_metric_cleanup(void *param)
2288 {
2289     mmodule *modp = (mmodule*)param;
2290     if (modp->cleanup) {
2291         modp->cleanup();
2292     }
2293     return APR_SUCCESS;
2294 }
2295 
2296 static void
load_metric_modules(void)2297 load_metric_modules( void )
2298 {
2299     cfg_t *tmp;
2300     int j;
2301 
2302     tmp = cfg_getsec( config_file, "modules");
2303     for (j = 0; j < cfg_size(tmp, "module"); j++)
2304       {
2305         apr_dso_handle_t *modHandle = NULL;
2306         apr_dso_handle_sym_t modSym;
2307         mmodule *modp;
2308         char *modPath=NULL, *modName=NULL, *modparams=NULL, *modLanguage=NULL;
2309         apr_array_header_t *modParams_list = NULL;
2310         int k, modEnabled;
2311         apr_status_t merge_ret;
2312 
2313         cfg_t *module = cfg_getnsec(tmp, "module", j);
2314 
2315         /* Check the module language to make sure that
2316            the module is loaded correctly or should be
2317            delegated to an alternate module interface
2318         */
2319         modLanguage = cfg_getstr(module, "language");
2320         if (modLanguage && strcasecmp(modLanguage, "C/C++"))
2321             continue;
2322 
2323         /* Check to make sure that the module is enabled.
2324         */
2325         modEnabled = cfg_getbool(module, "enabled");
2326         if (!modEnabled)
2327             continue;
2328 
2329         modPath = cfg_getstr(module, "path");
2330         if(modPath && *modPath != '/' && *modPath != '.')
2331           {
2332             if (module_dir)
2333                 merge_ret = apr_filepath_merge(&modPath, module_dir,
2334                                 modPath,
2335                                 APR_FILEPATH_NOTRELATIVE | APR_FILEPATH_NATIVE,
2336                                 global_context);
2337             else
2338                 merge_ret = apr_filepath_merge(&modPath, GANGLIA_MODULE_DIR,
2339                                 modPath,
2340                                 APR_FILEPATH_NOTRELATIVE | APR_FILEPATH_NATIVE,
2341                                 global_context);
2342 
2343             if (merge_ret != APR_SUCCESS)
2344                 modPath = cfg_getstr(module, "path");
2345           }
2346         modName = cfg_getstr(module, "name");
2347         modparams = cfg_getstr(module, "params");
2348         modParams_list = apr_array_make(global_context, 2, sizeof(mmparam));
2349 
2350         for (k = 0; k < cfg_size(module, "param"); k++)
2351           {
2352             cfg_t *param;
2353             mmparam *node = apr_array_push(modParams_list);
2354 
2355             param = cfg_getnsec(module, "param", k);
2356             node->name = apr_pstrdup(global_context, param->title);
2357             node->value = apr_pstrdup(global_context, cfg_getstr(param, "value"));
2358           }
2359 
2360         /*
2361          * Load the file into the gmond address space
2362          */
2363         if (apr_dso_load(&modHandle, modPath, global_context) != APR_SUCCESS)
2364           {
2365             char my_error[256];
2366 
2367             err_msg("Cannot load %s metric module: %s", modPath,
2368                      apr_dso_error(modHandle, my_error, sizeof(my_error)));
2369             if (!modPath)
2370                 err_msg("No load path specified for module: %s or incorrect module language designation [%s].\n",
2371                         modName, modLanguage);
2372             continue;
2373           }
2374         debug_msg("loaded module: %s", modName);
2375 
2376         /*
2377          * Retrieve the pointer to the module structure through the module name.
2378          */
2379         if (apr_dso_sym(&modSym, modHandle, modName) != APR_SUCCESS)
2380           {
2381             char my_error[256];
2382 
2383             err_msg("Cannot locate internal module structure '%s' in file %s: %s\nPossibly an incorrect module language designation [%s].\n",
2384                      modName, modPath, apr_dso_error(modHandle, my_error, sizeof(my_error)), modLanguage);
2385             continue;
2386           }
2387 
2388         modp = (mmodule*) modSym;
2389         modp->dynamic_load_handle = (apr_dso_handle_t *)modHandle;
2390         modp->module_name = apr_pstrdup (global_context, modName);
2391         modp->module_params = apr_pstrdup (global_context, modparams);
2392         modp->module_params_list = modParams_list;
2393         modp->config_file = config_file;
2394 
2395         /*
2396          * Make sure the found module structure is really a module structure
2397          *
2398          */
2399         if (modp->magic != MMODULE_MAGIC_COOKIE) {
2400             err_msg("Internal module structure '%s' in file %s is not compatible -"
2401                      "perhaps this is not a metric module.\n",
2402                      modName, modPath);
2403             continue;
2404         }
2405 
2406         /* Validate that the module was built against a compatible module interface API. */
2407         if (modp->version != MMODULE_MAGIC_NUMBER_MAJOR) {
2408             err_msg("Module \"%s\" is not compatible with this "
2409                     "version of Gmond (found %d, need %d).",
2410                     modName, modp->version, MMODULE_MAGIC_NUMBER_MAJOR);
2411             continue;
2412         }
2413 
2414         if (metric_modules != NULL) {
2415             modp->next = metric_modules;
2416         }
2417         metric_modules = modp;
2418       }
2419     return;
2420 }
2421 
2422 /* This function imports the metrics from libmetrics right now but in the future
2423  * we could easily do this via DSO. */
2424 static void
setup_metric_callbacks(void)2425 setup_metric_callbacks( void )
2426 {
2427   mmodule *modp = metric_modules;
2428   Ganglia_metric_callback *metric_cb;
2429 
2430   /* Create the metric_callbacks hash */
2431   metric_callbacks = apr_hash_make( global_context );
2432 
2433   while (modp) {
2434       const Ganglia_25metric* metric_info;
2435       int i;
2436 
2437       if (modp->init && modp->init(global_context)) {
2438           err_msg("Module %s failed to initialize.\n", modp->module_name);
2439       }
2440       else
2441       {
2442           apr_pool_cleanup_register(global_context, modp,
2443                                     modular_metric_cleanup,
2444                                     apr_pool_cleanup_null);
2445 
2446           metric_info = modp->metrics_info;
2447           for (i = 0; metric_info[i].name != NULL; i++)
2448             {
2449               metric_cb = Ganglia_metric_cb_define(metric_info[i].name, modp->handler, i, modp);
2450               if (metric_cb)
2451                   metric_cb->info = (Ganglia_25metric*)&(metric_info[i]);
2452             }
2453       }
2454       modp = modp->next;
2455   }
2456 }
2457 
2458 void
setup_metric_info_impl(Ganglia_metric_callback * metric_cb,int group_once,const char * name,char * title,float value_threshold,int is_dynamic)2459 setup_metric_info_impl(Ganglia_metric_callback *metric_cb, int group_once, const char *name, char *title, float value_threshold, int is_dynamic)
2460 {
2461     Ganglia_25metric *metric_info = NULL;
2462 
2463     if (metric_cb->info)
2464     {
2465         metric_info = apr_pcalloc( global_context, sizeof(Ganglia_25metric));
2466         memcpy (metric_info, metric_cb->info, sizeof(Ganglia_25metric));
2467         metric_info->key = modular_metric;
2468     }
2469 
2470     if(metric_info)
2471     {
2472         /* Build the message */
2473         switch(metric_info->type)
2474         {
2475             case GANGLIA_VALUE_UNKNOWN:
2476                 /* The 2.5.x protocol doesn't allow for unknown values. :(  Do nothing. */
2477                 return;
2478             case GANGLIA_VALUE_STRING:
2479                 metric_info->key = gmetric_string;
2480                 metric_cb->msg.Ganglia_value_msg_u.gstr.fmt = apr_pstrdup(global_context, metric_info->fmt);
2481                 break;
2482             case GANGLIA_VALUE_UNSIGNED_SHORT:
2483                 metric_info->key = gmetric_ushort;
2484                 metric_cb->msg.Ganglia_value_msg_u.gu_short.fmt = apr_pstrdup(global_context, metric_info->fmt);
2485                 break;
2486             case GANGLIA_VALUE_SHORT:
2487                 metric_info->key = gmetric_short;
2488                 metric_cb->msg.Ganglia_value_msg_u.gs_short.fmt = apr_pstrdup(global_context, metric_info->fmt);
2489                 break;
2490             case GANGLIA_VALUE_UNSIGNED_INT:
2491                 metric_info->key = gmetric_uint;
2492                 metric_cb->msg.Ganglia_value_msg_u.gu_int.fmt = apr_pstrdup(global_context, metric_info->fmt);
2493                 break;
2494             case GANGLIA_VALUE_INT:
2495                 metric_info->key = gmetric_int;
2496                 metric_cb->msg.Ganglia_value_msg_u.gs_int.fmt = apr_pstrdup(global_context, metric_info->fmt);
2497                 break;
2498             case GANGLIA_VALUE_FLOAT:
2499                 metric_info->key = gmetric_float;
2500                 metric_cb->msg.Ganglia_value_msg_u.gf.fmt = apr_pstrdup(global_context, metric_info->fmt);
2501                 break;
2502             case GANGLIA_VALUE_DOUBLE:
2503                 metric_info->key = gmetric_double;
2504                 metric_cb->msg.Ganglia_value_msg_u.gd.fmt = apr_pstrdup(global_context, metric_info->fmt);
2505                 break;
2506             default:
2507                 metric_info->key = gmetric_uint;
2508         }
2509 
2510         /* This sets the key for this particular metric.
2511          * The value is set by the callback function later */
2512         metric_cb->msg.id = metric_info->key;
2513 
2514         metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.host = apr_pstrdup(global_context, myname);
2515         metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.name = apr_pstrdup(global_context, metric_info->name);
2516 
2517         /* Replace the host metric name with the spoof data if it exists in the metadata */
2518         if (metric_info->metadata)
2519         {
2520             const char *spfhost_val, *spfname_val;
2521 
2522             spfhost_val = apr_table_get((apr_table_t *)metric_info->metadata, SPOOF_HOST);
2523             if (spfhost_val)
2524             {
2525                 metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.host = apr_pstrdup(global_context, spfhost_val);
2526                 metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.spoof = TRUE;
2527                 spfhost_val = strchr(spfhost_val,':');
2528                 if(spfhost_val)
2529                     spfhost_val++;
2530                 else
2531                     spfhost_val = metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.host;
2532             }
2533             else
2534               spfhost_val = myname;
2535             spfname_val = apr_table_get((apr_table_t *)metric_info->metadata, SPOOF_NAME);
2536             if (spfname_val)
2537             {
2538                 char *spoofedname = apr_pstrcat(global_context, spfname_val, ":", name, NULL);
2539                 char *spoofedkey = apr_pstrcat(global_context, spfname_val, ":", name, ":", spfhost_val, NULL);
2540 
2541                 metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.name = spoofedname;
2542                 metric_cb->msg.Ganglia_value_msg_u.gstr.metric_id.spoof = TRUE;
2543 
2544                 /* Reinsert the same metric_callback structure pointer under the spoofed name.
2545                     This will put the same metric info in the hash table twice but under
2546                     the spoofed name. */
2547                 apr_hash_set( metric_callbacks, spoofedkey, APR_HASH_KEY_STRING, metric_cb);
2548             }
2549         }
2550 
2551         /* Save the location of information about this particular metric */
2552         metric_cb->info   = metric_info;
2553 
2554         /* Set the value threshold for this particular metric */
2555         metric_cb->value_threshold = value_threshold;
2556 
2557         /* Fill in the title or short descriptive name of the metric if
2558          * one had been given in the configuration file. Otherwise just
2559          * copy the metric name as the title. */
2560         if (title)
2561         {
2562             metric_cb->title = apr_pstrdup(global_context, title);
2563         }
2564         else
2565         {
2566             metric_cb->title = apr_pstrdup(global_context, metric_info->name);
2567         }
2568 
2569         /* If this metric will only be collected once, run it now at setup... */
2570         if(group_once)
2571         {
2572             if (metric_cb->multi_metric_index == CB_NOINDEX)
2573                 metric_cb->now = metric_cb->cb();
2574             else
2575                 metric_cb->now = metric_cb->cbindexed(metric_cb->multi_metric_index);
2576         }
2577         else
2578         {
2579             /* ... otherwise set it to zero */
2580             memset( &(metric_cb->now), 0, sizeof(g_val_t));
2581         }
2582         memset( &(metric_cb->last), 0, sizeof(g_val_t));
2583     }
2584 
2585     return;
2586 }
2587 
2588 void
setup_metric_info(Ganglia_metric_callback * metric_cb,int group_once,cfg_t * metric_cfg,int is_dynamic)2589 setup_metric_info(Ganglia_metric_callback *metric_cb, int group_once, cfg_t *metric_cfg, int is_dynamic)
2590 {
2591     char *name            = cfg_getstr  ( metric_cfg, "name");
2592     char *title           = cfg_getstr  ( metric_cfg, "title");
2593     float value_threshold = cfg_getfloat( metric_cfg, "value_threshold");
2594 
2595     setup_metric_info_impl(metric_cb, group_once, name, title, value_threshold, is_dynamic);
2596 }
2597 
2598 #define PCRE_MAX_SUBPATTERNS 9
2599 #define PCRE_OVECCOUNT ((1 + PCRE_MAX_SUBPATTERNS) * 3)
2600 
2601 double
setup_collection_groups(void)2602 setup_collection_groups( void )
2603 {
2604   int i, num_collection_groups = cfg_size( config_file, "collection_group" );
2605   double bytes_per_sec = 0;
2606 
2607   /* Create the collection group array */
2608   collection_groups = apr_array_make( global_context, num_collection_groups,
2609                                       sizeof(Ganglia_collection_group *));
2610 
2611   for(i = 0; i < num_collection_groups; i++)
2612     {
2613       int j, num_metrics;
2614       cfg_t *group_conf;
2615       Ganglia_collection_group *group = apr_pcalloc( global_context,
2616                                                      sizeof(Ganglia_collection_group));
2617       if(!group)
2618         {
2619           err_msg("Unable to malloc memory for collection group. Exiting.\n");
2620           exit(EXIT_FAILURE);
2621         }
2622 
2623       group_conf  = cfg_getnsec( config_file, "collection_group", i);
2624       group->once = cfg_getbool( group_conf, "collect_once");
2625       group->collect_every = cfg_getint( group_conf, "collect_every");
2626       group->time_threshold = cfg_getint( group_conf, "time_threshold");
2627 
2628       if(group->once)
2629         {
2630           /* TODO: this isn't pretty but simplifies the code( next collect in a year)
2631              since we will collect the value in this function */
2632           group->next_collect = apr_time_now() + (31536000 * APR_USEC_PER_SEC);
2633         }
2634       else
2635         {
2636           group->next_collect = 0;
2637         }
2638 
2639       group->next_send    = 0;
2640 
2641       num_metrics = cfg_size( group_conf, "metric" );
2642       group->metric_array = apr_array_make(global_context, num_metrics,
2643                                            sizeof(Ganglia_metric_callback *));
2644       for(j=0; j< num_metrics; j++)
2645         {
2646           cfg_t *metric         = cfg_getnsec( group_conf, "metric", j );
2647           char *name            = cfg_getstr  ( metric, "name");
2648 #ifdef HAVE_LIBPCRE
2649           char *name_match      = cfg_getstr  ( metric, "name_match");
2650 
2651           if(name_match != NULL)
2652             {
2653               pcre *pcre_re;
2654               const char *pcre_err_ptr;
2655               int pcre_err_offset;
2656               int pcre_ovector[PCRE_OVECCOUNT];
2657               int pcre_rc;
2658 
2659               apr_hash_index_t *hi;
2660               apr_pool_t *p;
2661               void *val;
2662               const char *key;
2663               int found = 0;
2664 
2665               if((pcre_re = pcre_compile(name_match, 0, &pcre_err_ptr, &pcre_err_offset, NULL)) == NULL)
2666                 {
2667                   err_msg ("pcre_compile failed on %s\n", name_match);
2668                   exit (1);
2669                 }
2670 
2671 
2672               /* Create a sub-pool for this channel */
2673               if(apr_pool_create(&p, global_context) != APR_SUCCESS)
2674                 {
2675                   err_msg("pool creation failed\n");
2676 		  exit(EXIT_FAILURE);
2677                 }
2678 
2679               for(hi = apr_hash_first(p, metric_callbacks);
2680                   hi;
2681                   hi = apr_hash_next(hi))
2682                 {
2683                   Ganglia_metric_callback *cb;
2684 
2685                   apr_hash_this(hi, (const void**)&key, NULL, &val);
2686                   if((pcre_rc = pcre_exec(pcre_re, NULL, key, strlen(key), 0, 0, pcre_ovector, PCRE_OVECCOUNT)) < 1)
2687                     {
2688                       switch(pcre_rc)
2689                         {
2690                           case PCRE_ERROR_NOMATCH:
2691                             break;
2692                           case 0:
2693                             /* output vector not big enough */
2694                           default:
2695                             /* unexpected error */
2696                             err_msg ("unexpected pcre_exec error\n");
2697                             exit (1);
2698                         }
2699                     }
2700                   else
2701                     {
2702                       char *title_r = NULL;
2703                       char *title_tmpl = cfg_getstr  ( metric, "title");
2704                       float value_threshold = cfg_getfloat( metric, "value_threshold");
2705 
2706                       if(title_tmpl != NULL)
2707                         {
2708                           struct iovec *ptrs;
2709                           int i, k = 0, j = 0;
2710 
2711                           if((ptrs = apr_pcalloc(p, strlen(title_tmpl) * sizeof(struct iovec))) == NULL)
2712                             {
2713                               err_msg("apr_pcalloc failed\n");
2714 			      exit(EXIT_FAILURE);
2715                             }
2716                           for (i = 0; title_tmpl[i] != 0; i++)
2717                             {
2718                               char c = title_tmpl[i];
2719                               if(c == '\\')
2720                                 {
2721                                   if(i > k)
2722                                     {
2723                                       ptrs[j].iov_base = apr_pstrndup(p, title_tmpl+k, i-k);
2724                                       ptrs[j++].iov_len = i-k;
2725                                     }
2726                                   i++;
2727                                   k = i+1;
2728                                   c = title_tmpl[i];
2729                                   if(c != 0)
2730                                     {
2731                                       int pos1, pos2, index = (c - '0');
2732                                       if(index < 1 || index > PCRE_MAX_SUBPATTERNS)
2733                                         {
2734                                           err_msg("title [%s] contains invalid reference to subpattern\n", title_tmpl);
2735 					  exit(EXIT_FAILURE);
2736                                         }
2737                                       pos1 = pcre_ovector[index * 2];
2738                                       pos2 = pcre_ovector[index * 2 + 1];
2739                                       ptrs[j].iov_base = apr_pstrndup(p, key + pos1, pos2-pos1);
2740                                       ptrs[j++].iov_len = pos2-pos1;
2741                                     }
2742                                   else
2743                                     i--;
2744                                 }
2745                             }
2746 
2747                           if(i-k > 0)
2748                             {
2749                               ptrs[j].iov_base = apr_pstrndup(p, title_tmpl+k, i-k);
2750                               ptrs[j++].iov_len = i-k;
2751                             }
2752 
2753                           title_r = apr_pstrcatv(p, ptrs, j, NULL);
2754                         }
2755 
2756                       cb = val;
2757 
2758                       /* setup the metric instance */
2759                       setup_metric_info_impl (cb, group->once, key, title_r, value_threshold, 1);
2760 
2761                       if (cb->info) {
2762                           bytes_per_sec += ( (double)(cb->info->msg_size) / (double)group->time_threshold );
2763                       }
2764 
2765                       /* Push this metric onto the metric_array for this group */
2766                       *(Ganglia_metric_callback **)apr_array_push(group->metric_array) = cb;
2767                       found = 1;
2768                   }
2769                 }
2770               apr_pool_destroy(p);
2771 
2772               if (!found)
2773                   err_msg("Unable to find any metric information for '%s'. Possible that a module has not been loaded.\n", name_match);
2774 
2775             }
2776           else
2777 #endif
2778             {
2779               Ganglia_metric_callback *metric_cb =  (Ganglia_metric_callback *)
2780                         apr_hash_get( metric_callbacks, name, APR_HASH_KEY_STRING );
2781 
2782               if(!metric_cb)
2783                 {
2784                   /*
2785                    * If a metric callback was not found and the metric is dynamic,
2786                    * then search through the callback sequentially.
2787                    */
2788 
2789                   apr_hash_index_t *hi;
2790                   apr_pool_t *p;
2791                   apr_ssize_t klen = strlen(name);
2792                   void *val;
2793                   const char *key;
2794                   int found = 0;
2795 
2796                   /* Create a sub-pool for this channel */
2797                   apr_pool_create(&p, global_context);
2798 
2799                   for(hi = apr_hash_first(p, metric_callbacks);
2800                       hi;
2801                       hi = apr_hash_next(hi))
2802                     {
2803                       Ganglia_metric_callback *cb;
2804 
2805                       apr_hash_this(hi, (const void**)&key, NULL, &val);
2806 
2807                       if (strncasecmp(key, name, klen) == 0) {
2808                           cb = val;
2809                           setup_metric_info (cb, group->once, metric, 1);
2810 
2811                           if (cb->info) {
2812                               bytes_per_sec += ( (double)(cb->info->msg_size) / (double)group->time_threshold );
2813                           }
2814 
2815                           /* Push this metric onto the metric_array for this group */
2816                           *(Ganglia_metric_callback **)apr_array_push(group->metric_array) = cb;
2817                           found = 1;
2818                       }
2819                     }
2820                   apr_pool_destroy(p);
2821 
2822                   if (!found)
2823                       err_msg("Unable to find the metric information for '%s'. Possible that the module has not been loaded.\n", name);
2824                 }
2825               else
2826                 {
2827                   setup_metric_info (metric_cb, group->once, metric, 0);
2828 
2829                   if (metric_cb->info) {
2830                       bytes_per_sec += ( (double)(metric_cb->info->msg_size) / (double)group->time_threshold );
2831                   }
2832 
2833                   /* Push this metric onto the metric_array for this group */
2834                   *(Ganglia_metric_callback **)apr_array_push(group->metric_array) = metric_cb;
2835                 }
2836             }
2837           }
2838 
2839       /* Save the collection group the collection group array */
2840       *(Ganglia_collection_group **)apr_array_push(collection_groups) = group;
2841     }
2842 
2843   return bytes_per_sec;
2844 }
2845 
2846 void
Ganglia_collection_group_collect(Ganglia_collection_group * group,apr_time_t now)2847 Ganglia_collection_group_collect( Ganglia_collection_group *group, apr_time_t now)
2848 {
2849   int i;
2850 
2851   /* Collect data for all the metrics in the groups metric array */
2852   for(i=0; i< group->metric_array->nelts; i++)
2853     {
2854       Ganglia_metric_callback *cb = ((Ganglia_metric_callback **)(group->metric_array->elts))[i];
2855 
2856       debug_msg("\tmetric '%s' being collected now", cb->name);
2857       cb->last = cb->now;
2858       if (cb->multi_metric_index == CB_NOINDEX)
2859           cb->now = cb->cb();
2860       else
2861           cb->now = cb->cbindexed(cb->multi_metric_index);
2862 
2863       /* Check the value threshold.  If passed.. set this group to send immediately. */
2864       if( cb->value_threshold >= 0.0 )
2865         {
2866           debug_msg("\tmetric '%s' has value_threshold %f", cb->name, cb->value_threshold);
2867           switch(cb->info->type)
2868             {
2869             case GANGLIA_VALUE_UNKNOWN:
2870             case GANGLIA_VALUE_STRING:
2871               /* do nothing for non-numeric data */
2872               break;
2873             case GANGLIA_VALUE_UNSIGNED_SHORT:
2874               if( abs( cb->last.uint16 - cb->now.uint16 ) >= cb->value_threshold )
2875                   group->next_send = 0; /* send immediately */
2876               break;
2877             case GANGLIA_VALUE_SHORT:
2878               if( abs( cb->last.int16 - cb->now.int16 ) >= cb->value_threshold )
2879                   group->next_send = 0; /* send immediately */
2880               break;
2881             case GANGLIA_VALUE_UNSIGNED_INT:
2882               if( abs( cb->last.uint32 - cb->now.uint32 ) >= cb->value_threshold )
2883                   group->next_send = 0; /* send immediately */
2884               break;
2885             case GANGLIA_VALUE_INT:
2886               if( abs( cb->last.int32 - cb->now.int32 ) >= cb->value_threshold )
2887                   group->next_send = 0; /* send immediately */
2888               break;
2889             case GANGLIA_VALUE_FLOAT:
2890               if( fabsf( cb->last.f - cb->now.f ) >= cb->value_threshold )
2891                   group->next_send = 0; /* send immediately */
2892               break;
2893             case GANGLIA_VALUE_DOUBLE:
2894               if( fabs( cb->last.d - cb->now.d ) >= cb->value_threshold )
2895                   group->next_send = 0; /* send immediately */
2896               break;
2897             default:
2898               break;
2899             }
2900         }
2901       /* If the metadata_last_set has been set to 0 then a request
2902        *  to resend the metadata has been received. Send the group
2903        *  immediately */
2904       if (cb->metadata_last_sent == 0)
2905         {
2906           group->next_send = 0;
2907         }
2908     }
2909 
2910   /* Set the next time this group should be collected */
2911   group->next_collect = now + (group->collect_every * APR_USEC_PER_SEC);
2912 }
2913 
2914 void
Ganglia_collection_group_send(Ganglia_collection_group * group,apr_time_t now)2915 Ganglia_collection_group_send( Ganglia_collection_group *group, apr_time_t now)
2916 {
2917     int i;
2918 
2919     /* This group needs to be sent */
2920     for(i=0; i< group->metric_array->nelts; i++)
2921       {
2922         XDR x;
2923         int len, errors;
2924         char metricmsg[max_udp_message_len];
2925         Ganglia_metric_callback *cb = ((Ganglia_metric_callback **)(group->metric_array->elts))[i];
2926 
2927         /* Build the message */
2928         switch(cb->info->type)
2929           {
2930           case GANGLIA_VALUE_UNKNOWN:
2931             /* The 2.5.x protocol doesn't allow for unknown values. :(  Do nothing. */
2932             continue;
2933           case GANGLIA_VALUE_STRING:
2934             cb->msg.Ganglia_value_msg_u.gstr.str = cb->now.str;
2935             break;
2936           case GANGLIA_VALUE_UNSIGNED_SHORT:
2937             cb->msg.Ganglia_value_msg_u.gu_short.us = cb->now.uint16;
2938             break;
2939           case GANGLIA_VALUE_SHORT:
2940             cb->msg.Ganglia_value_msg_u.gs_short.ss = cb->now.int16;
2941             break;
2942           case GANGLIA_VALUE_UNSIGNED_INT:
2943             cb->msg.Ganglia_value_msg_u.gu_int.ui = cb->now.uint32;
2944             break;
2945           case GANGLIA_VALUE_INT:
2946             cb->msg.Ganglia_value_msg_u.gs_int.si = cb->now.int32;
2947             break;
2948           case GANGLIA_VALUE_FLOAT:
2949             cb->msg.Ganglia_value_msg_u.gf.f = cb->now.f;
2950             break;
2951           case GANGLIA_VALUE_DOUBLE:
2952             cb->msg.Ganglia_value_msg_u.gd.d = cb->now.d;
2953             break;
2954           default:
2955             continue;
2956           }
2957 
2958         /* Send the full metadata packet if the specified interval has elapsed or a
2959          *  request has been received to resend the metadata.  In this case the
2960          *  metadata_last_set field will be 0.  No need to send the full data
2961          *  with every value update.
2962          */
2963         if (!cb->metadata_last_sent || (send_metadata_interval &&
2964            (cb->metadata_last_sent < (now - apr_time_make(send_metadata_interval,0)))))
2965           {
2966             Ganglia_metric gmetric = Ganglia_metric_create((Ganglia_pool)global_context);
2967             char *name, *val, *type;
2968             apr_pool_t *gm_pool = (apr_pool_t*)gmetric->pool;
2969 
2970             if(!gmetric)
2971               {
2972                 /* no memory */
2973                 return;
2974               }
2975 
2976             name = cb->msg.Ganglia_value_msg_u.gstr.metric_id.name;
2977             if (override_hostname != NULL)
2978               {
2979                 cb->msg.Ganglia_value_msg_u.gstr.metric_id.host = apr_pstrcat(gm_pool, (char *)( override_ip != NULL ? override_ip : override_hostname ), ":", (char *) override_hostname, NULL);
2980                 cb->msg.Ganglia_value_msg_u.gstr.metric_id.spoof = TRUE;
2981               }
2982             val = apr_pstrdup(gm_pool, host_metric_value(cb->info, &(cb->msg)));
2983             type = apr_pstrdup(gm_pool, host_metric_type(cb->info->type));
2984 
2985             errors = Ganglia_metric_set(gmetric, name, val, type,
2986                         cb->info->units, cstr_to_slope( cb->info->slope),
2987                         cb->info->tmax, 0);
2988 
2989             if (errors)
2990               {
2991                 err_msg("Error %d setting the modular data for %s\n", errors, cb->name);
2992               }
2993             else
2994               {
2995                 Ganglia_metadata_add(gmetric, "TITLE", cb->title);
2996                 Ganglia_metadata_add(gmetric, "DESC", cb->info->desc);
2997 
2998                 /* Add the rest of the metadata here by interating through
2999                  *  the metadata table of the metric_info structure */
3000                 if (cb->info->metadata)
3001                   {
3002                     int i;
3003                     const apr_array_header_t *arr = apr_table_elts((apr_table_t*)cb->info->metadata);
3004                     const apr_table_entry_t *elts = (const apr_table_entry_t *)arr->elts;
3005 
3006                     /* add all of the metadata to the packet */
3007                     for (i = 0; i < arr->nelts; ++i)
3008                       {
3009                         if (elts[i].key == NULL)
3010                             continue;
3011                         Ganglia_metadata_add(gmetric, elts[i].key, elts[i].val);
3012                       }
3013                   }
3014 
3015                 debug_msg("\tsending metadata for metric: %s", cb->name);
3016 
3017                 ganglia_scoreboard_inc(PKTS_SENT_METADATA);
3018                 ganglia_scoreboard_inc(PKTS_SENT_ALL);
3019                 if (override_hostname != NULL)
3020                   {
3021                     errors = Ganglia_metadata_send_real(gmetric, udp_send_channels, cb->msg.Ganglia_value_msg_u.gstr.metric_id.host);
3022                   }
3023                 else
3024                   {
3025                     errors = Ganglia_metadata_send(gmetric, udp_send_channels);
3026                   }
3027                 if (errors)
3028                   {
3029                     err_msg("Error %d sending the modular data for %s\n", errors, cb->name);
3030                     debug_msg("\tsent message '%s' with %d errors", cb->name, errors);
3031                     ganglia_scoreboard_inc(PKTS_SENT_FAILED);
3032                   }
3033                 else
3034                   {
3035                     cb->metadata_last_sent = now; /* mark the metadata as sent */
3036                   }
3037               }
3038 
3039             Ganglia_metric_destroy(gmetric);
3040           }
3041 
3042         /* Send the updated value packet ever time it is collected */
3043         xdrmem_create(&x, metricmsg, max_udp_message_len, XDR_ENCODE);
3044         xdr_Ganglia_value_msg(&x, &(cb->msg));
3045         len = xdr_getpos(&x);
3046         errors = send_message( metricmsg, len );
3047         debug_msg("\tsent message '%s' of length %d with %d errors", cb->name, len, errors);
3048         ganglia_scoreboard_inc(PKTS_SENT_VALUE);
3049         ganglia_scoreboard_inc(PKTS_SENT_ALL);
3050 
3051         if(!errors)
3052           {
3053             /* If the message send ok. Schedule the next time threshold. */
3054             group->next_send = now + (group->time_threshold * APR_USEC_PER_SEC);
3055           }
3056         else
3057             ganglia_scoreboard_inc(PKTS_SENT_FAILED);
3058       }
3059 }
3060 
3061 /* TODO: It might be necessary in the future to use a heap for the collection groups.
3062  * Running through an array should suffice for now */
3063 apr_time_t
process_collection_groups(apr_time_t now)3064 process_collection_groups( apr_time_t now )
3065 {
3066   int i;
3067   apr_time_t next = 0;
3068 
3069   /* Run through each collection group and collect any data that needs collecting... */
3070   for(i=0; i< collection_groups->nelts; i++)
3071     {
3072       Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
3073       if(group->next_collect <= now)
3074         {
3075           Ganglia_collection_group_collect(group, now);
3076         }
3077     }
3078 
3079   /* Run through each collection group and send any data that needs sending... */
3080   for(i=0; i< collection_groups->nelts; i++)
3081     {
3082       Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
3083       if( group->next_send <= now )
3084         {
3085           Ganglia_collection_group_send(group, now);
3086         }
3087     }
3088 
3089   /* Run through each collection group and find when our next event (collect|send) occurs */
3090   for(i=0; i< collection_groups->nelts; i++)
3091     {
3092       apr_time_t min;
3093       Ganglia_collection_group *group = ((Ganglia_collection_group **)(collection_groups->elts))[i];
3094       min = group->next_send < group->next_collect? group->next_send : group->next_collect;
3095       if(!next)
3096         {
3097           next = min;
3098         }
3099       else
3100         {
3101           if(min < next)
3102             {
3103               next = min;
3104             }
3105         }
3106     }
3107 
3108   /* make sure we don't schedule for the past */
3109   return next < now ? now + 1 * APR_USEC_PER_SEC: next;
3110 }
3111 
3112 static void
print_metric_list(void)3113 print_metric_list( void )
3114 {
3115   apr_hash_index_t *hi;
3116   void *val;
3117   char modular_desc[1024];
3118 
3119   for(hi = apr_hash_first(global_context, metric_callbacks);
3120       hi;
3121       hi = apr_hash_next(hi))
3122     {
3123       Ganglia_metric_callback *cb;
3124       Ganglia_25metric *metric_info;
3125       char *desc = NULL;
3126 
3127       apr_hash_this(hi, NULL, NULL, &val);
3128       cb = val;
3129       metric_info = NULL;
3130 
3131       if (cb->modp)
3132         {
3133           int i;
3134 
3135           metric_info = (Ganglia_25metric *)cb->modp->metrics_info;
3136           for (i = 0; metric_info[i].name != NULL; i++)
3137             {
3138               if (strcasecmp(cb->name,  metric_info[i].name) == 0)
3139                 {
3140                   snprintf (modular_desc, sizeof(modular_desc),
3141                             "%s (module %s)",
3142                             metric_info[i].desc,
3143                             cb->modp->module_name);
3144 
3145                   desc = (char*)modular_desc;
3146                   break;
3147                 }
3148             }
3149         }
3150 
3151       if (desc == NULL)
3152         {
3153           desc = "<no description available>";
3154         }
3155 
3156       fprintf(stdout, "%-15s\t%s\n", cb->name, desc);
3157     }
3158 }
3159 
3160 static void
cleanup_data(apr_pool_t * pool,apr_time_t now)3161 cleanup_data( apr_pool_t *pool, apr_time_t now )
3162 {
3163   apr_hash_index_t *hi, *metric_hi;
3164 
3165   /* Walk the host hash */
3166   apr_thread_mutex_lock(hosts_mutex);
3167   for(hi = apr_hash_first(pool, hosts);
3168       hi;
3169       hi = apr_hash_next(hi))
3170     {
3171       void *val;
3172       Ganglia_host *host;
3173       apr_hash_this(hi, NULL, NULL, &val);
3174       host = val;
3175 
3176       if( host_dmax && (now - host->last_heard_from) > (host_dmax * APR_USEC_PER_SEC) )
3177         {
3178           /* this host is older than dmax... delete it */
3179           debug_msg("deleting old host '%s' from host hash'", host->hostname);
3180           /* remove it from the hash */
3181           apr_hash_set( hosts, host->ip, APR_HASH_KEY_STRING, NULL);
3182           /* free all its memory */
3183           if(host->location)
3184           {
3185             free(host->location);
3186           }
3187           apr_pool_destroy( host->pool);
3188         }
3189       else
3190         {
3191           /* this host isn't being deleted but it might have some stale gmetric data */
3192 	  apr_thread_mutex_lock(host->mutex);
3193           for( metric_hi = apr_hash_first( pool, host->metrics );
3194                metric_hi;
3195                metric_hi = apr_hash_next( metric_hi ))
3196             {
3197               void *val;
3198               Ganglia_metadata *metric;
3199               int dmax;
3200 
3201               apr_hash_this( metric_hi, NULL, NULL, &val );
3202               metric = val;
3203 
3204               if(!metric || metric->message_u.f_message.id != gmetadata_full)
3205                   continue;  /* this shouldn't happen */
3206 
3207               dmax = metric->message_u.f_message.Ganglia_metadata_msg_u.gfull.metric.dmax;
3208               if( dmax && (now - metric->last_heard_from) > (dmax * APR_USEC_PER_SEC) )
3209                 {
3210                   Ganglia_metadata *value_metric = (Ganglia_metadata *)apr_hash_get( host->gmetrics,
3211                                                                                      metric->name,
3212                                                                                      APR_HASH_KEY_STRING);
3213                   /* this is a stale gmetric */
3214                   debug_msg("deleting old metric '%s' from host '%s'", metric->name, host->hostname);
3215 
3216                   /* remove the metric from the metric and values hash */
3217                   apr_hash_set( host->metrics, metric->name, APR_HASH_KEY_STRING, NULL);
3218                   apr_hash_set( host->gmetrics, metric->name, APR_HASH_KEY_STRING, NULL);
3219                   /* destroy any memory that was allocated for this gmetric */
3220                   apr_pool_destroy( metric->pool );
3221                   if (value_metric)
3222                   {
3223                     apr_pool_destroy( value_metric->pool );
3224                   }
3225                 }
3226             }
3227 	  apr_thread_mutex_unlock(host->mutex);
3228         }
3229     }
3230   apr_thread_mutex_unlock(hosts_mutex);
3231 
3232   apr_pool_clear( pool );
3233 }
3234 
initialize_scoreboard()3235 void initialize_scoreboard()
3236 {
3237     ganglia_scoreboard_init(global_context);
3238     ganglia_scoreboard_add(PKTS_RECVD_ALL, GSB_READ_RESET);
3239     ganglia_scoreboard_add(PKTS_RECVD_FAILED, GSB_READ_RESET);
3240     ganglia_scoreboard_add(PKTS_RECVD_IGNORED, GSB_READ_RESET);
3241     ganglia_scoreboard_add(PKTS_RECVD_METADATA, GSB_READ_RESET);
3242     ganglia_scoreboard_add(PKTS_RECVD_VALUE, GSB_READ_RESET);
3243     ganglia_scoreboard_add(PKTS_RECVD_REQUEST, GSB_READ_RESET);
3244     ganglia_scoreboard_add(PKTS_SENT_ALL, GSB_READ_RESET);
3245     ganglia_scoreboard_add(PKTS_SENT_FAILED, GSB_READ_RESET);
3246     ganglia_scoreboard_add(PKTS_SENT_METADATA, GSB_READ_RESET);
3247     ganglia_scoreboard_add(PKTS_SENT_VALUE, GSB_READ_RESET);
3248     ganglia_scoreboard_add(PKTS_SENT_REQUEST, GSB_READ_RESET);
3249 }
3250 
/* Set to 1 to ask the processing loops (e.g. tcp_listener) to stop.
 * NOTE(review): written from a signal handler and read by another thread
 * as a plain int -- consider volatile sig_atomic_t if no other translation
 * unit declares it as int. */
int done = 0;
/* Set to 1, together with done, when a reload has been requested (SIGHUP) */
int reload_required = 0;

/*
 * Request a reload: stops the processing loops via done and records that
 * the stop is due to a reload request.
 */
void set_reload_required(void)
{
    done = 1;
    reload_required = 1;
}
3259 
sig_handler(int i)3260 void sig_handler(int i)
3261 {
3262     switch(i)
3263     {
3264     case SIGHUP:
3265         set_reload_required();
3266         break;
3267     default:
3268         done = 1;
3269     }
3270 }
3271 
tcp_listener(apr_thread_t * thd,void * data)3272 static void* APR_THREAD_FUNC tcp_listener(apr_thread_t *thd, void *data)
3273 {
3274   apr_time_t now;
3275   apr_interval_time_t wait = 100 * 1000; // 100ms
3276 
3277   if (tcp_listen_channels == NULL)
3278     return NULL;
3279 
3280   debug_msg("[tcp] Starting TCP listener thread...");
3281   for(;!done;)
3282     {
3283       now = apr_time_now();
3284       /* Pull in incoming data */
3285       poll_tcp_listen_channels(wait, now);
3286     }
3287     apr_thread_exit(thd, APR_SUCCESS);
3288 
3289     return NULL;
3290 }
3291 
3292 int
main(int argc,char * argv[])3293 main ( int argc, char *argv[] )
3294 {
3295   apr_time_t now, next_collection, last_cleanup;
3296   apr_pool_t *cleanup_context;
3297 
3298   gmond_argv = argv;
3299 
3300   if (cmdline_parser (argc, argv, &args_info) != 0)
3301       exit(EXIT_FAILURE);
3302 
3303   if(args_info.convert_given)
3304     {
3305       exit (print_ganglia_25_config( args_info.convert_arg ));
3306     }
3307 
3308   /* Create the global context */
3309   global_context = (apr_pool_t*)Ganglia_pool_create(NULL);
3310 
3311   /* Create the cleanup context from the global context */
3312   cleanup_context = (apr_pool_t*)Ganglia_pool_create((Ganglia_pool)global_context);
3313 
3314   /* Mark the time this gmond started */
3315   started = apr_time_now();
3316 
3317   /* Builds a default configuration based on platform */
3318   build_default_gmond_configuration((Ganglia_pool)global_context);
3319 
3320   if(args_info.default_config_flag)
3321     {
3322       fprintf(stdout, "%s", default_gmond_configuration);
3323       fflush( stdout );
3324       exit(EXIT_SUCCESS);
3325     }
3326 
3327   process_configuration_file();
3328 
3329 #ifdef SFLOW
3330   sflow_udp_port = init_sflow(config_file);
3331 #endif
3332 
3333   /* Should over-ride any value from the configuration file */
3334   if(args_info.location_given)
3335     {
3336       host_location = args_info.location_arg;
3337     }
3338 
3339   load_metric_modules();
3340 
3341   if(args_info.metrics_flag)
3342     {
3343       initialize_scoreboard();
3344       setup_metric_callbacks();
3345       print_metric_list();
3346       fflush( stdout );
3347       exit(EXIT_SUCCESS);
3348     }
3349 
3350   if(args_info.bandwidth_flag)
3351     {
3352       double bytes_per_sec;
3353       setup_metric_callbacks();
3354       bytes_per_sec = setup_collection_groups();
3355       fprintf(stdout, "%f bytes/sec\n", bytes_per_sec);
3356       exit(EXIT_SUCCESS);
3357     }
3358 
3359   daemonize_if_necessary( argv );
3360   if (args_info.pid_file_given)
3361     {
3362       update_pidfile (args_info.pid_file_arg);
3363     }
3364 
3365   /* Collect my hostname */
3366   apr_gethostname( myname, APRMAXHOSTLEN+1, global_context);
3367 
3368   apr_signal( SIGPIPE, SIG_IGN );
3369   apr_signal( SIGINT, sig_handler );
3370   apr_signal( SIGHUP, sig_handler );
3371   apr_signal( SIGTERM, sig_handler );
3372 
3373   initialize_scoreboard();
3374 
3375   /* This must occur before we setuid_if_necessary() particularly on freebsd
3376    * where we need to be root to access /dev/mem to initialize metric collection */
3377   setup_metric_callbacks();
3378 
3379   setuid_if_necessary();
3380 
3381   process_deaf_mute_mode();
3382   process_allow_extra_data_mode();
3383 
3384   if(!deaf)
3385     {
3386       setup_listen_channels_pollset();
3387     }
3388 
3389   /* even if mute, a send channel may be needed to send a request for metadata */
3390   udp_send_channels = Ganglia_udp_send_channels_create((Ganglia_pool)global_context,
3391                                                        (Ganglia_gmond_config)config_file);
3392   if(!udp_send_channels)
3393     {
3394       /* if there are no send channels defined, we are equivalent to mute */
3395       mute = 1;
3396     }
3397   if(!mute)
3398     {
3399       setup_collection_groups();
3400     }
3401 
3402   /* Create the host hash table */
3403   hosts = apr_hash_make( global_context );
3404 
3405   /* Create the hosts mutex */
3406   if (apr_thread_mutex_create(&hosts_mutex, APR_THREAD_MUTEX_DEFAULT, global_context) != APR_SUCCESS)
3407     {
3408       err_msg("Failed to create thread mutex. Exiting.\n");
3409       exit(EXIT_FAILURE);
3410     }
3411 
3412   /* Initialize time variables */
3413   udp_last_heard = last_cleanup = next_collection = now = apr_time_now();
3414 
3415   /* Create TCP listener thread */
3416   if(!deaf)
3417     {
3418       apr_thread_t *thread;
3419       if (apr_thread_create(&thread, NULL, tcp_listener, NULL, global_context) != APR_SUCCESS)
3420         {
3421           err_msg("Failed to create TCP listener thread. Exiting.\n");
3422           exit(EXIT_FAILURE);
3423         }
3424     }
3425 
3426   /* Loop */
3427   for(;!done;)
3428     {
3429       /* Make sure we never wait for negative seconds (shouldn't happen) */
3430       apr_interval_time_t wait = next_collection >= now ? next_collection - now : 1;
3431 
3432       if (udp_listen_channels != NULL)
3433         {
3434           /* Pull in incoming data */
3435           poll_udp_listen_channels(wait, now);
3436         }
3437       else
3438         {
3439           /* Sleep until next collection */
3440           apr_sleep( wait );
3441         }
3442 
3443       /* only continue if it's time to process our collection groups */
3444       now = apr_time_now();
3445       if(now < next_collection)
3446           continue;
3447 
3448       if(!deaf)
3449         {
3450           /* if we went deaf, re-subscribe to the multicast channel */
3451           if ((now - udp_last_heard) > 60 * APR_USEC_PER_SEC)
3452             {
3453               /* FIXME: maybe this should be done for the affected
3454                         channel only? */
3455               reset_mcast_channels();
3456               /* reset the timer */
3457               udp_last_heard = now;
3458             }
3459 
3460           /* cleanup the data if the cleanup threshold has been met */
3461           if( (now - last_cleanup) > apr_time_make(cleanup_threshold,0))
3462             {
3463               cleanup_data( cleanup_context, now );
3464               last_cleanup = now;
3465             }
3466         }
3467 
3468       if(!mute)
3469         {
3470           /* collect data from collection_groups */
3471           next_collection = process_collection_groups( now );
3472         }
3473       else
3474         {
3475           /* we're mute. nothing to collect and send. */
3476           next_collection = now + 60 * APR_USEC_PER_SEC;
3477         }
3478     }
3479 
3480   apr_pool_destroy(global_context);
3481 
3482   if(reload_required == 1)
3483     reload_ganglia_configuration();
3484 
3485   return 0;
3486 }
3487