1 /* Licensed to the Apache Software Foundation (ASF) under one or more
2 * contributor license agreements. See the NOTICE file distributed with
3 * this work for additional information regarding copyright ownership.
4 * The ASF licenses this file to You under the Apache License, Version 2.0
5 * (the "License"); you may not use this file except in compliance with
6 * the License. You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 #include "mod_proxy.h"
18 #include "scoreboard.h"
19 #include "ap_mpm.h"
20 #include "apr_version.h"
21 #include "ap_hooks.h"
22 #include "ap_slotmem.h"
23 #include "heartbeat.h"
24
25 #ifndef LBM_HEARTBEAT_MAX_LASTSEEN
/* If we haven't seen a heartbeat in the last N seconds, don't count this IP
 * as alive.
 */
29 #define LBM_HEARTBEAT_MAX_LASTSEEN (10)
30 #endif
31
/* Forward declaration; the module record is defined at the end of the file. */
module AP_MODULE_DECLARE_DATA lbmethod_heartbeat_module;

/* Optional function exported by mod_proxy; resolved lazily on the first
 * call to find_best_hb() and cached here afterwards.
 */
static int (*ap_proxy_retry_worker_fn)(const char *proxy_function,
        proxy_worker *worker, server_rec *s) = NULL;

/* "shm" slotmem provider, looked up in lb_hb_init(); NULL if unavailable. */
static const ap_slotmem_provider_t *storage = NULL;
/* Slotmem named "mod_heartmonitor", attached in lb_hb_init().  When it is
 * non-NULL, heartbeats are read from it instead of from the flat file.
 */
static ap_slotmem_instance_t *hm_serversmem = NULL;
39
/*
 * Per-server configuration structure.
 * path: path of the file where the heartbeat information is stored.
 *       Set from the default storage location or the HeartbeatStorage
 *       directive; replaced by the literal "(slotmem)" when the
 *       mod_heartmonitor slotmem is attached at post-config time.
 */
typedef struct lb_hb_ctx_t
{
    const char *path;
} lb_hb_ctx_t;
48
/* One backend server as described by its most recent heartbeat. */
typedef struct hb_server_t {
    const char *ip;        /* key of the servers hash */
    int busy;              /* "busy" value reported by the heartbeat */
    int ready;             /* "ready" value; divided by 4 while busy == 0 */
    int port;              /* "port" value; the file reader defaults it to 80 */
    int id;                /* slot id; only filled in by the slotmem reader */
    apr_time_t seen;       /* seconds since the last heartbeat; -1 until set */
    proxy_worker *worker;  /* matching usable worker, set in find_best_hb() */
} hb_server_t;
58
/* Context handed to hm_read() while iterating over the slotmem. */
typedef struct ctx_servers {
    apr_time_t now;       /* reference time used to compute hb_server_t.seen */
    apr_hash_t *servers;  /* IP string -> hb_server_t *, filled by hm_read() */
} ctx_servers_t;
63
64 static void
argstr_to_table(apr_pool_t * p,char * str,apr_table_t * parms)65 argstr_to_table(apr_pool_t *p, char *str, apr_table_t *parms)
66 {
67 char *key;
68 char *value;
69 char *strtok_state;
70
71 key = apr_strtok(str, "&", &strtok_state);
72 while (key) {
73 value = strchr(key, '=');
74 if (value) {
75 *value = '\0'; /* Split the string in two */
76 value++; /* Skip passed the = */
77 }
78 else {
79 value = "1";
80 }
81 ap_unescape_url(key);
82 ap_unescape_url(value);
83 apr_table_set(parms, key, value);
84 /*
85 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
86 "Found query arg: %s = %s", key, value);
87 */
88 key = apr_strtok(NULL, "&", &strtok_state);
89 }
90 }
91
readfile_heartbeats(const char * path,apr_hash_t * servers,apr_pool_t * pool)92 static apr_status_t readfile_heartbeats(const char *path, apr_hash_t *servers,
93 apr_pool_t *pool)
94 {
95 apr_finfo_t fi;
96 apr_status_t rv;
97 apr_file_t *fp;
98
99 if (!path) {
100 return APR_SUCCESS;
101 }
102
103 rv = apr_file_open(&fp, path, APR_READ|APR_BINARY|APR_BUFFERED,
104 APR_OS_DEFAULT, pool);
105
106 if (rv) {
107 return rv;
108 }
109
110 rv = apr_file_info_get(&fi, APR_FINFO_SIZE, fp);
111
112 if (rv) {
113 return rv;
114 }
115
116 {
117 char *t;
118 int lineno = 0;
119 apr_bucket_alloc_t *ba = apr_bucket_alloc_create(pool);
120 apr_bucket_brigade *bb = apr_brigade_create(pool, ba);
121 apr_bucket_brigade *tmpbb = apr_brigade_create(pool, ba);
122 apr_table_t *hbt = apr_table_make(pool, 10);
123
124 apr_brigade_insert_file(bb, fp, 0, fi.size, pool);
125
126 do {
127 hb_server_t *server;
128 char buf[4096];
129 apr_size_t bsize = sizeof(buf);
130 const char *ip;
131
132 apr_brigade_cleanup(tmpbb);
133
134 if (APR_BRIGADE_EMPTY(bb)) {
135 break;
136 }
137
138 rv = apr_brigade_split_line(tmpbb, bb,
139 APR_BLOCK_READ, sizeof(buf));
140 lineno++;
141
142 if (rv) {
143 return rv;
144 }
145
146 apr_brigade_flatten(tmpbb, buf, &bsize);
147
148 if (bsize == 0) {
149 break;
150 }
151
152 buf[bsize - 1] = 0;
153
154 /* comment */
155 if (buf[0] == '#') {
156 continue;
157 }
158
159 /* line format: <IP> <query_string>\n */
160 t = strchr(buf, ' ');
161 if (!t) {
162 continue;
163 }
164
165 ip = apr_pstrndup(pool, buf, t - buf);
166 t++;
167
168 server = apr_hash_get(servers, ip, APR_HASH_KEY_STRING);
169
170 if (server == NULL) {
171 server = apr_pcalloc(pool, sizeof(hb_server_t));
172 server->ip = ip;
173 server->port = 80;
174 server->seen = -1;
175
176 apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server);
177 }
178
179 apr_table_clear(hbt);
180
181 argstr_to_table(pool, apr_pstrdup(pool, t), hbt);
182
183 if (apr_table_get(hbt, "busy")) {
184 server->busy = atoi(apr_table_get(hbt, "busy"));
185 }
186
187 if (apr_table_get(hbt, "ready")) {
188 server->ready = atoi(apr_table_get(hbt, "ready"));
189 }
190
191 if (apr_table_get(hbt, "lastseen")) {
192 server->seen = atoi(apr_table_get(hbt, "lastseen"));
193 }
194
195 if (apr_table_get(hbt, "port")) {
196 server->port = atoi(apr_table_get(hbt, "port"));
197 }
198
199 if (server->busy == 0 && server->ready != 0) {
200 /* Server has zero threads active, but lots of them ready,
201 * it likely just started up, so lets /4 the number ready,
202 * to prevent us from completely flooding it with all new
203 * requests.
204 */
205 server->ready = server->ready / 4;
206 }
207
208 } while (1);
209 }
210
211 return APR_SUCCESS;
212 }
213
hm_read(void * mem,void * data,apr_pool_t * pool)214 static apr_status_t hm_read(void* mem, void *data, apr_pool_t *pool)
215 {
216 hm_slot_server_t *slotserver = (hm_slot_server_t *) mem;
217 ctx_servers_t *ctx = (ctx_servers_t *) data;
218 apr_hash_t *servers = (apr_hash_t *) ctx->servers;
219 hb_server_t *server = apr_hash_get(servers, slotserver->ip, APR_HASH_KEY_STRING);
220 if (server == NULL) {
221 server = apr_pcalloc(pool, sizeof(hb_server_t));
222 server->ip = apr_pstrdup(pool, slotserver->ip);
223 server->seen = -1;
224
225 apr_hash_set(servers, server->ip, APR_HASH_KEY_STRING, server);
226
227 }
228 server->busy = slotserver->busy;
229 server->ready = slotserver->ready;
230 server->seen = apr_time_sec(ctx->now - slotserver->seen);
231 server->id = slotserver->id;
232 if (server->busy == 0 && server->ready != 0) {
233 server->ready = server->ready / 4;
234 }
235 return APR_SUCCESS;
236 }
readslot_heartbeats(ctx_servers_t * ctx,apr_pool_t * pool)237 static apr_status_t readslot_heartbeats(ctx_servers_t *ctx,
238 apr_pool_t *pool)
239 {
240 storage->doall(hm_serversmem, hm_read, ctx, pool);
241 return APR_SUCCESS;
242 }
243
244
read_heartbeats(const char * path,apr_hash_t * servers,apr_pool_t * pool)245 static apr_status_t read_heartbeats(const char *path, apr_hash_t *servers,
246 apr_pool_t *pool)
247 {
248 apr_status_t rv;
249 if (hm_serversmem) {
250 ctx_servers_t ctx;
251 ctx.now = apr_time_now();
252 ctx.servers = servers;
253 rv = readslot_heartbeats(&ctx, pool);
254 } else
255 rv = readfile_heartbeats(path, servers, pool);
256 return rv;
257 }
258
find_best_hb(proxy_balancer * balancer,request_rec * r)259 static proxy_worker *find_best_hb(proxy_balancer *balancer,
260 request_rec *r)
261 {
262 apr_status_t rv;
263 int i;
264 apr_uint32_t openslots = 0;
265 proxy_worker **worker;
266 hb_server_t *server;
267 apr_array_header_t *up_servers;
268 proxy_worker *mycandidate = NULL;
269 apr_pool_t *tpool;
270 apr_hash_t *servers;
271
272 lb_hb_ctx_t *ctx =
273 ap_get_module_config(r->server->module_config,
274 &lbmethod_heartbeat_module);
275
276 if (!ap_proxy_retry_worker_fn) {
277 ap_proxy_retry_worker_fn =
278 APR_RETRIEVE_OPTIONAL_FN(ap_proxy_retry_worker);
279 if (!ap_proxy_retry_worker_fn) {
280 /* can only happen if mod_proxy isn't loaded */
281 return NULL;
282 }
283 }
284
285 apr_pool_create(&tpool, r->pool);
286
287 servers = apr_hash_make(tpool);
288
289 rv = read_heartbeats(ctx->path, servers, tpool);
290
291 if (rv) {
292 ap_log_rerror(APLOG_MARK, APLOG_ERR, rv, r, APLOGNO(01213)
293 "lb_heartbeat: Unable to read heartbeats at '%s'",
294 ctx->path);
295 apr_pool_destroy(tpool);
296 return NULL;
297 }
298
299 up_servers = apr_array_make(tpool, apr_hash_count(servers), sizeof(hb_server_t *));
300
301 for (i = 0; i < balancer->workers->nelts; i++) {
302 worker = &APR_ARRAY_IDX(balancer->workers, i, proxy_worker *);
303 server = apr_hash_get(servers, (*worker)->s->hostname, APR_HASH_KEY_STRING);
304
305 if (!server) {
306 ap_log_rerror(APLOG_MARK, APLOG_DEBUG, rv, r, APLOGNO(01214)
307 "lb_heartbeat: No server for worker %s", (*worker)->s->name);
308 continue;
309 }
310
311 if (!PROXY_WORKER_IS_USABLE(*worker)) {
312 ap_proxy_retry_worker_fn("BALANCER", *worker, r->server);
313 }
314
315 if (PROXY_WORKER_IS_USABLE(*worker)) {
316 server->worker = *worker;
317 if (server->seen < LBM_HEARTBEAT_MAX_LASTSEEN) {
318 openslots += server->ready;
319 APR_ARRAY_PUSH(up_servers, hb_server_t *) = server;
320 }
321 }
322 }
323
324 if (openslots > 0) {
325 apr_uint32_t c = 0;
326 apr_uint32_t pick = 0;
327
328 pick = ap_random_pick(0, openslots);
329
330 for (i = 0; i < up_servers->nelts; i++) {
331 server = APR_ARRAY_IDX(up_servers, i, hb_server_t *);
332 if (pick >= c && pick <= c + server->ready) {
333 mycandidate = server->worker;
334 }
335
336 c += server->ready;
337 }
338 }
339
340 apr_pool_destroy(tpool);
341
342 return mycandidate;
343 }
344
reset(proxy_balancer * balancer,server_rec * s)345 static apr_status_t reset(proxy_balancer *balancer, server_rec *s) {
346 return APR_SUCCESS;
347 }
348
age(proxy_balancer * balancer,server_rec * s)349 static apr_status_t age(proxy_balancer *balancer, server_rec *s) {
350 return APR_SUCCESS;
351 }
352
/* Load-balancing method record registered under the provider name
 * "heartbeat" in register_hooks().
 * NOTE(review): slot meanings follow proxy_balancer_method in mod_proxy.h,
 * which is not visible here -- confirm against that header.
 */
static const proxy_balancer_method heartbeat =
{
    "heartbeat",        /* method name */
    &find_best_hb,      /* worker selection */
    NULL,               /* no value supplied for this slot */
    &reset,             /* no-op */
    &age                /* no-op */
};
361
/*
 * post_config hook: try to attach to the slotmem named "mod_heartmonitor"
 * through the "shm" slotmem provider.
 *
 * On success hm_serversmem is set and the configured path of server `s` is
 * replaced by the literal "(slotmem)".  When the provider or the slotmem is
 * missing, a notice is logged and hm_serversmem stays NULL.
 *
 * p:     pool passed to the provider's attach()
 * plog:  unused
 * ptemp: unused
 * s:     server whose module configuration is updated
 *
 * Always returns OK.
 */
static int lb_hb_init(apr_pool_t *p, apr_pool_t *plog,
                      apr_pool_t *ptemp, server_rec *s)
{
    apr_size_t slot_size;
    unsigned int slot_count;
    lb_hb_ctx_t *conf;

    /* do nothing on first call */
    if (ap_state_query(AP_SQ_MAIN_STATE) == AP_SQ_MS_CREATE_PRE_CONFIG) {
        return OK;
    }

    storage = ap_lookup_provider(AP_SLOTMEM_PROVIDER_GROUP, "shm",
                                 AP_SLOTMEM_PROVIDER_VERSION);
    if (storage == NULL) {
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, APLOGNO(02281)
                     "Failed to lookup provider 'shm' for '%s'. Maybe you "
                     "need to load mod_slotmem_shm?",
                     AP_SLOTMEM_PROVIDER_GROUP);
        return OK;
    }

    /* Try to use a slotmem created by mod_heartmonitor */
    storage->attach(&hm_serversmem, "mod_heartmonitor",
                    &slot_size, &slot_count, p);
    if (hm_serversmem == NULL) {
        ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, APLOGNO(02282)
                     "No slotmem from mod_heartmonitor");
        return OK;
    }

    ap_log_error(APLOG_MARK, APLOG_NOTICE, 0, s, APLOGNO(02283)
                 "Using slotmem from mod_heartmonitor");

    /* the file path is no longer the data source; label it as such */
    conf = ap_get_module_config(s->module_config,
                                &lbmethod_heartbeat_module);
    conf->path = "(slotmem)";

    return OK;
}
398
/*
 * Register the "heartbeat" load-balancing method (version "0") in the
 * PROXY_LBMETHOD provider group and hook lb_hb_init() into post_config,
 * listing mod_heartmonitor.c as a predecessor.
 */
static void register_hooks(apr_pool_t *p)
{
    static const char * const run_after[] = {
        "mod_heartmonitor.c",
        NULL
    };

    ap_register_provider(p, PROXY_LBMETHOD, "heartbeat", "0", &heartbeat);
    ap_hook_post_config(lb_hb_init, run_after, NULL, APR_HOOK_MIDDLE);
}
405
/*
 * Create the per-server configuration, with the heartbeat path preset to
 * DEFAULT_HEARTBEAT_STORAGE resolved through ap_runtime_dir_relative().
 *
 * p: pool the configuration is allocated from
 * s: unused
 */
static void *lb_hb_create_config(apr_pool_t *p, server_rec *s)
{
    lb_hb_ctx_t *conf = apr_palloc(p, sizeof(*conf));

    conf->path = ap_runtime_dir_relative(p, DEFAULT_HEARTBEAT_STORAGE);
    return conf;
}
414
/*
 * Merge two per-server configurations into a new one allocated from `p`.
 * The overriding configuration's path wins when it is set; otherwise the
 * base path is used.  The chosen path is duplicated into `p`.
 */
static void *lb_hb_merge_config(apr_pool_t *p, void *basev, void *overridesv)
{
    const lb_hb_ctx_t *parent = basev;
    const lb_hb_ctx_t *child = overridesv;
    lb_hb_ctx_t *merged = apr_pcalloc(p, sizeof(*merged));

    merged->path = apr_pstrdup(p, child->path ? child->path : parent->path);

    return merged;
}
430
/*
 * Handler for the HeartbeatStorage directive.
 *
 * cmd:   directive context; must satisfy GLOBAL_ONLY
 * dconf: unused
 * path:  storage path, resolved through ap_runtime_dir_relative()
 *
 * Returns NULL on success, or the message produced by
 * ap_check_cmd_context() when the directive appears in a disallowed context.
 */
static const char *cmd_lb_hb_storage(cmd_parms *cmd,
                                     void *dconf, const char *path)
{
    lb_hb_ctx_t *conf;
    const char *context_error = ap_check_cmd_context(cmd, GLOBAL_ONLY);

    if (context_error) {
        return context_error;
    }

    conf = ap_get_module_config(cmd->server->module_config,
                                &lbmethod_heartbeat_module);
    conf->path = ap_runtime_dir_relative(cmd->pool, path);

    return NULL;
}
449
/* Configuration directives provided by this module. */
static const command_rec cmds[] = {
    /* HeartbeatStorage <path>: one argument, handled by cmd_lb_hb_storage() */
    AP_INIT_TAKE1("HeartbeatStorage", cmd_lb_hb_storage, NULL, RSRC_CONF,
                  "Path to read heartbeat data."),
    {NULL}      /* terminator */
};
455
/* Module record: per-server configuration, directives and hooks only;
 * no per-directory configuration is used.
 */
AP_DECLARE_MODULE(lbmethod_heartbeat) = {
    STANDARD20_MODULE_STUFF,
    NULL,                       /* create per-directory config structure */
    NULL,                       /* merge per-directory config structures */
    lb_hb_create_config,        /* create per-server config structure */
    lb_hb_merge_config,         /* merge per-server config structures */
    cmds,                       /* command table (directives) */
    register_hooks              /* register hooks */
};
465