1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <math.h>
8 #include <nxt_router.h>
9 #include <nxt_http.h>
10 #include <nxt_upstream.h>
11
12
13 struct nxt_upstream_round_robin_server_s {
14 nxt_sockaddr_t *sockaddr;
15
16 int32_t current_weight;
17 int32_t effective_weight;
18 int32_t weight;
19
20 uint8_t protocol;
21 };
22
23
24 struct nxt_upstream_round_robin_s {
25 uint32_t items;
26 nxt_upstream_round_robin_server_t server[0];
27 };
28
29
30 static nxt_upstream_t *nxt_upstream_round_robin_joint_create(
31 nxt_router_temp_conf_t *tmcf, nxt_upstream_t *upstream);
32 static void nxt_upstream_round_robin_server_get(nxt_task_t *task,
33 nxt_upstream_server_t *us);
34
35
36 static const nxt_upstream_server_proto_t nxt_upstream_round_robin_proto = {
37 .joint_create = nxt_upstream_round_robin_joint_create,
38 .get = nxt_upstream_round_robin_server_get,
39 };
40
41
42 nxt_int_t
nxt_upstream_round_robin_create(nxt_task_t * task,nxt_router_temp_conf_t * tmcf,nxt_conf_value_t * upstream_conf,nxt_upstream_t * upstream)43 nxt_upstream_round_robin_create(nxt_task_t *task, nxt_router_temp_conf_t *tmcf,
44 nxt_conf_value_t *upstream_conf, nxt_upstream_t *upstream)
45 {
46 double total, k, w;
47 size_t size;
48 uint32_t i, n, next, wt;
49 nxt_mp_t *mp;
50 nxt_str_t name;
51 nxt_sockaddr_t *sa;
52 nxt_conf_value_t *servers_conf, *srvcf, *wtcf;
53 nxt_upstream_round_robin_t *urr;
54
55 static nxt_str_t servers = nxt_string("servers");
56 static nxt_str_t weight = nxt_string("weight");
57
58 mp = tmcf->router_conf->mem_pool;
59
60 servers_conf = nxt_conf_get_object_member(upstream_conf, &servers, NULL);
61 n = nxt_conf_object_members_count(servers_conf);
62
63 total = 0.0;
64 next = 0;
65
66 for (i = 0; i < n; i++) {
67 srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
68 wtcf = nxt_conf_get_object_member(srvcf, &weight, NULL);
69 w = (wtcf != NULL) ? nxt_conf_get_number(wtcf) : 1;
70 total += w;
71 }
72
73 /*
74 * This prevents overflow of int32_t
75 * in nxt_upstream_round_robin_server_get().
76 */
77 k = (total == 0) ? 0 : (NXT_INT32_T_MAX / 2) / total;
78
79 if (isinf(k)) {
80 k = 1;
81 }
82
83 size = sizeof(nxt_upstream_round_robin_t)
84 + n * sizeof(nxt_upstream_round_robin_server_t);
85
86 urr = nxt_mp_zalloc(mp, size);
87 if (nxt_slow_path(urr == NULL)) {
88 return NXT_ERROR;
89 }
90
91 urr->items = n;
92 next = 0;
93
94 for (i = 0; i < n; i++) {
95 srvcf = nxt_conf_next_object_member(servers_conf, &name, &next);
96
97 sa = nxt_sockaddr_parse(mp, &name);
98 if (nxt_slow_path(sa == NULL)) {
99 return NXT_ERROR;
100 }
101
102 sa->type = SOCK_STREAM;
103
104 urr->server[i].sockaddr = sa;
105 urr->server[i].protocol = NXT_HTTP_PROTO_H1;
106
107 wtcf = nxt_conf_get_object_member(srvcf, &weight, NULL);
108 w = (wtcf != NULL) ? k * nxt_conf_get_number(wtcf) : k;
109 wt = (w > 1 || w == 0) ? round(w) : 1;
110
111 urr->server[i].weight = wt;
112 urr->server[i].effective_weight = wt;
113 }
114
115 upstream->proto = &nxt_upstream_round_robin_proto;
116 upstream->type.round_robin = urr;
117
118 return NXT_OK;
119 }
120
121
122 static nxt_upstream_t *
nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t * tmcf,nxt_upstream_t * upstream)123 nxt_upstream_round_robin_joint_create(nxt_router_temp_conf_t *tmcf,
124 nxt_upstream_t *upstream)
125 {
126 size_t size;
127 uint32_t i, n;
128 nxt_mp_t *mp;
129 nxt_upstream_t *u;
130 nxt_upstream_round_robin_t *urr, *urrcf;
131
132 mp = tmcf->router_conf->mem_pool;
133
134 u = nxt_mp_alloc(mp, sizeof(nxt_upstream_t));
135 if (nxt_slow_path(u == NULL)) {
136 return NULL;
137 }
138
139 *u = *upstream;
140
141 urrcf = upstream->type.round_robin;
142
143 size = sizeof(nxt_upstream_round_robin_t)
144 + urrcf->items * sizeof(nxt_upstream_round_robin_server_t);
145
146 urr = nxt_mp_alloc(mp, size);
147 if (nxt_slow_path(urr == NULL)) {
148 return NULL;
149 }
150
151 u->type.round_robin = urr;
152
153 n = urrcf->items;
154 urr->items = n;
155
156 for (i = 0; i < n; i++) {
157 urr->server[i] = urrcf->server[i];
158 }
159
160 return u;
161 }
162
163
164 static void
nxt_upstream_round_robin_server_get(nxt_task_t * task,nxt_upstream_server_t * us)165 nxt_upstream_round_robin_server_get(nxt_task_t *task, nxt_upstream_server_t *us)
166 {
167 int32_t total;
168 uint32_t i, n;
169 nxt_upstream_round_robin_t *round_robin;
170 nxt_upstream_round_robin_server_t *s, *best;
171
172 best = NULL;
173 total = 0;
174
175 round_robin = us->upstream->type.round_robin;
176
177 s = round_robin->server;
178 n = round_robin->items;
179
180 for (i = 0; i < n; i++) {
181
182 s[i].current_weight += s[i].effective_weight;
183 total += s[i].effective_weight;
184
185 if (s[i].effective_weight < s[i].weight) {
186 s[i].effective_weight++;
187 }
188
189 if (best == NULL || s[i].current_weight > best->current_weight) {
190 best = &s[i];
191 }
192 }
193
194 if (best == NULL || total == 0) {
195 us->state->error(task, us);
196 return;
197 }
198
199 best->current_weight -= total;
200 us->sockaddr = best->sockaddr;
201 us->protocol = best->protocol;
202 us->server.round_robin = best;
203
204 us->state->ready(task, us);
205 }
206