1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "httpd.h"
18 #include "http_config.h"
19 #include "http_log.h"
20 #include "http_core.h"
21 #include "http_protocol.h"
22 #include "apr_strings.h"
23 #include "apr_hash.h"
24 #include "apr_time.h"
25 #include "ap_mpm.h"
26 #include "scoreboard.h"
27 #include "mod_watchdog.h"
28 #include "ap_slotmem.h"
29 #include "heartbeat.h"
30 
31 
32 #ifndef HM_UPDATE_SEC
33 /* How often we update the stats file */
34 /* TODO: Make a runtime config */
35 #define HM_UPDATE_SEC (5)
36 #endif
37 
38 #define HM_WATHCHDOG_NAME ("_heartmonitor_")
39 
40 static const ap_slotmem_provider_t *storage = NULL;
41 static ap_slotmem_instance_t *slotmem = NULL;
42 static int maxworkers = 0;
43 
44 module AP_MODULE_DECLARE_DATA heartmonitor_module;
45 
46 typedef struct hm_server_t
47 {
48     const char *ip;
49     int busy;
50     int ready;
51     unsigned int port;
52     apr_time_t seen;
53 } hm_server_t;
54 
55 typedef struct hm_ctx_t
56 {
57     int active;
58     const char *storage_path;
59     ap_watchdog_t *watchdog;
60     apr_interval_time_t interval;
61     apr_sockaddr_t *mcast_addr;
62     apr_status_t status;
63     volatile int keep_running;
64     apr_socket_t *sock;
65     apr_pool_t *p;
66     apr_hash_t *servers;
67     server_rec *s;
68 } hm_ctx_t;
69 
70 typedef struct hm_slot_server_ctx_t {
71   hm_server_t *s;
72   int found;
73   unsigned int item_id;
74 } hm_slot_server_ctx_t;
75 
hm_listen(hm_ctx_t * ctx)76 static apr_status_t hm_listen(hm_ctx_t *ctx)
77 {
78     apr_status_t rv;
79 
80     rv = apr_socket_create(&ctx->sock, ctx->mcast_addr->family,
81                            SOCK_DGRAM, APR_PROTO_UDP, ctx->p);
82 
83     if (rv) {
84         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02068)
85                      "Failed to create listening socket.");
86         return rv;
87     }
88 
89     rv = apr_socket_opt_set(ctx->sock, APR_SO_REUSEADDR, 1);
90     if (rv) {
91         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02069)
92                      "Failed to set APR_SO_REUSEADDR to 1 on socket.");
93         return rv;
94     }
95 
96 
97     rv = apr_socket_opt_set(ctx->sock, APR_SO_NONBLOCK, 1);
98     if (rv) {
99         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02070)
100                      "Failed to set APR_SO_NONBLOCK to 1 on socket.");
101         return rv;
102     }
103 
104     rv = apr_socket_bind(ctx->sock, ctx->mcast_addr);
105     if (rv) {
106         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02071)
107                      "Failed to bind on socket.");
108         return rv;
109     }
110 
111     rv = apr_mcast_join(ctx->sock, ctx->mcast_addr, NULL, NULL);
112 
113     if (rv) {
114         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02072)
115                      "Failed to join multicast group");
116         return rv;
117     }
118 
119     rv = apr_mcast_loopback(ctx->sock, 1);
120     if (rv) {
121         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02073)
122                      "Failed to accept localhost mulitcast on socket.");
123         return rv;
124     }
125 
126     return APR_SUCCESS;
127 }
128 
129 /* XXX: The same exists in mod_lbmethod_heartbeat.c where it is named argstr_to_table */
qs_to_table(const char * input,apr_table_t * parms,apr_pool_t * p)130 static void qs_to_table(const char *input, apr_table_t *parms,
131                         apr_pool_t *p)
132 {
133     char *key;
134     char *value;
135     char *query_string;
136     char *strtok_state;
137 
138     if (input == NULL) {
139         return;
140     }
141 
142     query_string = apr_pstrdup(p, input);
143 
144     key = apr_strtok(query_string, "&", &strtok_state);
145     while (key) {
146         value = strchr(key, '=');
147         if (value) {
148             *value = '\0';      /* Split the string in two */
149             value++;            /* Skip passed the = */
150         }
151         else {
152             value = "1";
153         }
154         ap_unescape_url(key);
155         ap_unescape_url(value);
156         apr_table_set(parms, key, value);
157         /*
158            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03182)
159            "Found query arg: %s = %s", key, value);
160          */
161         key = apr_strtok(NULL, "&", &strtok_state);
162     }
163 }
164 
165 
166 #define SEEN_TIMEOUT (30)
167 
168 /* Store in the slotmem */
hm_update(void * mem,void * data,apr_pool_t * p)169 static apr_status_t hm_update(void* mem, void *data, apr_pool_t *p)
170 {
171     hm_slot_server_t *old = (hm_slot_server_t *) mem;
172     hm_slot_server_ctx_t *s = (hm_slot_server_ctx_t *) data;
173     hm_server_t *new = s->s;
174     if (strncmp(old->ip, new->ip, MAXIPSIZE)==0) {
175         s->found = 1;
176         old->busy = new->busy;
177         old->ready = new->ready;
178         old->seen = new->seen;
179     }
180     return APR_SUCCESS;
181 }
182 /* Read the id corresponding to the entry in the slotmem */
hm_readid(void * mem,void * data,apr_pool_t * p)183 static apr_status_t hm_readid(void* mem, void *data, apr_pool_t *p)
184 {
185     hm_slot_server_t *old = (hm_slot_server_t *) mem;
186     hm_slot_server_ctx_t *s = (hm_slot_server_ctx_t *) data;
187     hm_server_t *new = s->s;
188     if (strncmp(old->ip, new->ip, MAXIPSIZE)==0) {
189         s->found = 1;
190         s->item_id = old->id;
191     }
192     return APR_SUCCESS;
193 }
194 /* update the entry or create it if not existing */
hm_slotmem_update_stat(hm_server_t * s,apr_pool_t * pool)195 static  apr_status_t  hm_slotmem_update_stat(hm_server_t *s, apr_pool_t *pool)
196 {
197     /* We call do_all (to try to update) otherwise grab + put */
198     hm_slot_server_ctx_t ctx;
199     ctx.s = s;
200     ctx.found = 0;
201     storage->doall(slotmem, hm_update, &ctx, pool);
202     if (!ctx.found) {
203         unsigned int i;
204         hm_slot_server_t hmserver;
205         memcpy(hmserver.ip, s->ip, MAXIPSIZE);
206         hmserver.busy = s->busy;
207         hmserver.ready = s->ready;
208         hmserver.seen = s->seen;
209         /* XXX locking for grab() / put() */
210         storage->grab(slotmem, &i);
211         hmserver.id = i;
212         storage->put(slotmem, i, (unsigned char *)&hmserver, sizeof(hmserver));
213     }
214     return APR_SUCCESS;
215 }
hm_slotmem_remove_stat(hm_server_t * s,apr_pool_t * pool)216 static  apr_status_t  hm_slotmem_remove_stat(hm_server_t *s, apr_pool_t *pool)
217 {
218     hm_slot_server_ctx_t ctx;
219     ctx.s = s;
220     ctx.found = 0;
221     storage->doall(slotmem, hm_readid, &ctx, pool);
222     if (ctx.found) {
223         storage->release(slotmem, ctx.item_id);
224     }
225     return APR_SUCCESS;
226 }
hm_file_update_stat(hm_ctx_t * ctx,hm_server_t * s,apr_pool_t * pool)227 static apr_status_t hm_file_update_stat(hm_ctx_t *ctx, hm_server_t *s, apr_pool_t *pool)
228 {
229     apr_status_t rv;
230     apr_file_t *fp;
231     apr_file_t *fpin;
232     apr_time_t now;
233     apr_time_t fage;
234     apr_finfo_t fi;
235     int updated = 0;
236     char *path = apr_pstrcat(pool, ctx->storage_path, ".tmp.XXXXXX", NULL);
237 
238 
239     /* TODO: Update stats file (!) */
240     rv = apr_file_mktemp(&fp, path, APR_CREATE | APR_WRITE, pool);
241 
242     if (rv) {
243         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02074)
244                      "Unable to open tmp file: %s", path);
245         return rv;
246     }
247     rv = apr_file_open(&fpin, ctx->storage_path, APR_READ|APR_BINARY|APR_BUFFERED,
248                        APR_OS_DEFAULT, pool);
249 
250     now = apr_time_now();
251     if (rv == APR_SUCCESS) {
252         char *t;
253         apr_table_t *hbt = apr_table_make(pool, 10);
254         apr_bucket_alloc_t *ba;
255         apr_bucket_brigade *bb;
256         apr_bucket_brigade *tmpbb;
257 
258         rv = apr_file_info_get(&fi, APR_FINFO_SIZE | APR_FINFO_MTIME, fpin);
259         if (rv) {
260             ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02075)
261                          "Unable to read file: %s", ctx->storage_path);
262             return rv;
263         }
264 
265         /* Read the file and update the line corresponding to the node */
266         ba = apr_bucket_alloc_create(pool);
267         bb = apr_brigade_create(pool, ba);
268         apr_brigade_insert_file(bb, fpin, 0, fi.size, pool);
269         tmpbb = apr_brigade_create(pool, ba);
270         fage = apr_time_sec(now - fi.mtime);
271         do {
272             char buf[4096];
273             const char *ip;
274             apr_size_t bsize = sizeof(buf);
275 
276             apr_brigade_cleanup(tmpbb);
277             if (APR_BRIGADE_EMPTY(bb)) {
278                 break;
279             }
280             rv = apr_brigade_split_line(tmpbb, bb,
281                                         APR_BLOCK_READ, sizeof(buf));
282 
283             if (rv) {
284                 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02076)
285                              "Unable to read from file: %s", ctx->storage_path);
286                 return rv;
287             }
288 
289             apr_brigade_flatten(tmpbb, buf, &bsize);
290             if (bsize == 0) {
291                 break;
292             }
293             buf[bsize - 1] = 0;
294             t = strchr(buf, ' ');
295             if (t) {
296                 ip = apr_pstrmemdup(pool, buf, t - buf);
297             }
298             else {
299                 ip = NULL;
300             }
301 
302             if (!ip || buf[0] == '#') {
303                 /* copy things we can't process */
304                 apr_file_printf(fp, "%s\n", buf);
305             }
306             else if (strcmp(ip, s->ip) != 0 ) {
307                 hm_server_t node;
308                 apr_time_t seen;
309                 const char *val;
310 
311                 /* Update seen time according to the last file modification */
312                 apr_table_clear(hbt);
313                 qs_to_table(apr_pstrdup(pool, t), hbt, pool);
314                 if ((val = apr_table_get(hbt, "busy"))) {
315                     node.busy = atoi(val);
316                 }
317                 else {
318                     node.busy = 0;
319                 }
320 
321                 if ((val = apr_table_get(hbt, "ready"))) {
322                     node.ready = atoi(val);
323                 }
324                 else {
325                     node.ready = 0;
326                 }
327 
328                 if ((val = apr_table_get(hbt, "lastseen"))) {
329                     node.seen = atoi(val);
330                 }
331                 else {
332                     node.seen = SEEN_TIMEOUT;
333                 }
334                 seen = fage + node.seen;
335 
336                 if ((val = apr_table_get(hbt, "port"))) {
337                     node.port = atoi(val);
338                 }
339                 else {
340                     node.port = 80;
341                 }
342                 apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
343                                 ip, node.ready, node.busy, (unsigned int) seen, node.port);
344             }
345             else {
346                 apr_time_t seen;
347                 seen = apr_time_sec(now - s->seen);
348                 apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
349                                 s->ip, s->ready, s->busy, (unsigned int) seen, s->port);
350                 updated = 1;
351             }
352         } while (1);
353     }
354 
355     if (!updated) {
356         apr_time_t seen;
357         seen = apr_time_sec(now - s->seen);
358         apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
359                         s->ip, s->ready, s->busy, (unsigned int) seen, s->port);
360     }
361 
362     rv = apr_file_flush(fp);
363     if (rv) {
364       ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02077)
365                    "Unable to flush file: %s", path);
366       return rv;
367     }
368 
369     rv = apr_file_close(fp);
370     if (rv) {
371       ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02078)
372                    "Unable to close file: %s", path);
373       return rv;
374     }
375 
376     rv = apr_file_perms_set(path,
377                             APR_FPROT_UREAD | APR_FPROT_GREAD |
378                             APR_FPROT_WREAD);
379     if (rv && rv != APR_INCOMPLETE && rv != APR_ENOTIMPL) {
380         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02079)
381                      "Unable to set file permissions on %s",
382                      path);
383         return rv;
384     }
385 
386     rv = apr_file_rename(path, ctx->storage_path, pool);
387 
388     if (rv) {
389         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02080)
390                      "Unable to move file: %s -> %s", path,
391                      ctx->storage_path);
392         return rv;
393     }
394 
395     return APR_SUCCESS;
396 }
hm_update_stat(hm_ctx_t * ctx,hm_server_t * s,apr_pool_t * pool)397 static  apr_status_t  hm_update_stat(hm_ctx_t *ctx, hm_server_t *s, apr_pool_t *pool)
398 {
399     if (slotmem)
400         return hm_slotmem_update_stat(s, pool);
401     else
402         return hm_file_update_stat(ctx, s, pool);
403 }
404 
405 /* Store in a file */
hm_file_update_stats(hm_ctx_t * ctx,apr_pool_t * p)406 static apr_status_t hm_file_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
407 {
408     apr_status_t rv;
409     apr_file_t *fp;
410     apr_hash_index_t *hi;
411     apr_time_t now;
412     char *path = apr_pstrcat(p, ctx->storage_path, ".tmp.XXXXXX", NULL);
413     /* TODO: Update stats file (!) */
414     rv = apr_file_mktemp(&fp, path, APR_CREATE | APR_WRITE, p);
415 
416     if (rv) {
417         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02081)
418                      "Unable to open tmp file: %s", path);
419         return rv;
420     }
421 
422     now = apr_time_now();
423     for (hi = apr_hash_first(p, ctx->servers);
424          hi != NULL; hi = apr_hash_next(hi)) {
425         hm_server_t *s = NULL;
426         apr_time_t seen;
427         apr_hash_this(hi, NULL, NULL, (void **) &s);
428         seen = apr_time_sec(now - s->seen);
429         if (seen > SEEN_TIMEOUT) {
430             /*
431              * Skip this entry from the heartbeat file -- when it comes back,
432              * we will reuse the memory...
433              */
434         }
435         else {
436             apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
437                             s->ip, s->ready, s->busy, (unsigned int) seen, s->port);
438         }
439     }
440 
441     rv = apr_file_flush(fp);
442     if (rv) {
443       ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02082)
444                    "Unable to flush file: %s", path);
445       return rv;
446     }
447 
448     rv = apr_file_close(fp);
449     if (rv) {
450       ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02083)
451                    "Unable to close file: %s", path);
452       return rv;
453     }
454 
455     rv = apr_file_perms_set(path,
456                             APR_FPROT_UREAD | APR_FPROT_GREAD |
457                             APR_FPROT_WREAD);
458     if (rv && rv != APR_INCOMPLETE && rv != APR_ENOTIMPL) {
459         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02084)
460                      "Unable to set file permissions on %s",
461                      path);
462         return rv;
463     }
464 
465     rv = apr_file_rename(path, ctx->storage_path, p);
466 
467     if (rv) {
468         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02085)
469                      "Unable to move file: %s -> %s", path,
470                      ctx->storage_path);
471         return rv;
472     }
473 
474     return APR_SUCCESS;
475 }
476 /* Store in a slotmem */
hm_slotmem_update_stats(hm_ctx_t * ctx,apr_pool_t * p)477 static apr_status_t hm_slotmem_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
478 {
479     apr_status_t rv;
480     apr_time_t now;
481     apr_hash_index_t *hi;
482     now = apr_time_now();
483     for (hi = apr_hash_first(p, ctx->servers);
484          hi != NULL; hi = apr_hash_next(hi)) {
485         hm_server_t *s = NULL;
486         apr_time_t seen;
487         apr_hash_this(hi, NULL, NULL, (void **) &s);
488         seen = apr_time_sec(now - s->seen);
489         if (seen > SEEN_TIMEOUT) {
490             /* remove it */
491             rv = hm_slotmem_remove_stat(s, p);
492         } else {
493             /* update it */
494             rv = hm_slotmem_update_stat(s, p);
495         }
496         if (rv !=APR_SUCCESS)
497             return rv;
498     }
499     return APR_SUCCESS;
500 }
501 /* Store/update the stats */
hm_update_stats(hm_ctx_t * ctx,apr_pool_t * p)502 static apr_status_t hm_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
503 {
504     if (slotmem)
505         return hm_slotmem_update_stats(ctx, p);
506     else
507         return hm_file_update_stats(ctx, p);
508 }
509 
hm_get_server(hm_ctx_t * ctx,const char * ip,const int port)510 static hm_server_t *hm_get_server(hm_ctx_t *ctx, const char *ip, const int port)
511 {
512     hm_server_t *s;
513 
514     s = apr_hash_get(ctx->servers, ip, APR_HASH_KEY_STRING);
515 
516     if (s == NULL) {
517         s = apr_palloc(ctx->p, sizeof(hm_server_t));
518         s->ip = apr_pstrdup(ctx->p, ip);
519         s->port = port;
520         s->ready = 0;
521         s->busy = 0;
522         s->seen = 0;
523         apr_hash_set(ctx->servers, s->ip, APR_HASH_KEY_STRING, s);
524     }
525 
526     return s;
527 }
528 
529 /* Process a message received from a backend node */
hm_processmsg(hm_ctx_t * ctx,apr_pool_t * p,apr_sockaddr_t * from,char * buf,int len)530 static void hm_processmsg(hm_ctx_t *ctx, apr_pool_t *p,
531                                   apr_sockaddr_t *from, char *buf, int len)
532 {
533     apr_table_t *tbl;
534 
535     buf[len] = '\0';
536 
537     tbl = apr_table_make(p, 10);
538 
539     qs_to_table(buf, tbl, p);
540 
541     if (apr_table_get(tbl, "v") != NULL &&
542         apr_table_get(tbl, "busy") != NULL &&
543         apr_table_get(tbl, "ready") != NULL) {
544         char *ip;
545         int port = 80;
546         hm_server_t *s;
547         /* TODO: REMOVE ME BEFORE PRODUCTION (????) */
548         ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ctx->s, APLOGNO(02086)
549                      "%pI busy=%s ready=%s", from,
550                      apr_table_get(tbl, "busy"), apr_table_get(tbl, "ready"));
551 
552         apr_sockaddr_ip_get(&ip, from);
553 
554         if (apr_table_get(tbl, "port") != NULL)
555             port = atoi(apr_table_get(tbl, "port"));
556 
557         s = hm_get_server(ctx, ip, port);
558 
559         s->busy = atoi(apr_table_get(tbl, "busy"));
560         s->ready = atoi(apr_table_get(tbl, "ready"));
561         s->seen = apr_time_now();
562     }
563     else {
564         ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ctx->s, APLOGNO(02087)
565                      "malformed message from %pI",
566                      from);
567     }
568 
569 }
570 /* Read message from multicast socket */
571 #define MAX_MSG_LEN (1000)
hm_recv(hm_ctx_t * ctx,apr_pool_t * p)572 static apr_status_t hm_recv(hm_ctx_t *ctx, apr_pool_t *p)
573 {
574     char buf[MAX_MSG_LEN + 1];
575     apr_sockaddr_t from;
576     apr_size_t len = MAX_MSG_LEN;
577     apr_status_t rv;
578 
579     from.pool = p;
580 
581     rv = apr_socket_recvfrom(&from, ctx->sock, 0, buf, &len);
582 
583     if (APR_STATUS_IS_EAGAIN(rv)) {
584         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02088) "would block");
585         return APR_SUCCESS;
586     }
587     else if (rv) {
588         ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02089) "recvfrom failed");
589         return rv;
590     }
591 
592     hm_processmsg(ctx, p, &from, buf, len);
593 
594     return rv;
595 }
596 
hm_watchdog_callback(int state,void * data,apr_pool_t * pool)597 static apr_status_t hm_watchdog_callback(int state, void *data,
598                                          apr_pool_t *pool)
599 {
600     apr_status_t rv = APR_SUCCESS;
601     apr_time_t cur, now;
602     hm_ctx_t *ctx = (hm_ctx_t *)data;
603 
604     if (!ctx->active) {
605         return rv;
606     }
607 
608     switch (state) {
609         case AP_WATCHDOG_STATE_STARTING:
610             rv = hm_listen(ctx);
611             if (rv) {
612                 ctx->status = rv;
613                 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02090)
614                              "Unable to listen for connections!");
615             }
616             else {
617                 ctx->keep_running = 1;
618                 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ctx->s, APLOGNO(02091)
619                              "%s listener started.",
620                              HM_WATHCHDOG_NAME);
621             }
622         break;
623         case AP_WATCHDOG_STATE_RUNNING:
624             /* store in the slotmem or in the file depending on configuration */
625             hm_update_stats(ctx, pool);
626             cur = now = apr_time_sec(apr_time_now());
627 
628             while ((now - cur) < apr_time_sec(ctx->interval)) {
629                 int n;
630                 apr_status_t rc;
631                 apr_pool_t *p;
632                 apr_pollfd_t pfd;
633                 apr_interval_time_t timeout;
634 
635                 apr_pool_create(&p, pool);
636                 apr_pool_tag(p, "hm_running");
637 
638                 pfd.desc_type = APR_POLL_SOCKET;
639                 pfd.desc.s = ctx->sock;
640                 pfd.p = p;
641                 pfd.reqevents = APR_POLLIN;
642 
643                 timeout = apr_time_from_sec(1);
644 
645                 rc = apr_poll(&pfd, 1, &n, timeout);
646 
647                 if (!ctx->keep_running) {
648                     apr_pool_destroy(p);
649                     break;
650                 }
651                 if (rc == APR_SUCCESS && (pfd.rtnevents & APR_POLLIN)) {
652                     hm_recv(ctx, p);
653                 }
654                 now = apr_time_sec(apr_time_now());
655                 apr_pool_destroy(p);
656             }
657         break;
658         case AP_WATCHDOG_STATE_STOPPING:
659             ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ctx->s, APLOGNO(02092)
660                          "stopping %s listener.",
661                          HM_WATHCHDOG_NAME);
662 
663             ctx->keep_running = 0;
664             if (ctx->sock) {
665                 apr_socket_close(ctx->sock);
666                 ctx->sock = NULL;
667             }
668         break;
669     }
670     return rv;
671 }
672 
/*
 * post_config hook.
 *
 * Requires the optional functions of mod_watchdog.  When HeartbeatMaxServers
 * was given, the slotmem holding the node statistics is created through the
 * "shm" slotmem provider (only while the main state is
 * AP_SQ_MS_CREATE_CONFIG).  When the monitor is active (HeartbeatListen was
 * given), a watchdog instance is obtained and hm_watchdog_callback() is
 * registered with it.
 *
 * Returns OK, or !OK after logging the reason.
 */
static int hm_post_config(apr_pool_t *p, apr_pool_t *plog,
                          apr_pool_t *ptemp, server_rec *s)
{
    APR_OPTIONAL_FN_TYPE(ap_watchdog_get_instance) *get_instance_fn;
    APR_OPTIONAL_FN_TYPE(ap_watchdog_register_callback) *register_callback_fn;
    hm_ctx_t *ctx;
    apr_status_t rv;

    ctx = ap_get_module_config(s->module_config, &heartmonitor_module);

    get_instance_fn = APR_RETRIEVE_OPTIONAL_FN(ap_watchdog_get_instance);
    register_callback_fn = APR_RETRIEVE_OPTIONAL_FN(ap_watchdog_register_callback);
    if (get_instance_fn == NULL || register_callback_fn == NULL) {
        ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s, APLOGNO(02093)
                     "mod_watchdog is required");
        return !OK;
    }

    /* Create the slotmem */
    if (ap_state_query(AP_SQ_MAIN_STATE) == AP_SQ_MS_CREATE_CONFIG
        && maxworkers) {
        /* this is the real thing */
        storage = ap_lookup_provider(AP_SLOTMEM_PROVIDER_GROUP, "shm",
                                     AP_SLOTMEM_PROVIDER_VERSION);
        if (storage == NULL) {
            ap_log_error(APLOG_MARK, APLOG_EMERG, 0, s, APLOGNO(02284)
                         "failed to lookup provider 'shm' for '%s', "
                         "maybe you need to load mod_slotmem_shm?",
                         AP_SLOTMEM_PROVIDER_GROUP);
            return !OK;
        }

        storage->create(&slotmem, "mod_heartmonitor",
                        sizeof(hm_slot_server_t), maxworkers,
                        AP_SLOTMEM_TYPE_PREGRAB, p);
        if (slotmem == NULL) {
            ap_log_error(APLOG_MARK, APLOG_EMERG, 0, s, APLOGNO(02285)
                         "slotmem_create for status failed");
            return !OK;
        }
    }

    if (!ctx->active) {
        /* no HeartbeatListen: nothing to watch */
        return OK;
    }

    rv = get_instance_fn(&ctx->watchdog, HM_WATHCHDOG_NAME, 0, 1, p);
    if (rv != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s, APLOGNO(02094)
                     "Failed to create watchdog instance (%s)",
                     HM_WATHCHDOG_NAME);
        return !OK;
    }

    /* Register a callback with zero interval. */
    rv = register_callback_fn(ctx->watchdog, 0, ctx, hm_watchdog_callback);
    if (rv != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s, APLOGNO(02095)
                     "Failed to register watchdog callback (%s)",
                     HM_WATHCHDOG_NAME);
        return !OK;
    }

    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s, APLOGNO(02096)
                 "wd callback %s", HM_WATHCHDOG_NAME);
    return OK;
}
739 
/*
 * Content handler for "heartbeat": accepts a heartbeat sent as the body of
 * a POST request ("busy=..&ready=..[&port=..]", at most MAX_MSG_LEN bytes
 * are read) and stores it for the client address through hm_update_stat().
 *
 * Returns DECLINED for other handlers, HTTP_METHOD_NOT_ALLOWED for anything
 * but POST, an error status when the body cannot be read, HTTP_BAD_REQUEST
 * when "busy" or "ready" is missing, HTTP_INTERNAL_SERVER_ERROR when the
 * heartbeat could not be stored, and OK after answering "OK".
 */
static int hm_handler(request_rec *r)
{
    apr_bucket_brigade *input_brigade;
    apr_size_t len;
    char *buf;
    apr_status_t status;
    apr_table_t *tbl;
    hm_server_t hmserver;
    char *ip;
    hm_ctx_t *ctx;
    const char *busy;
    const char *ready;
    const char *port;

    if (strcmp(r->handler, "heartbeat")) {
        return DECLINED;
    }
    if (r->method_number != M_POST) {
        return HTTP_METHOD_NOT_ALLOWED;
    }

    len = MAX_MSG_LEN;
    ctx = ap_get_module_config(r->server->module_config,
            &heartmonitor_module);

    /* one byte more than is read, for the terminating NUL written below */
    buf = apr_pcalloc(r->pool, MAX_MSG_LEN + 1);
    /*
     * The brigade only lives for this request; taking it from the
     * connection pool would accumulate one brigade per request on a
     * kept-alive connection.
     */
    input_brigade = apr_brigade_create(r->pool, r->connection->bucket_alloc);
    status = ap_get_brigade(r->input_filters, input_brigade, AP_MODE_READBYTES, APR_BLOCK_READ, MAX_MSG_LEN);
    if (status != APR_SUCCESS) {
        return ap_map_http_request_error(status, HTTP_BAD_REQUEST);
    }
    status = apr_brigade_flatten(input_brigade, buf, &len);
    if (status != APR_SUCCESS) {
        return ap_map_http_request_error(status, HTTP_BAD_REQUEST);
    }

    /* we can't use hm_processmsg because it uses hm_get_server() */
    buf[len] = '\0';
    tbl = apr_table_make(r->pool, 10);
    qs_to_table(buf, tbl, r->pool);

    /* the body comes from the client: never assume a field is present */
    busy = apr_table_get(tbl, "busy");
    ready = apr_table_get(tbl, "ready");
    if (busy == NULL || ready == NULL) {
        return HTTP_BAD_REQUEST;
    }

    apr_sockaddr_ip_get(&ip, r->connection->client_addr);
    hmserver.ip = ip;
    hmserver.port = 80;
    port = apr_table_get(tbl, "port");
    if (port != NULL) {
        hmserver.port = atoi(port);
    }
    hmserver.busy = atoi(busy);
    hmserver.ready = atoi(ready);
    hmserver.seen = apr_time_now();
    if (hm_update_stat(ctx, &hmserver, r->pool) != APR_SUCCESS) {
        /* do not acknowledge a heartbeat that was not stored */
        return HTTP_INTERNAL_SERVER_ERROR;
    }

    ap_set_content_type(r, "text/plain");
    ap_set_content_length(r, 2);
    ap_rputs("OK", r);
    ap_rflush(r);

    return OK;
}
791 
/*
 * Register the module's hooks: post_config in the middle of the hook
 * order, and the content handler first, listing mod_proxy.c as a successor.
 */
static void hm_register_hooks(apr_pool_t *p)
{
    static const char * const run_before[] = { "mod_proxy.c", NULL };

    ap_hook_post_config(hm_post_config, NULL, NULL, APR_HOOK_MIDDLE);
    ap_hook_handler(hm_handler, NULL, run_before, APR_HOOK_FIRST);
}
799 
/*
 * Create the per-server configuration: an inactive monitor using the
 * default storage file and the compiled-in update interval, with its own
 * sub-pool and an empty servers hash.
 */
static void *hm_create_config(apr_pool_t *p, server_rec *s)
{
    /*
     * Zero-filled allocation: sock, keep_running, status, watchdog and
     * mcast_addr are not assigned here, yet sock and keep_running are
     * tested by the watchdog callback and must not start out with
     * indeterminate values.
     */
    hm_ctx_t *ctx = apr_pcalloc(p, sizeof(*ctx));

    ctx->active = 0;
    ctx->storage_path = ap_runtime_dir_relative(p, DEFAULT_HEARTBEAT_STORAGE);
    /* TODO: Add directive for tuning the update interval
     */
    ctx->interval = apr_time_from_sec(HM_UPDATE_SEC);
    ctx->s = s;
    apr_pool_create(&ctx->p, p);
    apr_pool_tag(ctx->p, "hm_ctx");
    ctx->servers = apr_hash_make(ctx->p);

    return ctx;
}
816 
/*
 * HeartbeatStorage directive (global context only): set the path of the
 * stats file, resolved with ap_runtime_dir_relative().
 * Returns NULL, or the context error message.
 */
static const char *cmd_hm_storage(cmd_parms *cmd,
                                  void *dconf, const char *path)
{
    const char *ctx_err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
    hm_ctx_t *ctx;

    if (ctx_err != NULL) {
        return ctx_err;
    }

    ctx = (hm_ctx_t *) ap_get_module_config(cmd->server->module_config,
                                            &heartmonitor_module);
    ctx->storage_path = ap_runtime_dir_relative(cmd->pool, path);

    return NULL;
}
834 
/*
 * HeartbeatListen directive (global context only, at most once): activate
 * the monitor and resolve the "host:port" multicast address to listen on.
 * Both host and port are required; the address is resolved as APR_INET.
 * Returns NULL on success or an error message.
 */
static const char *cmd_hm_listen(cmd_parms *cmd,
                                 void *dconf, const char *mcast_addr)
{
    const char *ctx_err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
    hm_ctx_t *ctx =
        (hm_ctx_t *) ap_get_module_config(cmd->server->module_config,
                                          &heartmonitor_module);
    apr_port_t port = 0;
    char *host;
    char *scope_id;
    apr_status_t rv;

    if (ctx_err != NULL) {
        return ctx_err;
    }

    if (ctx->active) {
        return "HeartbeatListen: May only be specified once.";
    }
    ctx->active = 1;

    rv = apr_parse_addr_port(&host, &scope_id, &port, mcast_addr,
                             cmd->temp_pool);
    if (rv != APR_SUCCESS) {
        return "HeartbeatListen: Unable to parse multicast address.";
    }
    if (host == NULL) {
        return "HeartbeatListen: No host provided in multicast address";
    }
    if (port == 0) {
        return "HeartbeatListen: No port provided in multicast address";
    }

    rv = apr_sockaddr_info_get(&ctx->mcast_addr, host, APR_INET, port, 0,
                               cmd->pool);
    if (rv != APR_SUCCESS) {
        return
            "HeartbeatListen: apr_sockaddr_info_get failed on multicast address";
    }

    return NULL;
}
883 
/*
 * HeartbeatMaxServers directive (global context only): set the number of
 * slots of the slotmem used to store the statistics.  The value is parsed
 * with atoi() and must be greater than 10.
 * Returns NULL on success or an error message.
 */
static const char *cmd_hm_maxworkers(cmd_parms *cmd,
                                  void *dconf, const char *data)
{
    const char *ctx_err = ap_check_cmd_context(cmd, GLOBAL_ONLY);

    if (ctx_err != NULL) {
        return ctx_err;
    }

    maxworkers = atoi(data);

    return (maxworkers <= 10)
        ? "HeartbeatMaxServers: Should be bigger than 10"
        : NULL;
}
899 
900 static const command_rec hm_cmds[] = {
901     AP_INIT_TAKE1("HeartbeatListen", cmd_hm_listen, NULL, RSRC_CONF,
902                   "Address to listen for heartbeat requests"),
903     AP_INIT_TAKE1("HeartbeatStorage", cmd_hm_storage, NULL, RSRC_CONF,
904                   "Path to store heartbeat data."),
905     AP_INIT_TAKE1("HeartbeatMaxServers", cmd_hm_maxworkers, NULL, RSRC_CONF,
906                   "Max number of servers when using slotmem (instead file) to store heartbeat data."),
907     {NULL}
908 };
909 
910 AP_DECLARE_MODULE(heartmonitor) = {
911     STANDARD20_MODULE_STUFF,
912     NULL,                       /* create per-directory config structure */
913     NULL,                       /* merge per-directory config structures */
914     hm_create_config,           /* create per-server config structure */
915     NULL,                       /* merge per-server config structures */
916     hm_cmds,                    /* command apr_table_t */
917     hm_register_hooks
918 };
919