1
2 /*
3 * Copyright (C) Roman Arutyunyan
4 * Copyright (C) Nginx, Inc.
5 */
6
7
8 #include <ngx_config.h>
9 #include <ngx_core.h>
10 #include <ngx_stream.h>
11
12
13 typedef struct {
14 uint32_t hash;
15 ngx_str_t *server;
16 } ngx_stream_upstream_chash_point_t;
17
18
19 typedef struct {
20 ngx_uint_t number;
21 ngx_stream_upstream_chash_point_t point[1];
22 } ngx_stream_upstream_chash_points_t;
23
24
25 typedef struct {
26 ngx_stream_complex_value_t key;
27 ngx_stream_upstream_chash_points_t *points;
28 } ngx_stream_upstream_hash_srv_conf_t;
29
30
31 typedef struct {
32 /* the round robin data must be first */
33 ngx_stream_upstream_rr_peer_data_t rrp;
34 ngx_stream_upstream_hash_srv_conf_t *conf;
35 ngx_str_t key;
36 ngx_uint_t tries;
37 ngx_uint_t rehash;
38 uint32_t hash;
39 ngx_event_get_peer_pt get_rr_peer;
40 } ngx_stream_upstream_hash_peer_data_t;
41
42
43 static ngx_int_t ngx_stream_upstream_init_hash(ngx_conf_t *cf,
44 ngx_stream_upstream_srv_conf_t *us);
45 static ngx_int_t ngx_stream_upstream_init_hash_peer(ngx_stream_session_t *s,
46 ngx_stream_upstream_srv_conf_t *us);
47 static ngx_int_t ngx_stream_upstream_get_hash_peer(ngx_peer_connection_t *pc,
48 void *data);
49
50 static ngx_int_t ngx_stream_upstream_init_chash(ngx_conf_t *cf,
51 ngx_stream_upstream_srv_conf_t *us);
52 static int ngx_libc_cdecl
53 ngx_stream_upstream_chash_cmp_points(const void *one, const void *two);
54 static ngx_uint_t ngx_stream_upstream_find_chash_point(
55 ngx_stream_upstream_chash_points_t *points, uint32_t hash);
56 static ngx_int_t ngx_stream_upstream_init_chash_peer(ngx_stream_session_t *s,
57 ngx_stream_upstream_srv_conf_t *us);
58 static ngx_int_t ngx_stream_upstream_get_chash_peer(ngx_peer_connection_t *pc,
59 void *data);
60
61 static void *ngx_stream_upstream_hash_create_conf(ngx_conf_t *cf);
62 static char *ngx_stream_upstream_hash(ngx_conf_t *cf, ngx_command_t *cmd,
63 void *conf);
64
65
66 static ngx_command_t ngx_stream_upstream_hash_commands[] = {
67
68 { ngx_string("hash"),
69 NGX_STREAM_UPS_CONF|NGX_CONF_TAKE12,
70 ngx_stream_upstream_hash,
71 NGX_STREAM_SRV_CONF_OFFSET,
72 0,
73 NULL },
74
75 ngx_null_command
76 };
77
78
79 static ngx_stream_module_t ngx_stream_upstream_hash_module_ctx = {
80 NULL, /* preconfiguration */
81 NULL, /* postconfiguration */
82
83 NULL, /* create main configuration */
84 NULL, /* init main configuration */
85
86 ngx_stream_upstream_hash_create_conf, /* create server configuration */
87 NULL /* merge server configuration */
88 };
89
90
91 ngx_module_t ngx_stream_upstream_hash_module = {
92 NGX_MODULE_V1,
93 &ngx_stream_upstream_hash_module_ctx, /* module context */
94 ngx_stream_upstream_hash_commands, /* module directives */
95 NGX_STREAM_MODULE, /* module type */
96 NULL, /* init master */
97 NULL, /* init module */
98 NULL, /* init process */
99 NULL, /* init thread */
100 NULL, /* exit thread */
101 NULL, /* exit process */
102 NULL, /* exit master */
103 NGX_MODULE_V1_PADDING
104 };
105
106
107 static ngx_int_t
ngx_stream_upstream_init_hash(ngx_conf_t * cf,ngx_stream_upstream_srv_conf_t * us)108 ngx_stream_upstream_init_hash(ngx_conf_t *cf,
109 ngx_stream_upstream_srv_conf_t *us)
110 {
111 if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) {
112 return NGX_ERROR;
113 }
114
115 us->peer.init = ngx_stream_upstream_init_hash_peer;
116
117 return NGX_OK;
118 }
119
120
121 static ngx_int_t
ngx_stream_upstream_init_hash_peer(ngx_stream_session_t * s,ngx_stream_upstream_srv_conf_t * us)122 ngx_stream_upstream_init_hash_peer(ngx_stream_session_t *s,
123 ngx_stream_upstream_srv_conf_t *us)
124 {
125 ngx_stream_upstream_hash_srv_conf_t *hcf;
126 ngx_stream_upstream_hash_peer_data_t *hp;
127
128 hp = ngx_palloc(s->connection->pool,
129 sizeof(ngx_stream_upstream_hash_peer_data_t));
130 if (hp == NULL) {
131 return NGX_ERROR;
132 }
133
134 s->upstream->peer.data = &hp->rrp;
135
136 if (ngx_stream_upstream_init_round_robin_peer(s, us) != NGX_OK) {
137 return NGX_ERROR;
138 }
139
140 s->upstream->peer.get = ngx_stream_upstream_get_hash_peer;
141
142 hcf = ngx_stream_conf_upstream_srv_conf(us,
143 ngx_stream_upstream_hash_module);
144
145 if (ngx_stream_complex_value(s, &hcf->key, &hp->key) != NGX_OK) {
146 return NGX_ERROR;
147 }
148
149 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, s->connection->log, 0,
150 "upstream hash key:\"%V\"", &hp->key);
151
152 hp->conf = hcf;
153 hp->tries = 0;
154 hp->rehash = 0;
155 hp->hash = 0;
156 hp->get_rr_peer = ngx_stream_upstream_get_round_robin_peer;
157
158 return NGX_OK;
159 }
160
161
162 static ngx_int_t
ngx_stream_upstream_get_hash_peer(ngx_peer_connection_t * pc,void * data)163 ngx_stream_upstream_get_hash_peer(ngx_peer_connection_t *pc, void *data)
164 {
165 ngx_stream_upstream_hash_peer_data_t *hp = data;
166
167 time_t now;
168 u_char buf[NGX_INT_T_LEN];
169 size_t size;
170 uint32_t hash;
171 ngx_int_t w;
172 uintptr_t m;
173 ngx_uint_t n, p;
174 ngx_stream_upstream_rr_peer_t *peer;
175
176 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
177 "get hash peer, try: %ui", pc->tries);
178
179 ngx_stream_upstream_rr_peers_rlock(hp->rrp.peers);
180
181 if (hp->tries > 20 || hp->rrp.peers->single || hp->key.len == 0) {
182 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
183 return hp->get_rr_peer(pc, &hp->rrp);
184 }
185
186 now = ngx_time();
187
188 pc->connection = NULL;
189
190 for ( ;; ) {
191
192 /*
193 * Hash expression is compatible with Cache::Memcached:
194 * ((crc32([REHASH] KEY) >> 16) & 0x7fff) + PREV_HASH
195 * with REHASH omitted at the first iteration.
196 */
197
198 ngx_crc32_init(hash);
199
200 if (hp->rehash > 0) {
201 size = ngx_sprintf(buf, "%ui", hp->rehash) - buf;
202 ngx_crc32_update(&hash, buf, size);
203 }
204
205 ngx_crc32_update(&hash, hp->key.data, hp->key.len);
206 ngx_crc32_final(hash);
207
208 hash = (hash >> 16) & 0x7fff;
209
210 hp->hash += hash;
211 hp->rehash++;
212
213 w = hp->hash % hp->rrp.peers->total_weight;
214 peer = hp->rrp.peers->peer;
215 p = 0;
216
217 while (w >= peer->weight) {
218 w -= peer->weight;
219 peer = peer->next;
220 p++;
221 }
222
223 n = p / (8 * sizeof(uintptr_t));
224 m = (uintptr_t) 1 << p % (8 * sizeof(uintptr_t));
225
226 if (hp->rrp.tried[n] & m) {
227 goto next;
228 }
229
230 ngx_stream_upstream_rr_peer_lock(hp->rrp.peers, peer);
231
232 ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
233 "get hash peer, value:%uD, peer:%ui", hp->hash, p);
234
235 if (peer->down) {
236 ngx_stream_upstream_rr_peer_unlock(hp->rrp.peers, peer);
237 goto next;
238 }
239
240 if (peer->max_fails
241 && peer->fails >= peer->max_fails
242 && now - peer->checked <= peer->fail_timeout)
243 {
244 ngx_stream_upstream_rr_peer_unlock(hp->rrp.peers, peer);
245 goto next;
246 }
247
248 if (peer->max_conns && peer->conns >= peer->max_conns) {
249 ngx_stream_upstream_rr_peer_unlock(hp->rrp.peers, peer);
250 goto next;
251 }
252
253 break;
254
255 next:
256
257 if (++hp->tries > 20) {
258 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
259 return hp->get_rr_peer(pc, &hp->rrp);
260 }
261 }
262
263 hp->rrp.current = peer;
264
265 pc->sockaddr = peer->sockaddr;
266 pc->socklen = peer->socklen;
267 pc->name = &peer->name;
268
269 peer->conns++;
270
271 if (now - peer->checked > peer->fail_timeout) {
272 peer->checked = now;
273 }
274
275 ngx_stream_upstream_rr_peer_unlock(hp->rrp.peers, peer);
276 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
277
278 hp->rrp.tried[n] |= m;
279
280 return NGX_OK;
281 }
282
283
284 static ngx_int_t
ngx_stream_upstream_init_chash(ngx_conf_t * cf,ngx_stream_upstream_srv_conf_t * us)285 ngx_stream_upstream_init_chash(ngx_conf_t *cf,
286 ngx_stream_upstream_srv_conf_t *us)
287 {
288 u_char *host, *port, c;
289 size_t host_len, port_len, size;
290 uint32_t hash, base_hash;
291 ngx_str_t *server;
292 ngx_uint_t npoints, i, j;
293 ngx_stream_upstream_rr_peer_t *peer;
294 ngx_stream_upstream_rr_peers_t *peers;
295 ngx_stream_upstream_chash_points_t *points;
296 ngx_stream_upstream_hash_srv_conf_t *hcf;
297 union {
298 uint32_t value;
299 u_char byte[4];
300 } prev_hash;
301
302 if (ngx_stream_upstream_init_round_robin(cf, us) != NGX_OK) {
303 return NGX_ERROR;
304 }
305
306 us->peer.init = ngx_stream_upstream_init_chash_peer;
307
308 peers = us->peer.data;
309 npoints = peers->total_weight * 160;
310
311 size = sizeof(ngx_stream_upstream_chash_points_t)
312 + sizeof(ngx_stream_upstream_chash_point_t) * (npoints - 1);
313
314 points = ngx_palloc(cf->pool, size);
315 if (points == NULL) {
316 return NGX_ERROR;
317 }
318
319 points->number = 0;
320
321 for (peer = peers->peer; peer; peer = peer->next) {
322 server = &peer->server;
323
324 /*
325 * Hash expression is compatible with Cache::Memcached::Fast:
326 * crc32(HOST \0 PORT PREV_HASH).
327 */
328
329 if (server->len >= 5
330 && ngx_strncasecmp(server->data, (u_char *) "unix:", 5) == 0)
331 {
332 host = server->data + 5;
333 host_len = server->len - 5;
334 port = NULL;
335 port_len = 0;
336 goto done;
337 }
338
339 for (j = 0; j < server->len; j++) {
340 c = server->data[server->len - j - 1];
341
342 if (c == ':') {
343 host = server->data;
344 host_len = server->len - j - 1;
345 port = server->data + server->len - j;
346 port_len = j;
347 goto done;
348 }
349
350 if (c < '0' || c > '9') {
351 break;
352 }
353 }
354
355 host = server->data;
356 host_len = server->len;
357 port = NULL;
358 port_len = 0;
359
360 done:
361
362 ngx_crc32_init(base_hash);
363 ngx_crc32_update(&base_hash, host, host_len);
364 ngx_crc32_update(&base_hash, (u_char *) "", 1);
365 ngx_crc32_update(&base_hash, port, port_len);
366
367 prev_hash.value = 0;
368 npoints = peer->weight * 160;
369
370 for (j = 0; j < npoints; j++) {
371 hash = base_hash;
372
373 ngx_crc32_update(&hash, prev_hash.byte, 4);
374 ngx_crc32_final(hash);
375
376 points->point[points->number].hash = hash;
377 points->point[points->number].server = server;
378 points->number++;
379
380 #if (NGX_HAVE_LITTLE_ENDIAN)
381 prev_hash.value = hash;
382 #else
383 prev_hash.byte[0] = (u_char) (hash & 0xff);
384 prev_hash.byte[1] = (u_char) ((hash >> 8) & 0xff);
385 prev_hash.byte[2] = (u_char) ((hash >> 16) & 0xff);
386 prev_hash.byte[3] = (u_char) ((hash >> 24) & 0xff);
387 #endif
388 }
389 }
390
391 ngx_qsort(points->point,
392 points->number,
393 sizeof(ngx_stream_upstream_chash_point_t),
394 ngx_stream_upstream_chash_cmp_points);
395
396 for (i = 0, j = 1; j < points->number; j++) {
397 if (points->point[i].hash != points->point[j].hash) {
398 points->point[++i] = points->point[j];
399 }
400 }
401
402 points->number = i + 1;
403
404 hcf = ngx_stream_conf_upstream_srv_conf(us,
405 ngx_stream_upstream_hash_module);
406 hcf->points = points;
407
408 return NGX_OK;
409 }
410
411
412 static int ngx_libc_cdecl
ngx_stream_upstream_chash_cmp_points(const void * one,const void * two)413 ngx_stream_upstream_chash_cmp_points(const void *one, const void *two)
414 {
415 ngx_stream_upstream_chash_point_t *first =
416 (ngx_stream_upstream_chash_point_t *) one;
417 ngx_stream_upstream_chash_point_t *second =
418 (ngx_stream_upstream_chash_point_t *) two;
419
420 if (first->hash < second->hash) {
421 return -1;
422
423 } else if (first->hash > second->hash) {
424 return 1;
425
426 } else {
427 return 0;
428 }
429 }
430
431
432 static ngx_uint_t
ngx_stream_upstream_find_chash_point(ngx_stream_upstream_chash_points_t * points,uint32_t hash)433 ngx_stream_upstream_find_chash_point(ngx_stream_upstream_chash_points_t *points,
434 uint32_t hash)
435 {
436 ngx_uint_t i, j, k;
437 ngx_stream_upstream_chash_point_t *point;
438
439 /* find first point >= hash */
440
441 point = &points->point[0];
442
443 i = 0;
444 j = points->number;
445
446 while (i < j) {
447 k = (i + j) / 2;
448
449 if (hash > point[k].hash) {
450 i = k + 1;
451
452 } else if (hash < point[k].hash) {
453 j = k;
454
455 } else {
456 return k;
457 }
458 }
459
460 return i;
461 }
462
463
464 static ngx_int_t
ngx_stream_upstream_init_chash_peer(ngx_stream_session_t * s,ngx_stream_upstream_srv_conf_t * us)465 ngx_stream_upstream_init_chash_peer(ngx_stream_session_t *s,
466 ngx_stream_upstream_srv_conf_t *us)
467 {
468 uint32_t hash;
469 ngx_stream_upstream_hash_srv_conf_t *hcf;
470 ngx_stream_upstream_hash_peer_data_t *hp;
471
472 if (ngx_stream_upstream_init_hash_peer(s, us) != NGX_OK) {
473 return NGX_ERROR;
474 }
475
476 s->upstream->peer.get = ngx_stream_upstream_get_chash_peer;
477
478 hp = s->upstream->peer.data;
479 hcf = ngx_stream_conf_upstream_srv_conf(us,
480 ngx_stream_upstream_hash_module);
481
482 hash = ngx_crc32_long(hp->key.data, hp->key.len);
483
484 ngx_stream_upstream_rr_peers_rlock(hp->rrp.peers);
485
486 hp->hash = ngx_stream_upstream_find_chash_point(hcf->points, hash);
487
488 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
489
490 return NGX_OK;
491 }
492
493
494 static ngx_int_t
ngx_stream_upstream_get_chash_peer(ngx_peer_connection_t * pc,void * data)495 ngx_stream_upstream_get_chash_peer(ngx_peer_connection_t *pc, void *data)
496 {
497 ngx_stream_upstream_hash_peer_data_t *hp = data;
498
499 time_t now;
500 intptr_t m;
501 ngx_str_t *server;
502 ngx_int_t total;
503 ngx_uint_t i, n, best_i;
504 ngx_stream_upstream_rr_peer_t *peer, *best;
505 ngx_stream_upstream_chash_point_t *point;
506 ngx_stream_upstream_chash_points_t *points;
507 ngx_stream_upstream_hash_srv_conf_t *hcf;
508
509 ngx_log_debug1(NGX_LOG_DEBUG_STREAM, pc->log, 0,
510 "get consistent hash peer, try: %ui", pc->tries);
511
512 ngx_stream_upstream_rr_peers_wlock(hp->rrp.peers);
513
514 if (hp->tries > 20 || hp->rrp.peers->single || hp->key.len == 0) {
515 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
516 return hp->get_rr_peer(pc, &hp->rrp);
517 }
518
519 pc->connection = NULL;
520
521 now = ngx_time();
522 hcf = hp->conf;
523
524 points = hcf->points;
525 point = &points->point[0];
526
527 for ( ;; ) {
528 server = point[hp->hash % points->number].server;
529
530 ngx_log_debug2(NGX_LOG_DEBUG_STREAM, pc->log, 0,
531 "consistent hash peer:%uD, server:\"%V\"",
532 hp->hash, server);
533
534 best = NULL;
535 best_i = 0;
536 total = 0;
537
538 for (peer = hp->rrp.peers->peer, i = 0;
539 peer;
540 peer = peer->next, i++)
541 {
542 n = i / (8 * sizeof(uintptr_t));
543 m = (uintptr_t) 1 << i % (8 * sizeof(uintptr_t));
544
545 if (hp->rrp.tried[n] & m) {
546 continue;
547 }
548
549 if (peer->down) {
550 continue;
551 }
552
553 if (peer->max_fails
554 && peer->fails >= peer->max_fails
555 && now - peer->checked <= peer->fail_timeout)
556 {
557 continue;
558 }
559
560 if (peer->max_conns && peer->conns >= peer->max_conns) {
561 continue;
562 }
563
564 if (peer->server.len != server->len
565 || ngx_strncmp(peer->server.data, server->data, server->len)
566 != 0)
567 {
568 continue;
569 }
570
571 peer->current_weight += peer->effective_weight;
572 total += peer->effective_weight;
573
574 if (peer->effective_weight < peer->weight) {
575 peer->effective_weight++;
576 }
577
578 if (best == NULL || peer->current_weight > best->current_weight) {
579 best = peer;
580 best_i = i;
581 }
582 }
583
584 if (best) {
585 best->current_weight -= total;
586 break;
587 }
588
589 hp->hash++;
590 hp->tries++;
591
592 if (hp->tries > 20) {
593 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
594 return hp->get_rr_peer(pc, &hp->rrp);
595 }
596 }
597
598 hp->rrp.current = best;
599
600 pc->sockaddr = best->sockaddr;
601 pc->socklen = best->socklen;
602 pc->name = &best->name;
603
604 best->conns++;
605
606 if (now - best->checked > best->fail_timeout) {
607 best->checked = now;
608 }
609
610 ngx_stream_upstream_rr_peers_unlock(hp->rrp.peers);
611
612 n = best_i / (8 * sizeof(uintptr_t));
613 m = (uintptr_t) 1 << best_i % (8 * sizeof(uintptr_t));
614
615 hp->rrp.tried[n] |= m;
616
617 return NGX_OK;
618 }
619
620
621 static void *
ngx_stream_upstream_hash_create_conf(ngx_conf_t * cf)622 ngx_stream_upstream_hash_create_conf(ngx_conf_t *cf)
623 {
624 ngx_stream_upstream_hash_srv_conf_t *conf;
625
626 conf = ngx_palloc(cf->pool, sizeof(ngx_stream_upstream_hash_srv_conf_t));
627 if (conf == NULL) {
628 return NULL;
629 }
630
631 conf->points = NULL;
632
633 return conf;
634 }
635
636
637 static char *
ngx_stream_upstream_hash(ngx_conf_t * cf,ngx_command_t * cmd,void * conf)638 ngx_stream_upstream_hash(ngx_conf_t *cf, ngx_command_t *cmd, void *conf)
639 {
640 ngx_stream_upstream_hash_srv_conf_t *hcf = conf;
641
642 ngx_str_t *value;
643 ngx_stream_upstream_srv_conf_t *uscf;
644 ngx_stream_compile_complex_value_t ccv;
645
646 value = cf->args->elts;
647
648 ngx_memzero(&ccv, sizeof(ngx_stream_compile_complex_value_t));
649
650 ccv.cf = cf;
651 ccv.value = &value[1];
652 ccv.complex_value = &hcf->key;
653
654 if (ngx_stream_compile_complex_value(&ccv) != NGX_OK) {
655 return NGX_CONF_ERROR;
656 }
657
658 uscf = ngx_stream_conf_get_module_srv_conf(cf, ngx_stream_upstream_module);
659
660 if (uscf->peer.init_upstream) {
661 ngx_conf_log_error(NGX_LOG_WARN, cf, 0,
662 "load balancing method redefined");
663 }
664
665 uscf->flags = NGX_STREAM_UPSTREAM_CREATE
666 |NGX_STREAM_UPSTREAM_WEIGHT
667 |NGX_STREAM_UPSTREAM_MAX_CONNS
668 |NGX_STREAM_UPSTREAM_MAX_FAILS
669 |NGX_STREAM_UPSTREAM_FAIL_TIMEOUT
670 |NGX_STREAM_UPSTREAM_DOWN;
671
672 if (cf->args->nelts == 2) {
673 uscf->peer.init_upstream = ngx_stream_upstream_init_hash;
674
675 } else if (ngx_strcmp(value[2].data, "consistent") == 0) {
676 uscf->peer.init_upstream = ngx_stream_upstream_init_chash;
677
678 } else {
679 ngx_conf_log_error(NGX_LOG_EMERG, cf, 0,
680 "invalid parameter \"%V\"", &value[2]);
681 return NGX_CONF_ERROR;
682 }
683
684 return NGX_CONF_OK;
685 }
686