1 #include "uwsgi.h"
2
3 /*
4
5 subscription subsystem
6
each subscription slot is stored as a hashed item in a dictionary
8
9 each slot has a circular linked list containing the nodes names
10
11 the structure and system is very similar to uwsgi_dyn_dict already used by the mime type parser
12
This system is not meant to run on shared memory. If you have multiple processes for the same app, you have to create
a new subscriptions slot list.
15
To avoid removing nodes that are still in use, a reference count system has been implemented
17
18 */
19
20
21 extern struct uwsgi_server uwsgi;
22
23 #ifdef UWSGI_SSL
uwsgi_subscription_sni_check(struct uwsgi_subscribe_slot * current_slot,struct uwsgi_subscribe_req * usr)24 static void uwsgi_subscription_sni_check(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_req *usr) {
25 if (usr->sni_key_len > 0 && usr->sni_crt_len > 0) {
26 if (!current_slot->sni_enabled) {
27 char *sni_key = uwsgi_concat2n(usr->sni_key, usr->sni_key_len, "", 0);
28 char *sni_crt = uwsgi_concat2n(usr->sni_crt, usr->sni_crt_len, "", 0);
29 char *sni_ca = NULL;
30 if (usr->sni_ca_len > 0) {
31 sni_ca = uwsgi_concat2n(usr->sni_ca, usr->sni_ca_len, "", 0);
32 }
33 char *servername = NULL;
34 char *colon = memchr(current_slot->key, ':', current_slot->keylen);
35 if (colon) {
36 servername = uwsgi_concat2n(current_slot->key, colon - current_slot->key, "", 0);
37 }
38 else {
39 servername = uwsgi_concat2n(current_slot->key, current_slot->keylen, "", 0);
40 }
41 if (uwsgi_ssl_add_sni_item(servername, sni_crt, sni_key, uwsgi.sni_dir_ciphers, sni_ca)) {
42 current_slot->sni_enabled = 1;
43 }
44 if (sni_key)
45 free(sni_key);
46 if (sni_crt)
47 free(sni_crt);
48 if (sni_ca)
49 free(sni_ca);
50 }
51 }
52 }
53 #endif
54
uwsgi_subscription_credentials_check(struct uwsgi_subscribe_slot * slot,struct uwsgi_subscribe_req * usr)55 int uwsgi_subscription_credentials_check(struct uwsgi_subscribe_slot *slot, struct uwsgi_subscribe_req *usr) {
56 struct uwsgi_string_list *usl = NULL;
57 uwsgi_foreach(usl, uwsgi.subscriptions_credentials_check_dir) {
58 char *filename = uwsgi_concat2n(usl->value, usl->len, slot->key, slot->keylen);
59 struct stat st;
60 int ret = stat(filename, &st);
61 free(filename);
62 if (ret != 0)
63 continue;
64 if (st.st_uid != usr->uid)
65 continue;
66 if (st.st_gid != usr->gid)
67 continue;
68 // accepted...
69 return 1;
70 }
71 return 0;
72 }
73
uwsgi_get_subscribe_slot(struct uwsgi_subscribe_slot ** slot,char * key,uint16_t keylen)74 struct uwsgi_subscribe_slot *uwsgi_get_subscribe_slot(struct uwsgi_subscribe_slot **slot, char *key, uint16_t keylen) {
75
76 if (keylen > 0xff)
77 return NULL;
78
79 uint32_t hash = djb33x_hash(key, keylen);
80 int hash_key = hash % 0xffff;
81
82 struct uwsgi_subscribe_slot *current_slot = slot[hash_key];
83
84
85 #ifdef UWSGI_DEBUG
86 uwsgi_log("****************************\n");
87 while (current_slot) {
88 uwsgi_log("slot %.*s %d\n", current_slot->keylen, current_slot->key, current_slot->hits);
89 current_slot = current_slot->next;
90 }
91 uwsgi_log("****************************\n");
92 current_slot = slot[hash_key];
93 #endif
94
95 while (current_slot) {
96 if (!uwsgi_strncmp(key, keylen, current_slot->key, current_slot->keylen)) {
97 // auto optimization
98 if (current_slot->prev) {
99 if (current_slot->hits > current_slot->prev->hits) {
100 struct uwsgi_subscribe_slot *slot_parent = current_slot->prev->prev, *slot_prev = current_slot->prev;
101 if (slot_parent) {
102 slot_parent->next = current_slot;
103 }
104 else {
105 slot[hash_key] = current_slot;
106 }
107
108 if (current_slot->next) {
109 current_slot->next->prev = slot_prev;
110 }
111
112 slot_prev->prev = current_slot;
113 slot_prev->next = current_slot->next;
114
115 current_slot->next = slot_prev;
116 current_slot->prev = slot_parent;
117
118 }
119 }
120 return current_slot;
121 }
122 current_slot = current_slot->next;
123 // check for loopy optimization
124 if (current_slot == slot[hash_key])
125 break;
126 }
127
128 return NULL;
129 }
130
131 // least reference count
uwsgi_subscription_algo_lrc(struct uwsgi_subscribe_slot * current_slot,struct uwsgi_subscribe_node * node)132 static struct uwsgi_subscribe_node *uwsgi_subscription_algo_lrc(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_node *node) {
133 // if node is NULL we are in the second step (in lrc mode we do not use the first step)
134 if (node)
135 return NULL;
136
137 struct uwsgi_subscribe_node *choosen_node = NULL;
138 node = current_slot->nodes;
139 uint64_t min_rc = 0;
140 while (node) {
141 if (!node->death_mark) {
142 if (min_rc == 0 || node->reference < min_rc) {
143 min_rc = node->reference;
144 choosen_node = node;
145 if (min_rc == 0 && !(node->next && node->next->reference <= node->reference && node->next->last_requests <= node->last_requests))
146 break;
147 }
148 }
149 node = node->next;
150 }
151
152 if (choosen_node) {
153 choosen_node->reference++;
154 }
155
156 return choosen_node;
157 }
158
159 // weighted least reference count
uwsgi_subscription_algo_wlrc(struct uwsgi_subscribe_slot * current_slot,struct uwsgi_subscribe_node * node)160 static struct uwsgi_subscribe_node *uwsgi_subscription_algo_wlrc(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_node *node) {
161 // if node is NULL we are in the second step (in wlrc mode we do not use the first step)
162 if (node)
163 return NULL;
164
165 struct uwsgi_subscribe_node *choosen_node = NULL;
166 node = current_slot->nodes;
167 double min_rc = 0;
168 while (node) {
169 if (!node->death_mark) {
170 // node->weight is always >= 1, we can safely use it as divider
171 double ref = (double) node->reference / (double) node->weight;
172 double next_node_ref = 0;
173 if (node->next)
174 next_node_ref = (double) node->next->reference / (double) node->next->weight;
175
176 if (min_rc == 0 || ref < min_rc) {
177 min_rc = ref;
178 choosen_node = node;
179 if (min_rc == 0 && !(node->next && next_node_ref <= ref && node->next->last_requests <= node->last_requests))
180 break;
181 }
182 }
183 node = node->next;
184 }
185
186 if (choosen_node) {
187 choosen_node->reference++;
188 }
189
190 return choosen_node;
191 }
192
193 // weighted round robin algo
uwsgi_subscription_algo_wrr(struct uwsgi_subscribe_slot * current_slot,struct uwsgi_subscribe_node * node)194 static struct uwsgi_subscribe_node *uwsgi_subscription_algo_wrr(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_node *node) {
195 // if node is NULL we are in the second step
196 if (node) {
197 if (node->death_mark == 0 && node->wrr > 0) {
198 node->wrr--;
199 node->reference++;
200 return node;
201 }
202 return NULL;
203 }
204
205 // no wrr > 0 node found, reset them
206 node = current_slot->nodes;
207 uint64_t min_weight = 0;
208 while (node) {
209 if (!node->death_mark) {
210 if (min_weight == 0 || node->weight < min_weight)
211 min_weight = node->weight;
212 }
213 node = node->next;
214 }
215
216 // now set wrr
217 node = current_slot->nodes;
218 struct uwsgi_subscribe_node *choosen_node = NULL;
219 while (node) {
220 if (!node->death_mark) {
221 node->wrr = node->weight / min_weight;
222 choosen_node = node;
223 }
224 node = node->next;
225 }
226 if (choosen_node) {
227 choosen_node->wrr--;
228 choosen_node->reference++;
229 }
230 return choosen_node;
231 }
232
uwsgi_subscription_set_algo(char * algo)233 void uwsgi_subscription_set_algo(char *algo) {
234
235 if (!algo)
236 goto wrr;
237
238 if (!strcmp(algo, "wrr")) {
239 uwsgi.subscription_algo = uwsgi_subscription_algo_wrr;
240 return;
241 }
242
243 if (!strcmp(algo, "lrc")) {
244 uwsgi.subscription_algo = uwsgi_subscription_algo_lrc;
245 return;
246 }
247
248 if (!strcmp(algo, "wlrc")) {
249 uwsgi.subscription_algo = uwsgi_subscription_algo_wlrc;
250 return;
251 }
252
253 wrr:
254 uwsgi.subscription_algo = uwsgi_subscription_algo_wrr;
255 }
256
uwsgi_get_subscribe_node(struct uwsgi_subscribe_slot ** slot,char * key,uint16_t keylen)257 struct uwsgi_subscribe_node *uwsgi_get_subscribe_node(struct uwsgi_subscribe_slot **slot, char *key, uint16_t keylen) {
258
259 if (keylen > 0xff)
260 return NULL;
261
262 struct uwsgi_subscribe_slot *current_slot = uwsgi_get_subscribe_slot(slot, key, keylen);
263 if (!current_slot)
264 return NULL;
265
266 // slot found, move up in the list increasing hits
267 current_slot->hits++;
268 time_t now = uwsgi_now();
269 struct uwsgi_subscribe_node *node = current_slot->nodes;
270 while (node) {
271 // is the node alive ?
272 if (now - node->last_check > uwsgi.subscription_tolerance) {
273 if (node->death_mark == 0)
274 uwsgi_log("[uwsgi-subscription for pid %d] %.*s => marking %.*s as failed (no announce received in %d seconds)\n", (int) uwsgi.mypid, (int) keylen, key, (int) node->len, node->name, uwsgi.subscription_tolerance);
275 node->failcnt++;
276 node->death_mark = 1;
277 }
278 // do i need to remove the node ?
279 if (node->death_mark && node->reference == 0) {
280 // remove the node and move to next
281 struct uwsgi_subscribe_node *dead_node = node;
282 node = node->next;
283 // if the slot has been removed, return NULL;
284 if (uwsgi_remove_subscribe_node(slot, dead_node) == 1) {
285 return NULL;
286 }
287 continue;
288 }
289
290 struct uwsgi_subscribe_node *choosen_node = uwsgi.subscription_algo(current_slot, node);
291 if (choosen_node)
292 return choosen_node;
293
294 node = node->next;
295 }
296
297 return uwsgi.subscription_algo(current_slot, node);
298 }
299
uwsgi_get_subscribe_node_by_name(struct uwsgi_subscribe_slot ** slot,char * key,uint16_t keylen,char * val,uint16_t vallen)300 struct uwsgi_subscribe_node *uwsgi_get_subscribe_node_by_name(struct uwsgi_subscribe_slot **slot, char *key, uint16_t keylen, char *val, uint16_t vallen) {
301
302 if (keylen > 0xff)
303 return NULL;
304 struct uwsgi_subscribe_slot *current_slot = uwsgi_get_subscribe_slot(slot, key, keylen);
305 if (current_slot) {
306 struct uwsgi_subscribe_node *node = current_slot->nodes;
307 while (node) {
308 if (!uwsgi_strncmp(val, vallen, node->name, node->len)) {
309 return node;
310 }
311 node = node->next;
312 }
313 }
314
315 return NULL;
316 }
317
uwsgi_remove_subscribe_node(struct uwsgi_subscribe_slot ** slot,struct uwsgi_subscribe_node * node)318 int uwsgi_remove_subscribe_node(struct uwsgi_subscribe_slot **slot, struct uwsgi_subscribe_node *node) {
319
320 int ret = 0;
321
322 struct uwsgi_subscribe_node *a_node;
323 struct uwsgi_subscribe_slot *node_slot = node->slot;
324 struct uwsgi_subscribe_slot *prev_slot = node_slot->prev;
325 struct uwsgi_subscribe_slot *next_slot = node_slot->next;
326
327 int hash_key = node_slot->hash;
328
329 // over-engineering to avoid race conditions
330 node->len = 0;
331
332 if (node == node_slot->nodes) {
333 node_slot->nodes = node->next;
334 }
335 else {
336 a_node = node_slot->nodes;
337 while (a_node) {
338 if (a_node->next == node) {
339 a_node->next = node->next;
340 break;
341 }
342 a_node = a_node->next;
343 }
344 }
345
346 free(node);
347 // no more nodes, remove the slot too
348 if (node_slot->nodes == NULL) {
349
350 ret = 1;
351
352 // first check if i am the only node
353 if ((!prev_slot && !next_slot) || next_slot == node_slot) {
354 #ifdef UWSGI_SSL
355 if (node_slot->sign_ctx) {
356 EVP_PKEY_free(node_slot->sign_public_key);
357 EVP_MD_CTX_destroy(node_slot->sign_ctx);
358 }
359 #ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
360 // if there is a SNI context active, destroy it
361 if (node_slot->sni_enabled) {
362 uwsgi_ssl_del_sni_item(node_slot->key, node_slot->keylen);
363 }
364 #endif
365 #endif
366 free(node_slot);
367 slot[hash_key] = NULL;
368 goto end;
369 }
370
371 // if i am the main entry point, set the next value
372 if (node_slot == slot[hash_key]) {
373 slot[hash_key] = next_slot;
374 }
375
376 if (prev_slot) {
377 prev_slot->next = next_slot;
378 }
379 if (next_slot) {
380 next_slot->prev = prev_slot;
381 }
382
383 #ifdef UWSGI_SSL
384 if (node_slot->sign_ctx) {
385 EVP_PKEY_free(node_slot->sign_public_key);
386 EVP_MD_CTX_destroy(node_slot->sign_ctx);
387 }
388 #endif
389 free(node_slot);
390 }
391
392 end:
393
394 return ret;
395 }
396
397 #ifdef UWSGI_SSL
398 static int subscription_new_sign_ctx(struct uwsgi_subscribe_slot *, struct uwsgi_subscribe_req *);
399 static int subscription_is_safe(struct uwsgi_subscribe_req *);
400 #endif
401
uwsgi_add_subscribe_node(struct uwsgi_subscribe_slot ** slot,struct uwsgi_subscribe_req * usr)402 struct uwsgi_subscribe_node *uwsgi_add_subscribe_node(struct uwsgi_subscribe_slot **slot, struct uwsgi_subscribe_req *usr) {
403
404 struct uwsgi_subscribe_slot *current_slot = uwsgi_get_subscribe_slot(slot, usr->key, usr->keylen), *old_slot = NULL, *a_slot;
405 struct uwsgi_subscribe_node *node, *old_node = NULL;
406
407 if (usr->address_len > 0xff || usr->address_len == 0)
408 return NULL;
409
410 if (current_slot) {
411 #ifdef UWSGI_SSL
412 if (uwsgi.subscriptions_sign_check_dir && !uwsgi_subscription_sign_check(current_slot, usr)) {
413 return NULL;
414 }
415 #endif
416
417 if (uwsgi.subscriptions_credentials_check_dir && !uwsgi_subscription_credentials_check(current_slot, usr)) {
418 return NULL;
419 }
420
421 node = current_slot->nodes;
422 while (node) {
423 if (!uwsgi_strncmp(node->name, node->len, usr->address, usr->address_len)) {
424 #ifdef UWSGI_SSL
425 // this should avoid sending sniffed packets...
426 if (current_slot->sign_ctx && !subscription_is_safe(usr) && usr->unix_check <= node->unix_check) {
427 uwsgi_log("[uwsgi-subscription for pid %d] invalid (sniffed ?) packet sent for slot: %.*s node: %.*s unix_check: %lu\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address, (unsigned long) usr->unix_check);
428 return NULL;
429 }
430 // eventually the packet could be upgraded to sni...
431 uwsgi_subscription_sni_check(current_slot, usr);
432 #endif
433 // remove death mark and update cores and load
434 node->death_mark = 0;
435 node->last_check = uwsgi_now();
436 node->cores = usr->cores;
437 node->load = usr->load;
438 node->weight = usr->weight;
439 if (!node->weight)
440 node->weight = 1;
441 node->last_requests = 0;
442 return node;
443 }
444 old_node = node;
445 node = node->next;
446 }
447
448 #ifdef UWSGI_SSL
449 if (current_slot->sign_ctx && !subscription_is_safe(usr) && usr->unix_check < (uwsgi_now() - (time_t) uwsgi.subscriptions_sign_check_tolerance)) {
450 uwsgi_log("[uwsgi-subscription for pid %d] invalid (sniffed ?) packet sent for slot: %.*s node: %.*s unix_check: %lu\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address, (unsigned long) usr->unix_check);
451 return NULL;
452 }
453 // check here as we are sure the node will be added
454 uwsgi_subscription_sni_check(current_slot, usr);
455 #endif
456
457 node = uwsgi_malloc(sizeof(struct uwsgi_subscribe_node));
458 node->len = usr->address_len;
459 node->modifier1 = usr->modifier1;
460 node->modifier2 = usr->modifier2;
461 node->requests = 0;
462 node->last_requests = 0;
463 node->tx = 0;
464 node->rx = 0;
465 node->reference = 0;
466 node->death_mark = 0;
467 node->failcnt = 0;
468 node->cores = usr->cores;
469 node->load = usr->load;
470 node->weight = usr->weight;
471 node->unix_check = usr->unix_check;
472 if (!node->weight)
473 node->weight = 1;
474 node->wrr = 0;
475 node->pid = usr->pid;
476 node->uid = usr->uid;
477 node->gid = usr->gid;
478 node->notify[0] = 0;
479 if (usr->notify_len > 0 && usr->notify_len < 102) {
480 memcpy(node->notify, usr->notify, usr->notify_len);
481 node->notify[usr->notify_len] = 0;
482 }
483 node->last_check = uwsgi_now();
484 node->slot = current_slot;
485 memcpy(node->name, usr->address, usr->address_len);
486 if (old_node) {
487 old_node->next = node;
488 }
489 node->next = NULL;
490
491 uwsgi_log("[uwsgi-subscription for pid %d] %.*s => new node: %.*s\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address);
492 if (node->notify[0]) {
493 char buf[1024];
494 int ret = snprintf(buf, 1024, "[subscription ack] %.*s => new node: %.*s", usr->keylen, usr->key, usr->address_len, usr->address);
495 if (ret > 0 && ret < 1024)
496 uwsgi_notify_msg(node->notify, buf, ret);
497 }
498 return node;
499 }
500 else {
501 current_slot = uwsgi_malloc(sizeof(struct uwsgi_subscribe_slot));
502 #ifdef UWSGI_SSL
503 current_slot->sign_ctx = NULL;
504 if (uwsgi.subscriptions_sign_check_dir && !subscription_new_sign_ctx(current_slot, usr)) {
505 free(current_slot);
506 return NULL;
507 }
508 #endif
509 uint32_t hash = djb33x_hash(usr->key, usr->keylen);
510 int hash_key = hash % 0xffff;
511 current_slot->hash = hash_key;
512 current_slot->keylen = usr->keylen;
513 memcpy(current_slot->key, usr->key, usr->keylen);
514 if (uwsgi.subscriptions_credentials_check_dir) {
515 if (!uwsgi_subscription_credentials_check(current_slot, usr)) {
516 free(current_slot);
517 return NULL;
518 }
519 }
520
521 current_slot->key[usr->keylen] = 0;
522 current_slot->hits = 0;
523 #ifdef UWSGI_SSL
524 current_slot->sni_enabled = 0;
525 uwsgi_subscription_sni_check(current_slot, usr);
526 #endif
527 current_slot->nodes = uwsgi_malloc(sizeof(struct uwsgi_subscribe_node));
528 current_slot->nodes->slot = current_slot;
529 current_slot->nodes->len = usr->address_len;
530 current_slot->nodes->reference = 0;
531 current_slot->nodes->requests = 0;
532 current_slot->nodes->last_requests = 0;
533 current_slot->nodes->tx = 0;
534 current_slot->nodes->rx = 0;
535 current_slot->nodes->death_mark = 0;
536 current_slot->nodes->failcnt = 0;
537 current_slot->nodes->modifier1 = usr->modifier1;
538 current_slot->nodes->modifier2 = usr->modifier2;
539 current_slot->nodes->cores = usr->cores;
540 current_slot->nodes->load = usr->load;
541 current_slot->nodes->weight = usr->weight;
542 current_slot->nodes->unix_check = usr->unix_check;
543 if (!current_slot->nodes->weight)
544 current_slot->nodes->weight = 1;
545 current_slot->nodes->wrr = 0;
546 current_slot->nodes->pid = usr->pid;
547 current_slot->nodes->uid = usr->uid;
548 current_slot->nodes->gid = usr->gid;
549 current_slot->nodes->notify[0] = 0;
550 if (usr->notify_len > 0 && usr->notify_len < 102) {
551 memcpy(current_slot->nodes->notify, usr->notify, usr->notify_len);
552 current_slot->nodes->notify[usr->notify_len] = 0;
553 }
554 memcpy(current_slot->nodes->name, usr->address, usr->address_len);
555 current_slot->nodes->last_check = uwsgi_now();
556
557 current_slot->nodes->next = NULL;
558
559 a_slot = slot[hash_key];
560 while (a_slot) {
561 old_slot = a_slot;
562 a_slot = a_slot->next;
563 }
564
565
566 if (old_slot) {
567 old_slot->next = current_slot;
568 }
569
570 current_slot->prev = old_slot;
571 current_slot->next = NULL;
572
573
574 if (!slot[hash_key] || current_slot->prev == NULL) {
575 slot[hash_key] = current_slot;
576 }
577
578 uwsgi_log("[uwsgi-subscription for pid %d] new pool: %.*s (hash key: %d)\n", (int) uwsgi.mypid, usr->keylen, usr->key, current_slot->hash);
579 uwsgi_log("[uwsgi-subscription for pid %d] %.*s => new node: %.*s\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address);
580
581 if (current_slot->nodes->notify[0]) {
582 char buf[1024];
583 int ret = snprintf(buf, 1024, "[subscription ack] %.*s => new node: %.*s", usr->keylen, usr->key, usr->address_len, usr->address);
584 if (ret > 0 && ret < 1024)
585 uwsgi_notify_msg(current_slot->nodes->notify, buf, ret);
586 }
587 return current_slot->nodes;
588 }
589
590 }
591
send_subscription(int sfd,char * host,char * message,uint16_t message_size)592 static void send_subscription(int sfd, char *host, char *message, uint16_t message_size) {
593
594 int fd = sfd;
595 struct sockaddr_in udp_addr;
596 struct sockaddr_un un_addr;
597 ssize_t ret;
598
599 char *udp_port = strchr(host, ':');
600
601 if (fd == -1) {
602 if (udp_port) {
603 fd = socket(AF_INET, SOCK_DGRAM, 0);
604 }
605 else {
606 fd = socket(AF_UNIX, SOCK_DGRAM, 0);
607 }
608 if (fd < 0) {
609 uwsgi_error("send_subscription()/socket()");
610 return;
611 }
612 uwsgi_socket_nb(fd);
613 }
614 else if (fd == -2) {
615 static int unix_fd = -1;
616 static int inet_fd = -1;
617 if (udp_port) {
618 if (inet_fd == -1) {
619 inet_fd = socket(AF_INET, SOCK_DGRAM, 0);
620 if (inet_fd < 0) {
621 uwsgi_error("send_subscription()/socket()");
622 return;
623 }
624 uwsgi_socket_nb(inet_fd);
625 }
626 fd = inet_fd;
627 }
628 else {
629 if (unix_fd == -1) {
630 unix_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
631 if (unix_fd < 0) {
632 uwsgi_error("send_subscription()/socket()");
633 return;
634 }
635 uwsgi_socket_nb(unix_fd);
636 }
637 fd = unix_fd;
638 }
639 }
640
641 if (udp_port) {
642 udp_port[0] = 0;
643 memset(&udp_addr, 0, sizeof(struct sockaddr_in));
644 udp_addr.sin_family = AF_INET;
645 udp_addr.sin_port = htons(atoi(udp_port + 1));
646 udp_addr.sin_addr.s_addr = inet_addr(host);
647 ret = sendto(fd, message, message_size, 0, (struct sockaddr *) &udp_addr, sizeof(udp_addr));
648 udp_port[0] = ':';
649 }
650 else {
651 memset(&un_addr, 0, sizeof(struct sockaddr_un));
652 un_addr.sun_family = AF_UNIX;
653 // use 102 as the magic number
654 strncat(un_addr.sun_path, host, 102);
655 if (uwsgi.subscriptions_use_credentials) {
656 // could be useless as internally the socket could add them automagically
657 ret = uwsgi_pass_cred2(fd, message, message_size, (struct sockaddr *) &un_addr, sizeof(un_addr));
658 }
659 else {
660 ret = sendto(fd, message, message_size, 0, (struct sockaddr *) &un_addr, sizeof(un_addr));
661 }
662 }
663
664 if (ret < 0) {
665 uwsgi_error("send_subscription()/sendto()");
666 }
667
668 if (sfd == -1)
669 close(fd);
670 }
671
uwsgi_subscription_ub(char * key,size_t keysize,uint8_t modifier1,uint8_t modifier2,uint8_t cmd,char * socket_name,char * sign,char * sni_key,char * sni_crt,char * sni_ca)672 static struct uwsgi_buffer *uwsgi_subscription_ub(char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
673 struct uwsgi_buffer *ub = uwsgi_buffer_new(4096);
674
675 // make space for uwsgi header
676 ub->pos = 4;
677
678 if (uwsgi_buffer_append_keyval(ub, "key", 3, key, keysize))
679 goto end;
680 if (uwsgi_buffer_append_keyval(ub, "address", 7, socket_name, strlen(socket_name)))
681 goto end;
682
683 if (uwsgi.subscribe_with_modifier1) {
684 modifier1 = atoi(uwsgi.subscribe_with_modifier1);
685 }
686 if (uwsgi_buffer_append_keynum(ub, "modifier1", 9, modifier1))
687 goto end;
688 if (uwsgi_buffer_append_keynum(ub, "modifier2", 9, modifier2))
689 goto end;
690 if (uwsgi_buffer_append_keynum(ub, "cores", 5, uwsgi.numproc * uwsgi.cores))
691 goto end;
692 if (uwsgi_buffer_append_keynum(ub, "load", 4, uwsgi.shared->load))
693 goto end;
694 if (uwsgi.auto_weight) {
695 if (uwsgi_buffer_append_keynum(ub, "weight", 6, uwsgi.numproc * uwsgi.cores))
696 goto end;
697 }
698 else {
699 if (uwsgi_buffer_append_keynum(ub, "weight", 6, uwsgi.weight))
700 goto end;
701 }
702
703 if (sni_key) {
704 if (uwsgi_buffer_append_keyval(ub, "sni_key", 7, sni_key, strlen(sni_key)))
705 goto end;
706 }
707
708 if (sni_crt) {
709 if (uwsgi_buffer_append_keyval(ub, "sni_crt", 7, sni_crt, strlen(sni_crt)))
710 goto end;
711 }
712
713 if (sni_ca) {
714 if (uwsgi_buffer_append_keyval(ub, "sni_ca", 6, sni_ca, strlen(sni_ca)))
715 goto end;
716 }
717
718 if (uwsgi.subscription_notify_socket) {
719 if (uwsgi_buffer_append_keyval(ub, "notify", 6, uwsgi.subscription_notify_socket, strlen(uwsgi.subscription_notify_socket)))
720 goto end;
721 }
722 else if (uwsgi.notify_socket_fd > -1 && uwsgi.notify_socket) {
723 if (uwsgi_buffer_append_keyval(ub, "notify", 6, uwsgi.notify_socket, strlen(uwsgi.notify_socket)))
724 goto end;
725 }
726
727 #ifdef UWSGI_SSL
728 if (sign) {
729 if (uwsgi_buffer_append_keynum(ub, "unix", 4, (uwsgi_now() + (time_t) cmd)))
730 goto end;
731
732 unsigned int signature_len = 0;
733 char *signature = uwsgi_rsa_sign(sign, ub->buf + 4, ub->pos - 4, &signature_len);
734 if (signature && signature_len > 0) {
735 if (uwsgi_buffer_append_keyval(ub, "sign", 4, signature, signature_len)) {
736 free(signature);
737 goto end;
738 }
739 free(signature);
740 }
741 }
742 #endif
743
744 // add uwsgi header
745 if (uwsgi_buffer_set_uh(ub, 224, cmd))
746 goto end;
747
748 return ub;
749
750 end:
751 uwsgi_buffer_destroy(ub);
752 return NULL;
753 }
754
uwsgi_send_subscription_from_fd(int fd,char * udp_address,char * key,size_t keysize,uint8_t modifier1,uint8_t modifier2,uint8_t cmd,char * socket_name,char * sign,char * sni_key,char * sni_crt,char * sni_ca)755 void uwsgi_send_subscription_from_fd(int fd, char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
756
757 if (socket_name == NULL && !uwsgi.sockets)
758 return;
759
760 if (!socket_name) {
761 socket_name = uwsgi.sockets->name;
762 }
763
764 struct uwsgi_buffer *ub = uwsgi_subscription_ub(key, keysize, modifier1, modifier2, cmd, socket_name, sign, sni_key, sni_crt, sni_ca);
765
766 if (!ub)
767 return;
768
769 send_subscription(fd, udp_address, ub->buf, ub->pos);
770 uwsgi_buffer_destroy(ub);
771 }
772
773
/*
 * Send a subscription packet through a temporary socket: same as
 * uwsgi_send_subscription_from_fd() called with fd == -1.
 */
void uwsgi_send_subscription(char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
	const int temporary_socket = -1;

	uwsgi_send_subscription_from_fd(temporary_socket, udp_address, key, keysize,
					modifier1, modifier2, cmd, socket_name,
					sign, sni_key, sni_crt, sni_ca);
}
777
778 #ifdef UWSGI_SSL
subscription_is_safe(struct uwsgi_subscribe_req * usr)779 static int subscription_is_safe(struct uwsgi_subscribe_req *usr) {
780 struct uwsgi_string_list *usl = NULL;
781 uwsgi_foreach(usl, uwsgi.subscriptions_sign_skip_uid) {
782 if (usl->custom == 0) {
783 usl->custom = atoi(usl->value);
784 }
785 if (usr->uid > 0 && usr->uid == (uid_t) usl->custom) {
786 return 1;
787 }
788 }
789 return 0;
790 }
subscription_new_sign_ctx(struct uwsgi_subscribe_slot * slot,struct uwsgi_subscribe_req * usr)791 static int subscription_new_sign_ctx(struct uwsgi_subscribe_slot *slot, struct uwsgi_subscribe_req *usr) {
792 if (subscription_is_safe(usr)) return 1;
793
794 if (usr->sign_len == 0 || usr->base_len == 0)
795 return 0;
796
797 if (usr->unix_check < (uwsgi_now() - (time_t) uwsgi.subscriptions_sign_check_tolerance)) {
798 uwsgi_log("[uwsgi-subscription for pid %d] invalid (sniffed ?) packet sent for slot: %.*s node: %.*s unix_check: %lu\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address, (unsigned long) usr->unix_check);
799 return 0;
800 }
801
802 char *keyfile = uwsgi_sanitize_cert_filename(uwsgi.subscriptions_sign_check_dir, usr->key, usr->keylen);
803 FILE *kf = fopen(keyfile, "r");
804 free(keyfile);
805 if (!kf) return 0;
806 slot->sign_public_key = PEM_read_PUBKEY(kf, NULL, NULL, NULL);
807 fclose(kf);
808 if (!slot->sign_public_key) {
809 uwsgi_log("unable to load public key for %.*s\n", usr->keylen, usr->key);
810 return 0;
811 }
812 slot->sign_ctx = EVP_MD_CTX_create();
813 if (!slot->sign_ctx) {
814 uwsgi_log("unable to initialize EVP context for %.*s\n", usr->keylen, usr->key);
815 EVP_PKEY_free(slot->sign_public_key);
816 return 0;
817 }
818
819 if (!uwsgi_subscription_sign_check(slot, usr)) {
820 EVP_PKEY_free(slot->sign_public_key);
821 EVP_MD_CTX_destroy(slot->sign_ctx);
822 return 0;
823 }
824
825 return 1;
826 }
uwsgi_subscription_sign_check(struct uwsgi_subscribe_slot * slot,struct uwsgi_subscribe_req * usr)827 int uwsgi_subscription_sign_check(struct uwsgi_subscribe_slot *slot, struct uwsgi_subscribe_req *usr) {
828 if (subscription_is_safe(usr)) return 1;
829
830 if (usr->sign_len == 0 || usr->base_len == 0)
831 return 0;
832
833 if (!slot->sign_ctx) {
834 if (!subscription_new_sign_ctx(slot, usr)) return 0;
835 }
836
837 if (EVP_VerifyInit_ex(slot->sign_ctx, uwsgi.subscriptions_sign_check_md, NULL) == 0) {
838 ERR_print_errors_fp(stderr);
839 return 0;
840 }
841
842 if (EVP_VerifyUpdate(slot->sign_ctx, usr->base, usr->base_len) == 0) {
843 ERR_print_errors_fp(stderr);
844 return 0;
845 }
846
847 if (EVP_VerifyFinal(slot->sign_ctx, (unsigned char *) usr->sign, usr->sign_len, slot->sign_public_key) != 1) {
848 #ifdef UWSGI_DEBUG
849 ERR_print_errors_fp(stderr);
850 #endif
851 return 0;
852 }
853
854
855 return 1;
856 }
857 #endif
858
uwsgi_no_subscriptions(struct uwsgi_subscribe_slot ** slot)859 int uwsgi_no_subscriptions(struct uwsgi_subscribe_slot **slot) {
860 int i;
861 for (i = 0; i < UMAX16; i++) {
862 if (slot[i])
863 return 0;
864 }
865 return 1;
866 }
867
uwsgi_subscription_init_ht()868 struct uwsgi_subscribe_slot **uwsgi_subscription_init_ht() {
869 if (!uwsgi.subscription_algo) {
870 uwsgi_subscription_set_algo(NULL);
871 }
872 return uwsgi_calloc(sizeof(struct uwsgi_subscription_slot *) * UMAX16);
873 }
874
uwsgi_subscribe(char * subscription,uint8_t cmd)875 void uwsgi_subscribe(char *subscription, uint8_t cmd) {
876
877 size_t subfile_size;
878 size_t i;
879 char *key = NULL;
880 int keysize = 0;
881 char *modifier1 = NULL;
882 int modifier1_len = 0;
883 char *socket_name = NULL;
884 char *udp_address = subscription;
885 char *udp_port = NULL;
886 char *subscription_key = NULL;
887 char *sign = NULL;
888
889 // check for explicit socket_name
890 char *equal = strchr(subscription, '=');
891 if (equal) {
892 socket_name = subscription;
893 if (socket_name[0] == '=') {
894 equal = strchr(socket_name + 1, '=');
895 if (!equal)
896 return;
897 *equal = '\0';
898 struct uwsgi_socket *us = uwsgi_get_shared_socket_by_num(atoi(socket_name + 1));
899 if (!us)
900 return;
901 socket_name = us->name;
902 }
903 *equal = '\0';
904 udp_address = equal + 1;
905 }
906
907 // check for unix socket
908 if (udp_address[0] != '/') {
909 udp_port = strchr(udp_address, ':');
910 if (!udp_port) {
911 if (equal)
912 *equal = '=';
913 return;
914 }
915 subscription_key = strchr(udp_port + 1, ':');
916 }
917 else {
918 subscription_key = strchr(udp_address + 1, ':');
919 }
920
921 if (!subscription_key) {
922 if (equal)
923 *equal = '=';
924 return;
925 }
926
927 udp_address = uwsgi_concat2n(udp_address, subscription_key - udp_address, "", 0);
928
929 if (subscription_key[1] == '@') {
930 if (!uwsgi_file_exists(subscription_key + 2))
931 goto clear;
932 char *lines = uwsgi_open_and_read(subscription_key + 2, &subfile_size, 1, NULL);
933 if (subfile_size > 0) {
934 key = lines;
935 for (i = 0; i < subfile_size; i++) {
936 if (lines[i] == 0) {
937 if (keysize > 0) {
938 if (key[0] != '#' && key[0] != '\n') {
939 modifier1 = strchr(key, ',');
940 if (modifier1) {
941 modifier1[0] = 0;
942 modifier1++;
943 modifier1_len = strlen(modifier1);
944 keysize = strlen(key);
945 }
946 uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
947 modifier1 = NULL;
948 modifier1_len = 0;
949 }
950 }
951 break;
952 }
953 else if (lines[i] == '\n') {
954 if (keysize > 0) {
955 if (key[0] != '#' && key[0] != '\n') {
956 lines[i] = 0;
957 modifier1 = strchr(key, ',');
958 if (modifier1) {
959 modifier1[0] = 0;
960 modifier1++;
961 modifier1_len = strlen(modifier1);
962 keysize = strlen(key);
963 }
964 uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
965 modifier1 = NULL;
966 modifier1_len = 0;
967 lines[i] = '\n';
968 }
969 }
970 key = lines + i + 1;
971 keysize = 0;
972 continue;
973 }
974 keysize++;
975 }
976 }
977 free(lines);
978 }
979 else {
980 modifier1 = strchr(subscription_key + 1, ',');
981 if (modifier1) {
982 modifier1[0] = 0;
983 modifier1++;
984
985 sign = strchr(modifier1 + 1, ',');
986 if (sign) {
987 *sign = 0;
988 sign++;
989 }
990 modifier1_len = strlen(modifier1);
991 }
992
993 uwsgi_send_subscription(udp_address, subscription_key + 1, strlen(subscription_key + 1), uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
994 if (modifier1)
995 modifier1[-1] = ',';
996 if (sign)
997 sign[-1] = ',';
998 }
999
1000 clear:
1001 if (equal)
1002 *equal = '=';
1003 free(udp_address);
1004
1005 }
1006
uwsgi_subscribe2(char * arg,uint8_t cmd)1007 void uwsgi_subscribe2(char *arg, uint8_t cmd) {
1008
1009 char *s2_server = NULL;
1010 char *s2_key = NULL;
1011 char *s2_socket = NULL;
1012 char *s2_addr = NULL;
1013 char *s2_weight = NULL;
1014 char *s2_sign = NULL;
1015 char *s2_modifier1 = NULL;
1016 char *s2_modifier2 = NULL;
1017 char *s2_check = NULL;
1018 char *s2_sni_key = NULL;
1019 char *s2_sni_crt = NULL;
1020 char *s2_sni_ca = NULL;
1021
1022 if (uwsgi_kvlist_parse(arg, strlen(arg), ',', '=', "server", &s2_server, "key", &s2_key, "socket", &s2_socket, "addr", &s2_addr, "weight", &s2_weight, "modifier1", &s2_modifier1, "modifier2", &s2_modifier2, "sign", &s2_sign, "check", &s2_check, "sni_key", &s2_sni_key, "sni_crt", &s2_sni_crt, "sni_ca", &s2_sni_ca, NULL)) {
1023 return;
1024 }
1025
1026 if (!s2_server || !s2_key)
1027 goto end;
1028
1029 if (s2_check) {
1030 if (uwsgi_file_exists(s2_check))
1031 goto end;
1032 }
1033
1034 if (s2_weight) {
1035 uwsgi.weight = atoi(s2_weight);
1036 }
1037
1038 if (s2_socket) {
1039 struct uwsgi_socket *us = uwsgi_get_socket_by_num(atoi(s2_socket));
1040 if (us) {
1041 if (s2_addr) {
1042 free(s2_addr);
1043 }
1044 s2_addr = uwsgi_str(us->name);
1045 }
1046 }
1047
1048 uint8_t modifier1 = 0;
1049 uint8_t modifier2 = 0;
1050
1051 if (s2_modifier1) {
1052 modifier1 = atoi(s2_modifier1);
1053 }
1054
1055 if (s2_modifier2) {
1056 modifier2 = atoi(s2_modifier2);
1057 }
1058
1059 uwsgi_send_subscription(s2_server, s2_key, strlen(s2_key), modifier1, modifier2, cmd, s2_addr, s2_sign, s2_sni_key, s2_sni_crt, s2_sni_ca);
1060 end:
1061 if (s2_server)
1062 free(s2_server);
1063 if (s2_key)
1064 free(s2_key);
1065 if (s2_socket)
1066 free(s2_socket);
1067 if (s2_addr)
1068 free(s2_addr);
1069 if (s2_weight)
1070 free(s2_weight);
1071 if (s2_modifier1)
1072 free(s2_modifier1);
1073 if (s2_modifier2)
1074 free(s2_modifier2);
1075 if (s2_sign)
1076 free(s2_sign);
1077 if (s2_check)
1078 free(s2_check);
1079 if (s2_sni_crt)
1080 free(s2_sni_crt);
1081 if (s2_sni_key)
1082 free(s2_sni_key);
1083 if (s2_sni_ca)
1084 free(s2_sni_ca);
1085 }
1086
uwsgi_subscribe_all(uint8_t cmd,int verbose)1087 void uwsgi_subscribe_all(uint8_t cmd, int verbose) {
1088
1089 if (uwsgi.subscriptions_blocked)
1090 return;
1091 // -- subscribe
1092 struct uwsgi_string_list *subscriptions = uwsgi.subscriptions;
1093 while (subscriptions) {
1094 if (verbose) {
1095 uwsgi_log("%s %s\n", cmd ? "unsubscribing from" : "subscribing to", subscriptions->value);
1096 }
1097 uwsgi_subscribe(subscriptions->value, cmd);
1098 subscriptions = subscriptions->next;
1099 }
1100
1101 // --subscribe2
1102 subscriptions = uwsgi.subscriptions2;
1103 while (subscriptions) {
1104 if (verbose) {
1105 uwsgi_log("%s %s\n", cmd ? "unsubscribing from" : "subscribing to", subscriptions->value);
1106 }
1107 uwsgi_subscribe2(subscriptions->value, cmd);
1108 subscriptions = subscriptions->next;
1109 }
1110
1111 }
1112