1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2  * contributor license agreements.  See the NOTICE file distributed with
3  * this work for additional information regarding copyright ownership.
4  * The ASF licenses this file to You under the Apache License, Version 2.0
5  * (the "License"); you may not use this file except in compliance with
6  * the License.  You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #include "mod_proxy.h"
18 #include "scoreboard.h"
19 #include "ap_mpm.h"
20 #include "apr_version.h"
21 #include "ap_hooks.h"
22 #include "ap_slotmem.h"
23 #include "heartbeat.h"
24 
25 #ifndef LBM_HEARTBEAT_MAX_LASTSEEN
/* If we haven't seen a heartbeat in the last N seconds, don't count this IP
 * as alive.
 */
29 #define LBM_HEARTBEAT_MAX_LASTSEEN (10)
30 #endif
31 
32 module AP_MODULE_DECLARE_DATA lbmethod_heartbeat_module;
33 
34 static int (*ap_proxy_retry_worker_fn)(const char *proxy_function,
35         proxy_worker *worker, server_rec *s) = NULL;
36 
37 static const ap_slotmem_provider_t *storage = NULL;
38 static ap_slotmem_instance_t *hm_serversmem = NULL;
39 
/*
 * Per-server configuration of this module.
 */
typedef struct lb_hb_ctx_t {
    /* Path of the file where the heartbeat information is stored. */
    const char *path;
} lb_hb_ctx_t;
48 
49 typedef struct hb_server_t {
50     const char *ip;
51     int busy;
52     int ready;
53     int port;
54     int id;
55     apr_time_t seen;
56     proxy_worker *worker;
57 } hb_server_t;
58 
59 typedef struct ctx_servers {
60     apr_time_t now;
61     apr_hash_t *servers;
62 } ctx_servers_t;
63 
64 static void
argstr_to_table(apr_pool_t * p,char * str,apr_table_t * parms)65 argstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms)
66 {
67     char *key;
68     char *value;
69     char *strtok_state;
70 
71     key = apr_strtok(str, "&", &strtok_state);
72     while (key) {
73         value = strchr(key, '=');
74         if (value) {
75             *value = '\0';      /* Split the string in two */
76             value++;            /* Skip passed the = */
77         }
78         else {
79             value = "1";
80         }
81         ap_unescape_url(key);
82         ap_unescape_url(value);
83         apr_table_set(parms, key, value);
84         /*
85          ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
86          "Found query arg: %s = %s", key, value);
87          */
88         key = apr_strtok(NULL, "&", &strtok_state);
89     }
90 }
91 
readfile_heartbeats(const char * path,apr_hash_t * servers,apr_pool_t * pool)92 static apr_status_t readfile_heartbeats(const char *path, apr_hash_t *servers,
93                                     apr_pool_t *pool)
94 {
95     apr_finfo_t fi;
96     apr_status_t rv;
97     apr_file_t *fp;
98 
99     if (!path) {
100         return APR_SUCCESS;
101     }
102 
103     rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED,
104                        APR_OS_DEFAULT, pool);
105 
106     if (rv) {
107         return rv;
108     }
109 
110     rv = apr_file_info_get(&fi, APR_FINFO_SIZE, fp);
111 
112     if (rv) {
113         return rv;
114     }
115 
116     {
117         char *t;
118         int lineno = 0;
119         apr_bucket_alloc_t *ba = apr_bucket_alloc_create(pool);
120         apr_bucket_brigade *bb = apr_brigade_create(pool, ba);
121         apr_bucket_brigade *tmpbb = apr_brigade_create(pool, ba);
122         apr_table_t *hbt = apr_table_make(pool, 10);
123 
124         apr_brigade_insert_file(bb, fp, 0, fi.size, pool);
125 
126         do {
127             hb_server_t *server;
128             char buf[4096];
129             apr_size_t bsize = sizeof(buf);
130             const char *ip;
131 
132             apr_brigade_cleanup(tmpbb);
133 
134             if (APR_BRIGADE_EMPTY(bb)) {
135                 break;
136             }
137 
138             rv = apr_brigade_split_line(tmpbb, bb,
139                                         APR_BLOCK_READ, sizeof(buf));
140             lineno++;
141 
142             if (rv) {
143                 return rv;
144             }
145 
146             apr_brigade_flatten(tmpbb, buf, &bsize);
147 
148             if (bsize == 0) {
149                 break;
150             }
151 
152             buf[bsize - 1] = 0;
153 
154             /* comment */
155             if (buf[0] == '#') {
156                 continue;
157             }
158 
159             /* line format: <IP> <query_string>\n */
160             t = strchr(buf, ' ');
161             if (!t) {
162                 continue;
163             }
164 
165             ip = apr_pstrndup(pool, buf, t - buf);
166             t++;
167 
168             server = apr_hash_get(servers, ip, APR_HASH_KEY_STRING);
169 
170             if (server == NULL) {
171                 server = apr_pcalloc(pool, sizeof(hb_server_t));
172                 server->ip = ip;
173                 server->port = 80;
174                 server->seen = -1;
175 
176                 apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server);
177             }
178 
179             apr_table_clear(hbt);
180 
181             argstr_to_table(pool, apr_pstrdup(pool, t), hbt);
182 
183             if (apr_table_get(hbt, "busy")) {
184                 server->busy = atoi(apr_table_get(hbt, "busy"));
185             }
186 
187             if (apr_table_get(hbt, "ready")) {
188                 server->ready = atoi(apr_table_get(hbt, "ready"));
189             }
190 
191             if (apr_table_get(hbt, "lastseen")) {
192                 server->seen = atoi(apr_table_get(hbt, "lastseen"));
193             }
194 
195             if (apr_table_get(hbt, "port")) {
196                 server->port = atoi(apr_table_get(hbt, "port"));
197             }
198 
199             if (server->busy == 0 && server->ready != 0) {
200                 /* Server has zero threads active, but lots of them ready,
201                  * it likely just started up, so lets /4 the number ready,
202                  * to prevent us from completely flooding it with all new
203                  * requests.
204                  */
205                 server->ready = server->ready / 4;
206             }
207 
208         } while (1);
209     }
210 
211     return APR_SUCCESS;
212 }
213 
hm_read(void * mem,void * data,apr_pool_t * pool)214 static apr_status_t hm_read(void* mem, void *data, apr_pool_t *pool)
215 {
216     hm_slot_server_t *slotserver = (hm_slot_server_t *) mem;
217     ctx_servers_t *ctx = (ctx_servers_t *) data;
218     apr_hash_t *servers = (apr_hash_t *) ctx->servers;
219     hb_server_t *server = apr_hash_get(servers, slotserver->ip, APR_HASH_KEY_STRING);
220     if (server == NULL) {
221         server = apr_pcalloc(pool, sizeof(hb_server_t));
222         server->ip = apr_pstrdup(pool, slotserver->ip);
223         server->seen = -1;
224 
225         apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server);
226 
227     }
228     server->busy = slotserver->busy;
229     server->ready = slotserver->ready;
230     server->seen = apr_time_sec(ctx->now - slotserver->seen);
231     server->id = slotserver->id;
232     if (server->busy == 0 && server->ready != 0) {
233         server->ready = server->ready / 4;
234     }
235     return APR_SUCCESS;
236 }
readslot_heartbeats(ctx_servers_t * ctx,apr_pool_t * pool)237 static apr_status_t readslot_heartbeats(ctx_servers_t *ctx,
238                                     apr_pool_t *pool)
239 {
240     storage->doall(hm_serversmem, hm_read, ctx, pool);
241     return APR_SUCCESS;
242 }
243 
244 
read_heartbeats(const char * path,apr_hash_t * servers,apr_pool_t * pool)245 static apr_status_t read_heartbeats(const char *path, apr_hash_t *servers,
246                                         apr_pool_t *pool)
247 {
248     apr_status_t rv;
249     if (hm_serversmem) {
250         ctx_servers_t ctx;
251         ctx.now = apr_time_now();
252         ctx.servers = servers;
253         rv = readslot_heartbeats(&ctx, pool);
254     } else
255         rv = readfile_heartbeats(path, servers, pool);
256     return rv;
257 }
258 
find_best_hb(proxy_balancer * balancer,request_rec * r)259 static proxy_worker *find_best_hb(proxy_balancer *balancer,
260                                   request_rec *r)
261 {
262     apr_status_t rv;
263     int i;
264     apr_uint32_t openslots = 0;
265     proxy_worker **worker;
266     hb_server_t *server;
267     apr_array_header_t *up_servers;
268     proxy_worker *mycandidate = NULL;
269     apr_pool_t *tpool;
270     apr_hash_t *servers;
271 
272     lb_hb_ctx_t *ctx =
273         ap_get_module_config(r->server->module_config,
274                              &lbmethod_heartbeat_module);
275 
276     if (!ap_proxy_retry_worker_fn) {
277         ap_proxy_retry_worker_fn =
278                 APR_RETRIEVE_OPTIONAL_FN(ap_proxy_retry_worker);
279         if (!ap_proxy_retry_worker_fn) {
280             /* can only happen if mod_proxy isn't loaded */
281             return NULL;
282         }
283     }
284 
285     apr_pool_create(&tpool, r->pool);
286 
287     servers = apr_hash_make(tpool);
288 
289     rv = read_heartbeats(ctx->path, servers, tpool);
290 
291     if (rv) {
292         ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01213)
293                       "lb_heartbeat: Unable to read heartbeats at '%s'",
294                       ctx->path);
295         apr_pool_destroy(tpool);
296         return NULL;
297     }
298 
299     up_servers = apr_array_make(tpool, apr_hash_count(servers), sizeof(hb_server_t *));
300 
301     for (i = 0; i < balancer->workers->nelts; i++) {
302         worker = &APR_ARRAY_IDX(balancer->workers, i, proxy_worker *);
303         server = apr_hash_get(servers, (*worker)->s->hostname, APR_HASH_KEY_STRING);
304 
305         if (!server) {
306             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, rv, r, APLOGNO(01214)
307                       "lb_heartbeat: No server for worker %s", (*worker)->s->name);
308             continue;
309         }
310 
311         if (!PROXY_WORKER_IS_USABLE(*worker)) {
312             ap_proxy_retry_worker_fn("BALANCER", *worker, r->server);
313         }
314 
315         if (PROXY_WORKER_IS_USABLE(*worker)) {
316             server->worker = *worker;
317             if (server->seen < LBM_HEARTBEAT_MAX_LASTSEEN) {
318                 openslots += server->ready;
319                 APR_ARRAY_PUSH(up_servers, hb_server_t *) = server;
320             }
321         }
322     }
323 
324     if (openslots > 0) {
325         apr_uint32_t c = 0;
326         apr_uint32_t pick = 0;
327 
328         pick = ap_random_pick(0, openslots);
329 
330         for (i = 0; i < up_servers->nelts; i++) {
331             server = APR_ARRAY_IDX(up_servers, i, hb_server_t *);
332             if (pick >= c && pick <= c + server->ready) {
333                 mycandidate = server->worker;
334             }
335 
336             c += server->ready;
337         }
338     }
339 
340     apr_pool_destroy(tpool);
341 
342     return mycandidate;
343 }
344 
reset(proxy_balancer * balancer,server_rec * s)345 static apr_status_t reset(proxy_balancer *balancer, server_rec *s) {
346         return APR_SUCCESS;
347 }
348 
age(proxy_balancer * balancer,server_rec * s)349 static apr_status_t age(proxy_balancer *balancer, server_rec *s) {
350         return APR_SUCCESS;
351 }
352 
353 static const proxy_balancer_method heartbeat =
354 {
355     "heartbeat",
356     &find_best_hb,
357     NULL,
358     &reset,
359     &age
360 };
361 
/*
 * post_config hook: try to attach to the slotmem named "mod_heartmonitor"
 * so that heartbeats can be read from it instead of from a file.
 *
 * p:     pool the slotmem is attached with.
 * plog:  unused.
 * ptemp: unused.
 * s:     server whose module config gets its path set to "(slotmem)" when
 *        the attach succeeds; also used for logging.
 *
 * Always returns OK. A missing provider or slotmem is only logged, and the
 * file reader is then used.
 */
static int lb_hb_init(apr_pool_t *p, apr_pool_t *plog,
                          apr_pool_t *ptemp, server_rec *s)
{
    apr_size_t size;
    unsigned int num;
    lb_hb_ctx_t *ctx = ap_get_module_config(s->module_config,
                                            &lbmethod_heartbeat_module);

    /* do nothing on first call */
    if (ap_state_query(AP_SQ_MAIN_STATE) == AP_SQ_MS_CREATE_PRE_CONFIG)
        return OK;

    /* Forget any instance left over from an earlier run of this hook: it
     * was attached with a previous pool, and if the lookup or attach below
     * fails the stale pointer must not stay in use.
     */
    hm_serversmem = NULL;

    storage = ap_lookup_provider(AP_SLOTMEM_PROVIDER_GROUP, "shm",
                                 AP_SLOTMEM_PROVIDER_VERSION);
    if (!storage) {
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, APLOGNO(02281)
                     "Failed to lookup provider 'shm' for '%s'. Maybe you "
                     "need to load mod_slotmem_shm?",
                     AP_SLOTMEM_PROVIDER_GROUP);
        return OK;
    }

    /* Try to use a slotmem created by mod_heartmonitor */
    storage->attach(&hm_serversmem, "mod_heartmonitor", &size, &num, p);
    if (!hm_serversmem) {
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, APLOGNO(02282)
                     "No slotmem from mod_heartmonitor");
    }
    else {
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, APLOGNO(02283)
                     "Using slotmem from mod_heartmonitor");
        /* The path is only reported in log messages from here on. */
        ctx->path = "(slotmem)";
    }

    return OK;
}
398 
/*
 * Register the "heartbeat" load balancing method and the post_config hook.
 */
static void register_hooks(apr_pool_t *p)
{
    /* Predecessor list handed to the hook registration, so that
     * mod_heartmonitor's post_config is ordered before lb_hb_init().
     */
    static const char * const run_after[] = { "mod_heartmonitor.c", NULL };

    ap_register_provider(p, PROXY_LBMETHOD, "heartbeat", "0", &heartbeat);
    ap_hook_post_config(lb_hb_init, run_after, NULL, APR_HOOK_MIDDLE);
}
405 
/*
 * Create the per-server configuration.
 *
 * p: pool the configuration is allocated from.
 * s: server the configuration belongs to.
 *
 * The main server starts with the default heartbeat storage path. A
 * virtual host starts with no path, so that lb_hb_merge_config() falls back
 * to the main server's value; otherwise the default set here would always
 * win the merge and a global HeartbeatStorage setting would never reach
 * virtual hosts.
 */
static void *lb_hb_create_config(apr_pool_t *p, server_rec *s)
{
    lb_hb_ctx_t *ctx = apr_pcalloc(p, sizeof(*ctx));

    if (!s->is_virtual) {
        ctx->path = ap_runtime_dir_relative(p, DEFAULT_HEARTBEAT_STORAGE);
    }

    return ctx;
}
414 
/*
 * Merge two per-server configurations into a newly allocated one.
 *
 * p:          pool for the merged configuration.
 * basev:      base lb_hb_ctx_t.
 * overridesv: overriding lb_hb_ctx_t.
 *
 * The merged path is a copy of the overriding path when one is set, and a
 * copy of the base path otherwise.
 */
static void *lb_hb_merge_config(apr_pool_t *p, void *basev, void *overridesv)
{
    const lb_hb_ctx_t *parent = basev;
    const lb_hb_ctx_t *child = overridesv;
    lb_hb_ctx_t *merged = apr_pcalloc(p, sizeof(*merged));

    merged->path = apr_pstrdup(p, child->path ? child->path : parent->path);

    return merged;
}
430 
/*
 * Handler of the HeartbeatStorage directive: set the file heartbeats are
 * read from.
 *
 * cmd:   directive context; the directive is accepted in the global
 *        context only.
 * dconf: unused.
 * path:  path given in the configuration, resolved with
 *        ap_runtime_dir_relative().
 *
 * Returns NULL on success, or an error message when the directive is used
 * in the wrong context or the path cannot be resolved.
 */
static const char *cmd_lb_hb_storage(cmd_parms *cmd,
                                  void *dconf, const char *path)
{
    apr_pool_t *p = cmd->pool;
    const char *resolved;
    lb_hb_ctx_t *ctx =
    (lb_hb_ctx_t *) ap_get_module_config(cmd->server->module_config,
                                         &lbmethod_heartbeat_module);

    const char *err = ap_check_cmd_context(cmd, GLOBAL_ONLY);

    if (err != NULL) {
        return err;
    }

    /* Report an unusable path at configuration time instead of storing
     * NULL, which would silently disable reading the heartbeat file.
     */
    resolved = ap_runtime_dir_relative(p, path);
    if (resolved == NULL) {
        return apr_pstrcat(p, "Invalid HeartbeatStorage path ", path, NULL);
    }

    ctx->path = resolved;

    return NULL;
}
449 
450 static const command_rec cmds[] = {
451     AP_INIT_TAKE1("HeartbeatStorage", cmd_lb_hb_storage, NULL, RSRC_CONF,
452                   "Path to read heartbeat data."),
453     {NULL}
454 };
455 
456 AP_DECLARE_MODULE(lbmethod_heartbeat) = {
457     STANDARD20_MODULE_STUFF,
458     NULL,                       /* create per-directory config structure */
459     NULL,                       /* merge per-directory config structures */
460     lb_hb_create_config,        /* create per-server config structure */
461     lb_hb_merge_config,         /* merge per-server config structures */
462     cmds,                       /* command apr_table_t */
463     register_hooks              /* register hooks */
464 };
465