1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
#include "httpd.h"
#include "http_config.h"
#include "http_log.h"
#include "http_core.h"
#include "http_protocol.h"
#include "apr_strings.h"
#include "apr_hash.h"
#include "apr_time.h"
#include "ap_mpm.h"
#include "scoreboard.h"
#include "mod_watchdog.h"
#include "ap_slotmem.h"
#include "heartbeat.h"

#include <errno.h>
#include <limits.h>
30
31
#ifndef HM_UPDATE_SEC
/* How often we update the stats file, in seconds (converted with
 * apr_time_from_sec() into hm_ctx_t.interval). */
/* TODO: Make a runtime config */
#define HM_UPDATE_SEC (5)
#endif

/* Name of the mod_watchdog instance this module creates and registers its
 * callback on.  (The misspelled identifier is used throughout the file.) */
#define HM_WATHCHDOG_NAME ("_heartmonitor_")

/* Slotmem provider ("shm") and the slotmem instance.  Both stay NULL unless
 * HeartbeatMaxServers is configured; while slotmem is NULL the stats are
 * written to the heartbeat file instead. */
static const ap_slotmem_provider_t *storage = NULL;
static ap_slotmem_instance_t *slotmem = NULL;
/* Number of slotmem slots from HeartbeatMaxServers; 0 means "use the file". */
static int maxworkers = 0;

module AP_MODULE_DECLARE_DATA heartmonitor_module;
45
/* Most recent heartbeat data for one backend.  Instances are kept in
 * hm_ctx_t.servers for the multicast listener, and built on the stack by
 * hm_handler() for HTTP-posted heartbeats. */
typedef struct hm_server_t
{
    const char *ip;         /* backend IP address string (hash key in servers) */
    int busy;               /* last reported "busy" value */
    int ready;              /* last reported "ready" value */
    unsigned int port;      /* reported port; 80 when the message has none */
    apr_time_t seen;        /* apr_time_now() when the last heartbeat arrived */
} hm_server_t;
54
/* Per-server module configuration and listener state. */
typedef struct hm_ctx_t
{
    int active;                 /* set once HeartbeatListen is configured */
    const char *storage_path;   /* heartbeat file (HeartbeatStorage) */
    ap_watchdog_t *watchdog;    /* watchdog instance the callback runs on */
    apr_interval_time_t interval; /* length of one poll/update cycle */
    apr_sockaddr_t *mcast_addr; /* multicast group/port to bind and join */
    apr_status_t status;        /* records a hm_listen() failure */
    /* Set when the listener started, cleared in the STOPPING state; the
     * poll loop checks it.  NOTE(review): volatile is not a thread
     * synchronization primitive — confirm which threads touch this. */
    volatile int keep_running;
    apr_socket_t *sock;         /* multicast listening socket */
    apr_pool_t *p;              /* long-lived subpool for servers entries */
    apr_hash_t *servers;        /* IP string -> hm_server_t */
    server_rec *s;              /* server used for logging */
} hm_ctx_t;
69
/* Data passed through storage->doall() to hm_update()/hm_readid(). */
typedef struct hm_slot_server_ctx_t {
    hm_server_t *s;         /* server being looked up (matched by ip) */
    int found;              /* set to 1 by the callback on an IP match */
    unsigned int item_id;   /* slot id of the match (hm_readid only) */
} hm_slot_server_ctx_t;
75
hm_listen(hm_ctx_t * ctx)76 static apr_status_t hm_listen(hm_ctx_t *ctx)
77 {
78 apr_status_t rv;
79
80 rv = apr_socket_create(&ctx->sock, ctx->mcast_addr->family,
81 SOCK_DGRAM, APR_PROTO_UDP, ctx->p);
82
83 if (rv) {
84 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02068)
85 "Failed to create listening socket.");
86 return rv;
87 }
88
89 rv = apr_socket_opt_set(ctx->sock, APR_SO_REUSEADDR, 1);
90 if (rv) {
91 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02069)
92 "Failed to set APR_SO_REUSEADDR to 1 on socket.");
93 return rv;
94 }
95
96
97 rv = apr_socket_opt_set(ctx->sock, APR_SO_NONBLOCK, 1);
98 if (rv) {
99 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02070)
100 "Failed to set APR_SO_NONBLOCK to 1 on socket.");
101 return rv;
102 }
103
104 rv = apr_socket_bind(ctx->sock, ctx->mcast_addr);
105 if (rv) {
106 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02071)
107 "Failed to bind on socket.");
108 return rv;
109 }
110
111 rv = apr_mcast_join(ctx->sock, ctx->mcast_addr, NULL, NULL);
112
113 if (rv) {
114 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02072)
115 "Failed to join multicast group");
116 return rv;
117 }
118
119 rv = apr_mcast_loopback(ctx->sock, 1);
120 if (rv) {
121 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02073)
122 "Failed to accept localhost mulitcast on socket.");
123 return rv;
124 }
125
126 return APR_SUCCESS;
127 }
128
129 /* XXX: The same exists in mod_lbmethod_heartbeat.c where it is named argstr_to_table */
qs_to_table(const char * input,apr_table_t * parms,apr_pool_t * p)130 static void qs_to_table(const char *input, apr_table_t *parms,
131 apr_pool_t *p)
132 {
133 char *key;
134 char *value;
135 char *query_string;
136 char *strtok_state;
137
138 if (input == NULL) {
139 return;
140 }
141
142 query_string = apr_pstrdup(p, input);
143
144 key = apr_strtok(query_string, "&", &strtok_state);
145 while (key) {
146 value = strchr(key, '=');
147 if (value) {
148 *value = '\0'; /* Split the string in two */
149 value++; /* Skip passed the = */
150 }
151 else {
152 value = "1";
153 }
154 ap_unescape_url(key);
155 ap_unescape_url(value);
156 apr_table_set(parms, key, value);
157 /*
158 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, APLOGNO(03182)
159 "Found query arg: %s = %s", key, value);
160 */
161 key = apr_strtok(NULL, "&", &strtok_state);
162 }
163 }
164
165
166 #define SEEN_TIMEOUT (30)
167
168 /* Store in the slotmem */
hm_update(void * mem,void * data,apr_pool_t * p)169 static apr_status_t hm_update(void* mem, void *data, apr_pool_t *p)
170 {
171 hm_slot_server_t *old = (hm_slot_server_t *) mem;
172 hm_slot_server_ctx_t *s = (hm_slot_server_ctx_t *) data;
173 hm_server_t *new = s->s;
174 if (strncmp(old->ip, new->ip, MAXIPSIZE)==0) {
175 s->found = 1;
176 old->busy = new->busy;
177 old->ready = new->ready;
178 old->seen = new->seen;
179 }
180 return APR_SUCCESS;
181 }
182 /* Read the id corresponding to the entry in the slotmem */
hm_readid(void * mem,void * data,apr_pool_t * p)183 static apr_status_t hm_readid(void* mem, void *data, apr_pool_t *p)
184 {
185 hm_slot_server_t *old = (hm_slot_server_t *) mem;
186 hm_slot_server_ctx_t *s = (hm_slot_server_ctx_t *) data;
187 hm_server_t *new = s->s;
188 if (strncmp(old->ip, new->ip, MAXIPSIZE)==0) {
189 s->found = 1;
190 s->item_id = old->id;
191 }
192 return APR_SUCCESS;
193 }
194 /* update the entry or create it if not existing */
hm_slotmem_update_stat(hm_server_t * s,apr_pool_t * pool)195 static apr_status_t hm_slotmem_update_stat(hm_server_t *s, apr_pool_t *pool)
196 {
197 /* We call do_all (to try to update) otherwise grab + put */
198 hm_slot_server_ctx_t ctx;
199 ctx.s = s;
200 ctx.found = 0;
201 storage->doall(slotmem, hm_update, &ctx, pool);
202 if (!ctx.found) {
203 unsigned int i;
204 hm_slot_server_t hmserver;
205 memcpy(hmserver.ip, s->ip, MAXIPSIZE);
206 hmserver.busy = s->busy;
207 hmserver.ready = s->ready;
208 hmserver.seen = s->seen;
209 /* XXX locking for grab() / put() */
210 storage->grab(slotmem, &i);
211 hmserver.id = i;
212 storage->put(slotmem, i, (unsigned char *)&hmserver, sizeof(hmserver));
213 }
214 return APR_SUCCESS;
215 }
hm_slotmem_remove_stat(hm_server_t * s,apr_pool_t * pool)216 static apr_status_t hm_slotmem_remove_stat(hm_server_t *s, apr_pool_t *pool)
217 {
218 hm_slot_server_ctx_t ctx;
219 ctx.s = s;
220 ctx.found = 0;
221 storage->doall(slotmem, hm_readid, &ctx, pool);
222 if (ctx.found) {
223 storage->release(slotmem, ctx.item_id);
224 }
225 return APR_SUCCESS;
226 }
hm_file_update_stat(hm_ctx_t * ctx,hm_server_t * s,apr_pool_t * pool)227 static apr_status_t hm_file_update_stat(hm_ctx_t *ctx, hm_server_t *s, apr_pool_t *pool)
228 {
229 apr_status_t rv;
230 apr_file_t *fp;
231 apr_file_t *fpin;
232 apr_time_t now;
233 apr_time_t fage;
234 apr_finfo_t fi;
235 int updated = 0;
236 char *path = apr_pstrcat(pool, ctx->storage_path, ".tmp.XXXXXX", NULL);
237
238
239 /* TODO: Update stats file (!) */
240 rv = apr_file_mktemp(&fp, path, APR_CREATE | APR_WRITE, pool);
241
242 if (rv) {
243 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02074)
244 "Unable to open tmp file: %s", path);
245 return rv;
246 }
247 rv = apr_file_open(&fpin, ctx->storage_path, APR_READ|APR_BINARY|APR_BUFFERED,
248 APR_OS_DEFAULT, pool);
249
250 now = apr_time_now();
251 if (rv == APR_SUCCESS) {
252 char *t;
253 apr_table_t *hbt = apr_table_make(pool, 10);
254 apr_bucket_alloc_t *ba;
255 apr_bucket_brigade *bb;
256 apr_bucket_brigade *tmpbb;
257
258 rv = apr_file_info_get(&fi, APR_FINFO_SIZE | APR_FINFO_MTIME, fpin);
259 if (rv) {
260 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02075)
261 "Unable to read file: %s", ctx->storage_path);
262 return rv;
263 }
264
265 /* Read the file and update the line corresponding to the node */
266 ba = apr_bucket_alloc_create(pool);
267 bb = apr_brigade_create(pool, ba);
268 apr_brigade_insert_file(bb, fpin, 0, fi.size, pool);
269 tmpbb = apr_brigade_create(pool, ba);
270 fage = apr_time_sec(now - fi.mtime);
271 do {
272 char buf[4096];
273 const char *ip;
274 apr_size_t bsize = sizeof(buf);
275
276 apr_brigade_cleanup(tmpbb);
277 if (APR_BRIGADE_EMPTY(bb)) {
278 break;
279 }
280 rv = apr_brigade_split_line(tmpbb, bb,
281 APR_BLOCK_READ, sizeof(buf));
282
283 if (rv) {
284 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02076)
285 "Unable to read from file: %s", ctx->storage_path);
286 return rv;
287 }
288
289 apr_brigade_flatten(tmpbb, buf, &bsize);
290 if (bsize == 0) {
291 break;
292 }
293 buf[bsize - 1] = 0;
294 t = strchr(buf, ' ');
295 if (t) {
296 ip = apr_pstrmemdup(pool, buf, t - buf);
297 }
298 else {
299 ip = NULL;
300 }
301
302 if (!ip || buf[0] == '#') {
303 /* copy things we can't process */
304 apr_file_printf(fp, "%s\n", buf);
305 }
306 else if (strcmp(ip, s->ip) != 0 ) {
307 hm_server_t node;
308 apr_time_t seen;
309 const char *val;
310
311 /* Update seen time according to the last file modification */
312 apr_table_clear(hbt);
313 qs_to_table(apr_pstrdup(pool, t), hbt, pool);
314 if ((val = apr_table_get(hbt, "busy"))) {
315 node.busy = atoi(val);
316 }
317 else {
318 node.busy = 0;
319 }
320
321 if ((val = apr_table_get(hbt, "ready"))) {
322 node.ready = atoi(val);
323 }
324 else {
325 node.ready = 0;
326 }
327
328 if ((val = apr_table_get(hbt, "lastseen"))) {
329 node.seen = atoi(val);
330 }
331 else {
332 node.seen = SEEN_TIMEOUT;
333 }
334 seen = fage + node.seen;
335
336 if ((val = apr_table_get(hbt, "port"))) {
337 node.port = atoi(val);
338 }
339 else {
340 node.port = 80;
341 }
342 apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
343 ip, node.ready, node.busy, (unsigned int) seen, node.port);
344 }
345 else {
346 apr_time_t seen;
347 seen = apr_time_sec(now - s->seen);
348 apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
349 s->ip, s->ready, s->busy, (unsigned int) seen, s->port);
350 updated = 1;
351 }
352 } while (1);
353 }
354
355 if (!updated) {
356 apr_time_t seen;
357 seen = apr_time_sec(now - s->seen);
358 apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
359 s->ip, s->ready, s->busy, (unsigned int) seen, s->port);
360 }
361
362 rv = apr_file_flush(fp);
363 if (rv) {
364 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02077)
365 "Unable to flush file: %s", path);
366 return rv;
367 }
368
369 rv = apr_file_close(fp);
370 if (rv) {
371 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02078)
372 "Unable to close file: %s", path);
373 return rv;
374 }
375
376 rv = apr_file_perms_set(path,
377 APR_FPROT_UREAD | APR_FPROT_GREAD |
378 APR_FPROT_WREAD);
379 if (rv && rv != APR_INCOMPLETE && rv != APR_ENOTIMPL) {
380 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02079)
381 "Unable to set file permissions on %s",
382 path);
383 return rv;
384 }
385
386 rv = apr_file_rename(path, ctx->storage_path, pool);
387
388 if (rv) {
389 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02080)
390 "Unable to move file: %s -> %s", path,
391 ctx->storage_path);
392 return rv;
393 }
394
395 return APR_SUCCESS;
396 }
hm_update_stat(hm_ctx_t * ctx,hm_server_t * s,apr_pool_t * pool)397 static apr_status_t hm_update_stat(hm_ctx_t *ctx, hm_server_t *s, apr_pool_t *pool)
398 {
399 if (slotmem)
400 return hm_slotmem_update_stat(s, pool);
401 else
402 return hm_file_update_stat(ctx, s, pool);
403 }
404
405 /* Store in a file */
hm_file_update_stats(hm_ctx_t * ctx,apr_pool_t * p)406 static apr_status_t hm_file_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
407 {
408 apr_status_t rv;
409 apr_file_t *fp;
410 apr_hash_index_t *hi;
411 apr_time_t now;
412 char *path = apr_pstrcat(p, ctx->storage_path, ".tmp.XXXXXX", NULL);
413 /* TODO: Update stats file (!) */
414 rv = apr_file_mktemp(&fp, path, APR_CREATE | APR_WRITE, p);
415
416 if (rv) {
417 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02081)
418 "Unable to open tmp file: %s", path);
419 return rv;
420 }
421
422 now = apr_time_now();
423 for (hi = apr_hash_first(p, ctx->servers);
424 hi != NULL; hi = apr_hash_next(hi)) {
425 hm_server_t *s = NULL;
426 apr_time_t seen;
427 apr_hash_this(hi, NULL, NULL, (void **) &s);
428 seen = apr_time_sec(now - s->seen);
429 if (seen > SEEN_TIMEOUT) {
430 /*
431 * Skip this entry from the heartbeat file -- when it comes back,
432 * we will reuse the memory...
433 */
434 }
435 else {
436 apr_file_printf(fp, "%s &ready=%u&busy=%u&lastseen=%u&port=%u\n",
437 s->ip, s->ready, s->busy, (unsigned int) seen, s->port);
438 }
439 }
440
441 rv = apr_file_flush(fp);
442 if (rv) {
443 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02082)
444 "Unable to flush file: %s", path);
445 return rv;
446 }
447
448 rv = apr_file_close(fp);
449 if (rv) {
450 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02083)
451 "Unable to close file: %s", path);
452 return rv;
453 }
454
455 rv = apr_file_perms_set(path,
456 APR_FPROT_UREAD | APR_FPROT_GREAD |
457 APR_FPROT_WREAD);
458 if (rv && rv != APR_INCOMPLETE && rv != APR_ENOTIMPL) {
459 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02084)
460 "Unable to set file permissions on %s",
461 path);
462 return rv;
463 }
464
465 rv = apr_file_rename(path, ctx->storage_path, p);
466
467 if (rv) {
468 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02085)
469 "Unable to move file: %s -> %s", path,
470 ctx->storage_path);
471 return rv;
472 }
473
474 return APR_SUCCESS;
475 }
476 /* Store in a slotmem */
hm_slotmem_update_stats(hm_ctx_t * ctx,apr_pool_t * p)477 static apr_status_t hm_slotmem_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
478 {
479 apr_status_t rv;
480 apr_time_t now;
481 apr_hash_index_t *hi;
482 now = apr_time_now();
483 for (hi = apr_hash_first(p, ctx->servers);
484 hi != NULL; hi = apr_hash_next(hi)) {
485 hm_server_t *s = NULL;
486 apr_time_t seen;
487 apr_hash_this(hi, NULL, NULL, (void **) &s);
488 seen = apr_time_sec(now - s->seen);
489 if (seen > SEEN_TIMEOUT) {
490 /* remove it */
491 rv = hm_slotmem_remove_stat(s, p);
492 } else {
493 /* update it */
494 rv = hm_slotmem_update_stat(s, p);
495 }
496 if (rv !=APR_SUCCESS)
497 return rv;
498 }
499 return APR_SUCCESS;
500 }
501 /* Store/update the stats */
hm_update_stats(hm_ctx_t * ctx,apr_pool_t * p)502 static apr_status_t hm_update_stats(hm_ctx_t *ctx, apr_pool_t *p)
503 {
504 if (slotmem)
505 return hm_slotmem_update_stats(ctx, p);
506 else
507 return hm_file_update_stats(ctx, p);
508 }
509
hm_get_server(hm_ctx_t * ctx,const char * ip,const int port)510 static hm_server_t *hm_get_server(hm_ctx_t *ctx, const char *ip, const int port)
511 {
512 hm_server_t *s;
513
514 s = apr_hash_get(ctx->servers, ip, APR_HASH_KEY_STRING);
515
516 if (s == NULL) {
517 s = apr_palloc(ctx->p, sizeof(hm_server_t));
518 s->ip = apr_pstrdup(ctx->p, ip);
519 s->port = port;
520 s->ready = 0;
521 s->busy = 0;
522 s->seen = 0;
523 apr_hash_set(ctx->servers, s->ip, APR_HASH_KEY_STRING, s);
524 }
525
526 return s;
527 }
528
529 /* Process a message received from a backend node */
hm_processmsg(hm_ctx_t * ctx,apr_pool_t * p,apr_sockaddr_t * from,char * buf,int len)530 static void hm_processmsg(hm_ctx_t *ctx, apr_pool_t *p,
531 apr_sockaddr_t *from, char *buf, int len)
532 {
533 apr_table_t *tbl;
534
535 buf[len] = '\0';
536
537 tbl = apr_table_make(p, 10);
538
539 qs_to_table(buf, tbl, p);
540
541 if (apr_table_get(tbl, "v") != NULL &&
542 apr_table_get(tbl, "busy") != NULL &&
543 apr_table_get(tbl, "ready") != NULL) {
544 char *ip;
545 int port = 80;
546 hm_server_t *s;
547 /* TODO: REMOVE ME BEFORE PRODUCTION (????) */
548 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ctx->s, APLOGNO(02086)
549 "%pI busy=%s ready=%s", from,
550 apr_table_get(tbl, "busy"), apr_table_get(tbl, "ready"));
551
552 apr_sockaddr_ip_get(&ip, from);
553
554 if (apr_table_get(tbl, "port") != NULL)
555 port = atoi(apr_table_get(tbl, "port"));
556
557 s = hm_get_server(ctx, ip, port);
558
559 s->busy = atoi(apr_table_get(tbl, "busy"));
560 s->ready = atoi(apr_table_get(tbl, "ready"));
561 s->seen = apr_time_now();
562 }
563 else {
564 ap_log_error(APLOG_MARK, APLOG_CRIT, 0, ctx->s, APLOGNO(02087)
565 "malformed message from %pI",
566 from);
567 }
568
569 }
570 /* Read message from multicast socket */
571 #define MAX_MSG_LEN (1000)
hm_recv(hm_ctx_t * ctx,apr_pool_t * p)572 static apr_status_t hm_recv(hm_ctx_t *ctx, apr_pool_t *p)
573 {
574 char buf[MAX_MSG_LEN + 1];
575 apr_sockaddr_t from;
576 apr_size_t len = MAX_MSG_LEN;
577 apr_status_t rv;
578
579 from.pool = p;
580
581 rv = apr_socket_recvfrom(&from, ctx->sock, 0, buf, &len);
582
583 if (APR_STATUS_IS_EAGAIN(rv)) {
584 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02088) "would block");
585 return APR_SUCCESS;
586 }
587 else if (rv) {
588 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02089) "recvfrom failed");
589 return rv;
590 }
591
592 hm_processmsg(ctx, p, &from, buf, len);
593
594 return rv;
595 }
596
hm_watchdog_callback(int state,void * data,apr_pool_t * pool)597 static apr_status_t hm_watchdog_callback(int state, void *data,
598 apr_pool_t *pool)
599 {
600 apr_status_t rv = APR_SUCCESS;
601 apr_time_t cur, now;
602 hm_ctx_t *ctx = (hm_ctx_t *)data;
603
604 if (!ctx->active) {
605 return rv;
606 }
607
608 switch (state) {
609 case AP_WATCHDOG_STATE_STARTING:
610 rv = hm_listen(ctx);
611 if (rv) {
612 ctx->status = rv;
613 ap_log_error(APLOG_MARK, APLOG_CRIT, rv, ctx->s, APLOGNO(02090)
614 "Unable to listen for connections!");
615 }
616 else {
617 ctx->keep_running = 1;
618 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ctx->s, APLOGNO(02091)
619 "%s listener started.",
620 HM_WATHCHDOG_NAME);
621 }
622 break;
623 case AP_WATCHDOG_STATE_RUNNING:
624 /* store in the slotmem or in the file depending on configuration */
625 hm_update_stats(ctx, pool);
626 cur = now = apr_time_sec(apr_time_now());
627
628 while ((now - cur) < apr_time_sec(ctx->interval)) {
629 int n;
630 apr_status_t rc;
631 apr_pool_t *p;
632 apr_pollfd_t pfd;
633 apr_interval_time_t timeout;
634
635 apr_pool_create(&p, pool);
636 apr_pool_tag(p, "hm_running");
637
638 pfd.desc_type = APR_POLL_SOCKET;
639 pfd.desc.s = ctx->sock;
640 pfd.p = p;
641 pfd.reqevents = APR_POLLIN;
642
643 timeout = apr_time_from_sec(1);
644
645 rc = apr_poll(&pfd, 1, &n, timeout);
646
647 if (!ctx->keep_running) {
648 apr_pool_destroy(p);
649 break;
650 }
651 if (rc == APR_SUCCESS && (pfd.rtnevents & APR_POLLIN)) {
652 hm_recv(ctx, p);
653 }
654 now = apr_time_sec(apr_time_now());
655 apr_pool_destroy(p);
656 }
657 break;
658 case AP_WATCHDOG_STATE_STOPPING:
659 ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, ctx->s, APLOGNO(02092)
660 "stopping %s listener.",
661 HM_WATHCHDOG_NAME);
662
663 ctx->keep_running = 0;
664 if (ctx->sock) {
665 apr_socket_close(ctx->sock);
666 ctx->sock = NULL;
667 }
668 break;
669 }
670 return rv;
671 }
672
/*
 * post_config hook.
 *
 * - Requires mod_watchdog's optional functions.
 * - On the real configuration pass (AP_SQ_MS_CREATE_CONFIG), and only if
 *   HeartbeatMaxServers was set, creates the "mod_heartmonitor" slotmem
 *   through the "shm" provider.
 * - When HeartbeatListen is configured, obtains the watchdog instance and
 *   registers hm_watchdog_callback() with a zero interval.
 *
 * Returns OK, or !OK after logging any failure.
 */
static int hm_post_config(apr_pool_t *p, apr_pool_t *plog,
                          apr_pool_t *ptemp, server_rec *s)
{
    hm_ctx_t *ctx = ap_get_module_config(s->module_config,
                                         &heartmonitor_module);
    APR_OPTIONAL_FN_TYPE(ap_watchdog_get_instance) *get_instance =
        APR_RETRIEVE_OPTIONAL_FN(ap_watchdog_get_instance);
    APR_OPTIONAL_FN_TYPE(ap_watchdog_register_callback) *register_cb =
        APR_RETRIEVE_OPTIONAL_FN(ap_watchdog_register_callback);
    apr_status_t rv;

    if (get_instance == NULL || register_cb == NULL) {
        ap_log_error(APLOG_MARK, APLOG_CRIT, 0, s, APLOGNO(02093)
                     "mod_watchdog is required");
        return !OK;
    }

    if (maxworkers != 0
        && ap_state_query(AP_SQ_MAIN_STATE) == AP_SQ_MS_CREATE_CONFIG) {
        storage = ap_lookup_provider(AP_SLOTMEM_PROVIDER_GROUP, "shm",
                                     AP_SLOTMEM_PROVIDER_VERSION);
        if (storage == NULL) {
            ap_log_error(APLOG_MARK, APLOG_EMERG, 0, s, APLOGNO(02284)
                         "failed to lookup provider 'shm' for '%s', "
                         "maybe you need to load mod_slotmem_shm?",
                         AP_SLOTMEM_PROVIDER_GROUP);
            return !OK;
        }
        storage->create(&slotmem, "mod_heartmonitor", sizeof(hm_slot_server_t),
                        maxworkers, AP_SLOTMEM_TYPE_PREGRAB, p);
        if (slotmem == NULL) {
            ap_log_error(APLOG_MARK, APLOG_EMERG, 0, s, APLOGNO(02285)
                         "slotmem_create for status failed");
            return !OK;
        }
    }

    if (!ctx->active) {
        return OK;
    }

    rv = get_instance(&ctx->watchdog, HM_WATHCHDOG_NAME, 0, 1, p);
    if (rv != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s, APLOGNO(02094)
                     "Failed to create watchdog instance (%s)",
                     HM_WATHCHDOG_NAME);
        return !OK;
    }

    /* Register a callback with zero interval. */
    rv = register_cb(ctx->watchdog, 0, ctx, hm_watchdog_callback);
    if (rv != APR_SUCCESS) {
        ap_log_error(APLOG_MARK, APLOG_CRIT, rv, s, APLOGNO(02095)
                     "Failed to register watchdog callback (%s)",
                     HM_WATHCHDOG_NAME);
        return !OK;
    }

    ap_log_error(APLOG_MARK, APLOG_DEBUG, 0, s, APLOGNO(02096)
                 "wd callback %s", HM_WATHCHDOG_NAME);
    return OK;
}
739
/*
 * Content handler for "heartbeat": stores a POSTed heartbeat message under
 * the client's IP address via hm_update_stat().
 *
 * The body is read once, up to MAX_MSG_LEN bytes.  It is a query-string
 * style message that must contain "busy" and "ready" and may contain
 * "port" (default 80).
 *
 * Returns:
 *   DECLINED                 for other handlers;
 *   HTTP_METHOD_NOT_ALLOWED  for non-POST requests;
 *   an error status          when the body cannot be read;
 *   HTTP_BAD_REQUEST         when "busy" or "ready" is missing;
 *   OK                       otherwise, after replying "OK".
 */
static int hm_handler(request_rec *r)
{
    apr_bucket_brigade *input_brigade;
    apr_size_t len;
    char *buf;
    apr_status_t status;
    apr_table_t *tbl;
    hm_server_t hmserver;
    char *ip;
    hm_ctx_t *ctx;
    const char *busy;
    const char *ready;
    const char *port;

    if (strcmp(r->handler, "heartbeat")) {
        return DECLINED;
    }
    if (r->method_number != M_POST) {
        return HTTP_METHOD_NOT_ALLOWED;
    }

    len = MAX_MSG_LEN;
    ctx = ap_get_module_config(r->server->module_config,
                               &heartmonitor_module);

    /* One byte more than we ever read, so buf[len] = '\0' stays in bounds
     * even for a full MAX_MSG_LEN body. */
    buf = apr_pcalloc(r->pool, MAX_MSG_LEN + 1);
    /* Allocate the brigade on the request pool.  On the connection pool it
     * would accumulate for every request on a keep-alive connection. */
    input_brigade = apr_brigade_create(r->pool, r->connection->bucket_alloc);
    status = ap_get_brigade(r->input_filters, input_brigade, AP_MODE_READBYTES,
                            APR_BLOCK_READ, MAX_MSG_LEN);
    if (status != APR_SUCCESS) {
        apr_brigade_destroy(input_brigade);
        return ap_map_http_request_error(status, HTTP_BAD_REQUEST);
    }
    status = apr_brigade_flatten(input_brigade, buf, &len);
    apr_brigade_destroy(input_brigade);
    if (status != APR_SUCCESS) {
        return HTTP_BAD_REQUEST;
    }
    buf[len] = '\0';

    /* we can't use hm_processmsg because it uses hm_get_server() */
    tbl = apr_table_make(r->pool, 10);
    qs_to_table(buf, tbl, r->pool);

    busy = apr_table_get(tbl, "busy");
    ready = apr_table_get(tbl, "ready");
    if (busy == NULL || ready == NULL) {
        /* Reject instead of passing NULL to atoi(), which would crash the
         * process on a malformed request from any client. */
        return HTTP_BAD_REQUEST;
    }

    apr_sockaddr_ip_get(&ip, r->connection->client_addr);
    hmserver.ip = ip;
    hmserver.port = 80;
    port = apr_table_get(tbl, "port");
    if (port != NULL) {
        hmserver.port = atoi(port);
    }
    hmserver.busy = atoi(busy);
    hmserver.ready = atoi(ready);
    hmserver.seen = apr_time_now();
    hm_update_stat(ctx, &hmserver, r->pool);

    ap_set_content_type(r, "text/plain");
    ap_set_content_length(r, 2);
    ap_rputs("OK", r);
    ap_rflush(r);

    return OK;
}
791
/* Register the handler (ordered before mod_proxy.c, listed as a successor)
 * and the post_config hook. */
static void hm_register_hooks(apr_pool_t *p)
{
    static const char * const aszSucc[] = { "mod_proxy.c", NULL };

    ap_hook_handler(hm_handler, NULL, aszSucc, APR_HOOK_FIRST);
    ap_hook_post_config(hm_post_config, NULL, NULL, APR_HOOK_MIDDLE);
}
799
/*
 * Create the per-server hm_ctx_t: inactive, default storage path under the
 * runtime directory, HM_UPDATE_SEC update interval, and an empty servers
 * hash in a dedicated subpool.
 *
 * The structure is zero-filled.  keep_running is only set when hm_listen()
 * succeeds, yet the watchdog RUNNING state reads it (as well as sock/status).
 * With apr_palloc those fields started out as garbage.
 */
static void *hm_create_config(apr_pool_t *p, server_rec *s)
{
    hm_ctx_t *ctx = (hm_ctx_t *) apr_pcalloc(p, sizeof(hm_ctx_t));

    ctx->active = 0;
    ctx->storage_path = ap_runtime_dir_relative(p, DEFAULT_HEARTBEAT_STORAGE);
    /* TODO: Add directive for tuning the update interval */
    ctx->interval = apr_time_from_sec(HM_UPDATE_SEC);
    ctx->s = s;
    apr_pool_create(&ctx->p, p);
    apr_pool_tag(ctx->p, "hm_ctx");
    ctx->servers = apr_hash_make(ctx->p);

    return ctx;
}
816
/* HeartbeatStorage <path>: heartbeat file location, resolved relative to
 * the runtime directory.  Global context only. */
static const char *cmd_hm_storage(cmd_parms *cmd,
                                  void *dconf, const char *path)
{
    hm_ctx_t *ctx;
    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);

    if (err) {
        return err;
    }

    ctx = ap_get_module_config(cmd->server->module_config,
                               &heartmonitor_module);
    ctx->storage_path = ap_runtime_dir_relative(cmd->pool, path);
    return NULL;
}
834
/*
 * HeartbeatListen <host:port>: enable the multicast listener on the given
 * IPv4 (APR_INET) group and port.  May be given only once, in the global
 * context.  Both host and a non-zero port are required.
 */
static const char *cmd_hm_listen(cmd_parms *cmd,
                                 void *dconf, const char *mcast_addr)
{
    hm_ctx_t *ctx = ap_get_module_config(cmd->server->module_config,
                                         &heartmonitor_module);
    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);
    char *host_str;
    char *scope_id;
    apr_port_t port = 0;

    if (err != NULL) {
        return err;
    }
    if (ctx->active) {
        return "HeartbeatListen: May only be specified once.";
    }
    ctx->active = 1;

    if (apr_parse_addr_port(&host_str, &scope_id, &port, mcast_addr,
                            cmd->temp_pool) != APR_SUCCESS) {
        return "HeartbeatListen: Unable to parse multicast address.";
    }
    if (host_str == NULL) {
        return "HeartbeatListen: No host provided in multicast address";
    }
    if (port == 0) {
        return "HeartbeatListen: No port provided in multicast address";
    }
    if (apr_sockaddr_info_get(&ctx->mcast_addr, host_str, APR_INET, port, 0,
                              cmd->pool) != APR_SUCCESS) {
        return
            "HeartbeatListen: apr_sockaddr_info_get failed on multicast address";
    }

    return NULL;
}
883
/*
 * HeartbeatMaxServers <n>: number of slotmem slots.  Setting it switches
 * storage from the heartbeat file to shared memory.  Global context only.
 *
 * n must be a plain decimal integer greater than 10 that fits in an int.
 * The global maxworkers is only changed when the value is valid.  atoi()
 * was previously used: it has undefined behavior on overflow and silently
 * ignored trailing garbage such as "20abc".
 */
static const char *cmd_hm_maxworkers(cmd_parms *cmd,
                                     void *dconf, const char *data)
{
    char *end;
    long val;
    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);

    if (err != NULL) {
        return err;
    }

    errno = 0;
    val = strtol(data, &end, 10);
    if (end == data || *end != '\0' || errno == ERANGE || val > INT_MAX) {
        return "HeartbeatMaxServers: Must be a decimal integer";
    }
    if (val <= 10) {
        return "HeartbeatMaxServers: Should be bigger than 10";
    }

    maxworkers = (int) val;
    return NULL;
}
899
/* Configuration directives.  All are allowed at server scope (RSRC_CONF);
 * each handler further restricts them to the global context with
 * ap_check_cmd_context(cmd, GLOBAL_ONLY). */
static const command_rec hm_cmds[] = {
    AP_INIT_TAKE1("HeartbeatListen", cmd_hm_listen, NULL, RSRC_CONF,
                  "Address to listen for heartbeat requests"),
    AP_INIT_TAKE1("HeartbeatStorage", cmd_hm_storage, NULL, RSRC_CONF,
                  "Path to store heartbeat data."),
    AP_INIT_TAKE1("HeartbeatMaxServers", cmd_hm_maxworkers, NULL, RSRC_CONF,
                  "Max number of servers when using slotmem (instead file) to store heartbeat data."),
    {NULL}
};
909
/* Module declaration: per-server configuration only (no per-directory
 * config), plus the directive table and hook registration. */
AP_DECLARE_MODULE(heartmonitor) = {
    STANDARD20_MODULE_STUFF,
    NULL,                       /* create per-directory config structure */
    NULL,                       /* merge per-directory config structures */
    hm_create_config,           /* create per-server config structure */
    NULL,                       /* merge per-server config structures */
    hm_cmds,                    /* configuration directives */
    hm_register_hooks           /* register hooks */
};
919