1 #include "uwsgi.h"
2 
3 /*
4 
5 	subscription subsystem
6 
	each subscription slot is stored as a hashed item in a dictionary
8 
9 	each slot has a circular linked list containing the nodes names
10 
11 	the structure and system is very similar to uwsgi_dyn_dict already used by the mime type parser
12 
	This system is not meant to run on shared memory. If you have multiple processes for the same app, you have to create
	a new subscription slot list.
15 
	To avoid removing nodes that are still in use, a reference count system has been implemented
17 
18 */
19 
20 
// global server state/configuration object; only declared here (extern), the storage is defined elsewhere
extern struct uwsgi_server uwsgi;
22 
23 #ifdef UWSGI_SSL
uwsgi_subscription_sni_check(struct uwsgi_subscribe_slot * current_slot,struct uwsgi_subscribe_req * usr)24 static void uwsgi_subscription_sni_check(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_req *usr) {
25 	if (usr->sni_key_len > 0 && usr->sni_crt_len > 0) {
26 		if (!current_slot->sni_enabled) {
27 			char *sni_key = uwsgi_concat2n(usr->sni_key, usr->sni_key_len, "", 0);
28 			char *sni_crt = uwsgi_concat2n(usr->sni_crt, usr->sni_crt_len, "", 0);
29 			char *sni_ca = NULL;
30 			if (usr->sni_ca_len > 0) {
31 				sni_ca = uwsgi_concat2n(usr->sni_ca, usr->sni_ca_len, "", 0);
32 			}
33 			char *servername = NULL;
34 			char *colon = memchr(current_slot->key, ':', current_slot->keylen);
35 			if (colon) {
36 				servername = uwsgi_concat2n(current_slot->key, colon - current_slot->key, "", 0);
37 			}
38 			else {
39 				servername = uwsgi_concat2n(current_slot->key, current_slot->keylen, "", 0);
40 			}
41 			if (uwsgi_ssl_add_sni_item(servername, sni_crt, sni_key, uwsgi.sni_dir_ciphers, sni_ca)) {
42 				current_slot->sni_enabled = 1;
43 			}
44 			if (sni_key)
45 				free(sni_key);
46 			if (sni_crt)
47 				free(sni_crt);
48 			if (sni_ca)
49 				free(sni_ca);
50 		}
51 	}
52 }
53 #endif
54 
uwsgi_subscription_credentials_check(struct uwsgi_subscribe_slot * slot,struct uwsgi_subscribe_req * usr)55 int uwsgi_subscription_credentials_check(struct uwsgi_subscribe_slot *slot, struct uwsgi_subscribe_req *usr) {
56 	struct uwsgi_string_list *usl = NULL;
57 	uwsgi_foreach(usl, uwsgi.subscriptions_credentials_check_dir) {
58 		char *filename = uwsgi_concat2n(usl->value, usl->len, slot->key, slot->keylen);
59 		struct stat st;
60 		int ret = stat(filename, &st);
61 		free(filename);
62 		if (ret != 0)
63 			continue;
64 		if (st.st_uid != usr->uid)
65 			continue;
66 		if (st.st_gid != usr->gid)
67 			continue;
68 		// accepted...
69 		return 1;
70 	}
71 	return 0;
72 }
73 
/*
	Look up the slot registered for `key` in the `slot` hash table.

	Keys longer than 0xff bytes are rejected. The bucket index is
	djb33x_hash(key) % 0xffff; slots falling in the same bucket are chained
	through their next/prev pointers.

	Side effect: a matching slot that has more hits than its predecessor is
	swapped with it, so busy slots drift towards the head of the bucket.

	Returns the matching slot or NULL when the key is unknown.
*/
struct uwsgi_subscribe_slot *uwsgi_get_subscribe_slot(struct uwsgi_subscribe_slot **slot, char *key, uint16_t keylen) {

	if (keylen > 0xff)
		return NULL;

	uint32_t hash = djb33x_hash(key, keylen);
	int hash_key = hash % 0xffff;

	struct uwsgi_subscribe_slot *current_slot = slot[hash_key];


#ifdef UWSGI_DEBUG
	// debug builds dump the whole bucket chain before searching it
	uwsgi_log("****************************\n");
	while (current_slot) {
		uwsgi_log("slot %.*s %d\n", current_slot->keylen, current_slot->key, current_slot->hits);
		current_slot = current_slot->next;
	}
	uwsgi_log("****************************\n");
	// rewind to the head of the bucket for the real lookup
	current_slot = slot[hash_key];
#endif

	while (current_slot) {
		if (!uwsgi_strncmp(key, keylen, current_slot->key, current_slot->keylen)) {
			// auto optimization
			if (current_slot->prev) {
				if (current_slot->hits > current_slot->prev->hits) {
					// swap current_slot with its predecessor:
					// slot_parent <-> slot_prev <-> current_slot  becomes  slot_parent <-> current_slot <-> slot_prev
					struct uwsgi_subscribe_slot *slot_parent = current_slot->prev->prev, *slot_prev = current_slot->prev;
					if (slot_parent) {
						slot_parent->next = current_slot;
					}
					else {
						// the predecessor was the head of the bucket, current_slot takes its place
						slot[hash_key] = current_slot;
					}

					// the old successor of current_slot now follows slot_prev
					if (current_slot->next) {
						current_slot->next->prev = slot_prev;
					}

					slot_prev->prev = current_slot;
					slot_prev->next = current_slot->next;

					current_slot->next = slot_prev;
					current_slot->prev = slot_parent;

				}
			}
			return current_slot;
		}
		current_slot = current_slot->next;
		// check for loopy optimization
		// (stop if the chain wrapped around to the head of the bucket)
		if (current_slot == slot[hash_key])
			break;
	}

	return NULL;
}
130 
131 // least reference count
uwsgi_subscription_algo_lrc(struct uwsgi_subscribe_slot * current_slot,struct uwsgi_subscribe_node * node)132 static struct uwsgi_subscribe_node *uwsgi_subscription_algo_lrc(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_node *node) {
133 	// if node is NULL we are in the second step (in lrc mode we do not use the first step)
134 	if (node)
135 		return NULL;
136 
137 	struct uwsgi_subscribe_node *choosen_node = NULL;
138 	node = current_slot->nodes;
139 	uint64_t min_rc = 0;
140 	while (node) {
141 		if (!node->death_mark) {
142 			if (min_rc == 0 || node->reference < min_rc) {
143 				min_rc = node->reference;
144 				choosen_node = node;
145 				if (min_rc == 0 && !(node->next && node->next->reference <= node->reference && node->next->last_requests <= node->last_requests))
146 					break;
147 			}
148 		}
149 		node = node->next;
150 	}
151 
152 	if (choosen_node) {
153 		choosen_node->reference++;
154 	}
155 
156 	return choosen_node;
157 }
158 
159 // weighted least reference count
uwsgi_subscription_algo_wlrc(struct uwsgi_subscribe_slot * current_slot,struct uwsgi_subscribe_node * node)160 static struct uwsgi_subscribe_node *uwsgi_subscription_algo_wlrc(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_node *node) {
161 	// if node is NULL we are in the second step (in wlrc mode we do not use the first step)
162 	if (node)
163 		return NULL;
164 
165 	struct uwsgi_subscribe_node *choosen_node = NULL;
166 	node = current_slot->nodes;
167 	double min_rc = 0;
168 	while (node) {
169 		if (!node->death_mark) {
170 			// node->weight is always >= 1, we can safely use it as divider
171 			double ref = (double) node->reference / (double) node->weight;
172 			double next_node_ref = 0;
173 			if (node->next)
174 				next_node_ref = (double) node->next->reference / (double) node->next->weight;
175 
176 			if (min_rc == 0 || ref < min_rc) {
177 				min_rc = ref;
178 				choosen_node = node;
179 				if (min_rc == 0 && !(node->next && next_node_ref <= ref && node->next->last_requests <= node->last_requests))
180 					break;
181 			}
182 		}
183 		node = node->next;
184 	}
185 
186 	if (choosen_node) {
187 		choosen_node->reference++;
188 	}
189 
190 	return choosen_node;
191 }
192 
193 // weighted round robin algo
uwsgi_subscription_algo_wrr(struct uwsgi_subscribe_slot * current_slot,struct uwsgi_subscribe_node * node)194 static struct uwsgi_subscribe_node *uwsgi_subscription_algo_wrr(struct uwsgi_subscribe_slot *current_slot, struct uwsgi_subscribe_node *node) {
195 	// if node is NULL we are in the second step
196 	if (node) {
197 		if (node->death_mark == 0 && node->wrr > 0) {
198 			node->wrr--;
199 			node->reference++;
200 			return node;
201 		}
202 		return NULL;
203 	}
204 
205 	// no wrr > 0 node found, reset them
206 	node = current_slot->nodes;
207 	uint64_t min_weight = 0;
208 	while (node) {
209 		if (!node->death_mark) {
210 			if (min_weight == 0 || node->weight < min_weight)
211 				min_weight = node->weight;
212 		}
213 		node = node->next;
214 	}
215 
216 	// now set wrr
217 	node = current_slot->nodes;
218 	struct uwsgi_subscribe_node *choosen_node = NULL;
219 	while (node) {
220 		if (!node->death_mark) {
221 			node->wrr = node->weight / min_weight;
222 			choosen_node = node;
223 		}
224 		node = node->next;
225 	}
226 	if (choosen_node) {
227 		choosen_node->wrr--;
228 		choosen_node->reference++;
229 	}
230 	return choosen_node;
231 }
232 
uwsgi_subscription_set_algo(char * algo)233 void uwsgi_subscription_set_algo(char *algo) {
234 
235 	if (!algo)
236 		goto wrr;
237 
238 	if (!strcmp(algo, "wrr")) {
239 		uwsgi.subscription_algo = uwsgi_subscription_algo_wrr;
240 		return;
241 	}
242 
243 	if (!strcmp(algo, "lrc")) {
244 		uwsgi.subscription_algo = uwsgi_subscription_algo_lrc;
245 		return;
246 	}
247 
248 	if (!strcmp(algo, "wlrc")) {
249 		uwsgi.subscription_algo = uwsgi_subscription_algo_wlrc;
250 		return;
251 	}
252 
253 wrr:
254 	uwsgi.subscription_algo = uwsgi_subscription_algo_wrr;
255 }
256 
uwsgi_get_subscribe_node(struct uwsgi_subscribe_slot ** slot,char * key,uint16_t keylen)257 struct uwsgi_subscribe_node *uwsgi_get_subscribe_node(struct uwsgi_subscribe_slot **slot, char *key, uint16_t keylen) {
258 
259 	if (keylen > 0xff)
260 		return NULL;
261 
262 	struct uwsgi_subscribe_slot *current_slot = uwsgi_get_subscribe_slot(slot, key, keylen);
263 	if (!current_slot)
264 		return NULL;
265 
266 	// slot found, move up in the list increasing hits
267 	current_slot->hits++;
268 	time_t now = uwsgi_now();
269 	struct uwsgi_subscribe_node *node = current_slot->nodes;
270 	while (node) {
271 		// is the node alive ?
272 		if (now - node->last_check > uwsgi.subscription_tolerance) {
273 			if (node->death_mark == 0)
274 				uwsgi_log("[uwsgi-subscription for pid %d] %.*s => marking %.*s as failed (no announce received in %d seconds)\n", (int) uwsgi.mypid, (int) keylen, key, (int) node->len, node->name, uwsgi.subscription_tolerance);
275 			node->failcnt++;
276 			node->death_mark = 1;
277 		}
278 		// do i need to remove the node ?
279 		if (node->death_mark && node->reference == 0) {
280 			// remove the node and move to next
281 			struct uwsgi_subscribe_node *dead_node = node;
282 			node = node->next;
283 			// if the slot has been removed, return NULL;
284 			if (uwsgi_remove_subscribe_node(slot, dead_node) == 1) {
285 				return NULL;
286 			}
287 			continue;
288 		}
289 
290 		struct uwsgi_subscribe_node *choosen_node = uwsgi.subscription_algo(current_slot, node);
291 		if (choosen_node)
292 			return choosen_node;
293 
294 		node = node->next;
295 	}
296 
297 	return uwsgi.subscription_algo(current_slot, node);
298 }
299 
uwsgi_get_subscribe_node_by_name(struct uwsgi_subscribe_slot ** slot,char * key,uint16_t keylen,char * val,uint16_t vallen)300 struct uwsgi_subscribe_node *uwsgi_get_subscribe_node_by_name(struct uwsgi_subscribe_slot **slot, char *key, uint16_t keylen, char *val, uint16_t vallen) {
301 
302 	if (keylen > 0xff)
303 		return NULL;
304 	struct uwsgi_subscribe_slot *current_slot = uwsgi_get_subscribe_slot(slot, key, keylen);
305 	if (current_slot) {
306 		struct uwsgi_subscribe_node *node = current_slot->nodes;
307 		while (node) {
308 			if (!uwsgi_strncmp(val, vallen, node->name, node->len)) {
309 				return node;
310 			}
311 			node = node->next;
312 		}
313 	}
314 
315 	return NULL;
316 }
317 
uwsgi_remove_subscribe_node(struct uwsgi_subscribe_slot ** slot,struct uwsgi_subscribe_node * node)318 int uwsgi_remove_subscribe_node(struct uwsgi_subscribe_slot **slot, struct uwsgi_subscribe_node *node) {
319 
320 	int ret = 0;
321 
322 	struct uwsgi_subscribe_node *a_node;
323 	struct uwsgi_subscribe_slot *node_slot = node->slot;
324 	struct uwsgi_subscribe_slot *prev_slot = node_slot->prev;
325 	struct uwsgi_subscribe_slot *next_slot = node_slot->next;
326 
327 	int hash_key = node_slot->hash;
328 
329 	// over-engineering to avoid race conditions
330 	node->len = 0;
331 
332 	if (node == node_slot->nodes) {
333 		node_slot->nodes = node->next;
334 	}
335 	else {
336 		a_node = node_slot->nodes;
337 		while (a_node) {
338 			if (a_node->next == node) {
339 				a_node->next = node->next;
340 				break;
341 			}
342 			a_node = a_node->next;
343 		}
344 	}
345 
346 	free(node);
347 	// no more nodes, remove the slot too
348 	if (node_slot->nodes == NULL) {
349 
350 		ret = 1;
351 
352 		// first check if i am the only node
353 		if ((!prev_slot && !next_slot) || next_slot == node_slot) {
354 #ifdef UWSGI_SSL
355 			if (node_slot->sign_ctx) {
356 				EVP_PKEY_free(node_slot->sign_public_key);
357 				EVP_MD_CTX_destroy(node_slot->sign_ctx);
358 			}
359 #ifdef SSL_CTRL_SET_TLSEXT_HOSTNAME
360 			// if there is a SNI context active, destroy it
361 			if (node_slot->sni_enabled) {
362 				uwsgi_ssl_del_sni_item(node_slot->key, node_slot->keylen);
363 			}
364 #endif
365 #endif
366 			free(node_slot);
367 			slot[hash_key] = NULL;
368 			goto end;
369 		}
370 
371 		// if i am the main entry point, set the next value
372 		if (node_slot == slot[hash_key]) {
373 			slot[hash_key] = next_slot;
374 		}
375 
376 		if (prev_slot) {
377 			prev_slot->next = next_slot;
378 		}
379 		if (next_slot) {
380 			next_slot->prev = prev_slot;
381 		}
382 
383 #ifdef UWSGI_SSL
384 		if (node_slot->sign_ctx) {
385 			EVP_PKEY_free(node_slot->sign_public_key);
386 			EVP_MD_CTX_destroy(node_slot->sign_ctx);
387 		}
388 #endif
389 		free(node_slot);
390 	}
391 
392 end:
393 
394 	return ret;
395 }
396 
#ifdef UWSGI_SSL
// forward declarations: both helpers are defined further down in this file
// but are already needed by uwsgi_add_subscribe_node()
static int subscription_new_sign_ctx(struct uwsgi_subscribe_slot *, struct uwsgi_subscribe_req *);
static int subscription_is_safe(struct uwsgi_subscribe_req *);
#endif
401 
uwsgi_add_subscribe_node(struct uwsgi_subscribe_slot ** slot,struct uwsgi_subscribe_req * usr)402 struct uwsgi_subscribe_node *uwsgi_add_subscribe_node(struct uwsgi_subscribe_slot **slot, struct uwsgi_subscribe_req *usr) {
403 
404 	struct uwsgi_subscribe_slot *current_slot = uwsgi_get_subscribe_slot(slot, usr->key, usr->keylen), *old_slot = NULL, *a_slot;
405 	struct uwsgi_subscribe_node *node, *old_node = NULL;
406 
407 	if (usr->address_len > 0xff || usr->address_len == 0)
408 		return NULL;
409 
410 	if (current_slot) {
411 #ifdef UWSGI_SSL
412 		if (uwsgi.subscriptions_sign_check_dir && !uwsgi_subscription_sign_check(current_slot, usr)) {
413 			return NULL;
414 		}
415 #endif
416 
417 		if (uwsgi.subscriptions_credentials_check_dir && !uwsgi_subscription_credentials_check(current_slot, usr)) {
418 			return NULL;
419 		}
420 
421 		node = current_slot->nodes;
422 		while (node) {
423 			if (!uwsgi_strncmp(node->name, node->len, usr->address, usr->address_len)) {
424 #ifdef UWSGI_SSL
425 				// this should avoid sending sniffed packets...
426 				if (current_slot->sign_ctx && !subscription_is_safe(usr) && usr->unix_check <= node->unix_check) {
427 					uwsgi_log("[uwsgi-subscription for pid %d] invalid (sniffed ?) packet sent for slot: %.*s node: %.*s unix_check: %lu\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address, (unsigned long) usr->unix_check);
428 					return NULL;
429 				}
430 				// eventually the packet could be upgraded to sni...
431 				uwsgi_subscription_sni_check(current_slot, usr);
432 #endif
433 				// remove death mark and update cores and load
434 				node->death_mark = 0;
435 				node->last_check = uwsgi_now();
436 				node->cores = usr->cores;
437 				node->load = usr->load;
438 				node->weight = usr->weight;
439 				if (!node->weight)
440 					node->weight = 1;
441 				node->last_requests = 0;
442 				return node;
443 			}
444 			old_node = node;
445 			node = node->next;
446 		}
447 
448 #ifdef UWSGI_SSL
449 		if (current_slot->sign_ctx && !subscription_is_safe(usr) && usr->unix_check < (uwsgi_now() - (time_t) uwsgi.subscriptions_sign_check_tolerance)) {
450 			uwsgi_log("[uwsgi-subscription for pid %d] invalid (sniffed ?) packet sent for slot: %.*s node: %.*s unix_check: %lu\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address, (unsigned long) usr->unix_check);
451                         return NULL;
452 		}
453 		// check here as we are sure the node will be added
454 		uwsgi_subscription_sni_check(current_slot, usr);
455 #endif
456 
457 		node = uwsgi_malloc(sizeof(struct uwsgi_subscribe_node));
458 		node->len = usr->address_len;
459 		node->modifier1 = usr->modifier1;
460 		node->modifier2 = usr->modifier2;
461 		node->requests = 0;
462 		node->last_requests = 0;
463 		node->tx = 0;
464 		node->rx = 0;
465 		node->reference = 0;
466 		node->death_mark = 0;
467 		node->failcnt = 0;
468 		node->cores = usr->cores;
469 		node->load = usr->load;
470 		node->weight = usr->weight;
471 		node->unix_check = usr->unix_check;
472 		if (!node->weight)
473 			node->weight = 1;
474 		node->wrr = 0;
475 		node->pid = usr->pid;
476 		node->uid = usr->uid;
477 		node->gid = usr->gid;
478 		node->notify[0] = 0;
479 		if (usr->notify_len > 0 && usr->notify_len < 102) {
480 			memcpy(node->notify, usr->notify, usr->notify_len);
481 			node->notify[usr->notify_len] = 0;
482 		}
483 		node->last_check = uwsgi_now();
484 		node->slot = current_slot;
485 		memcpy(node->name, usr->address, usr->address_len);
486 		if (old_node) {
487 			old_node->next = node;
488 		}
489 		node->next = NULL;
490 
491 		uwsgi_log("[uwsgi-subscription for pid %d] %.*s => new node: %.*s\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address);
492 		if (node->notify[0]) {
493 			char buf[1024];
494 			int ret = snprintf(buf, 1024, "[subscription ack] %.*s => new node: %.*s", usr->keylen, usr->key, usr->address_len, usr->address);
495 			if (ret > 0 && ret < 1024)
496 				uwsgi_notify_msg(node->notify, buf, ret);
497 		}
498 		return node;
499 	}
500 	else {
501 		current_slot = uwsgi_malloc(sizeof(struct uwsgi_subscribe_slot));
502 #ifdef UWSGI_SSL
503 		current_slot->sign_ctx = NULL;
504 		if (uwsgi.subscriptions_sign_check_dir && !subscription_new_sign_ctx(current_slot, usr)) {
505 			free(current_slot);
506 			return NULL;
507 		}
508 #endif
509 		uint32_t hash = djb33x_hash(usr->key, usr->keylen);
510 		int hash_key = hash % 0xffff;
511 		current_slot->hash = hash_key;
512 		current_slot->keylen = usr->keylen;
513 		memcpy(current_slot->key, usr->key, usr->keylen);
514 		if (uwsgi.subscriptions_credentials_check_dir) {
515 			if (!uwsgi_subscription_credentials_check(current_slot, usr)) {
516 				free(current_slot);
517 				return NULL;
518 			}
519 		}
520 
521 		current_slot->key[usr->keylen] = 0;
522 		current_slot->hits = 0;
523 #ifdef UWSGI_SSL
524 		current_slot->sni_enabled = 0;
525 		uwsgi_subscription_sni_check(current_slot, usr);
526 #endif
527 		current_slot->nodes = uwsgi_malloc(sizeof(struct uwsgi_subscribe_node));
528 		current_slot->nodes->slot = current_slot;
529 		current_slot->nodes->len = usr->address_len;
530 		current_slot->nodes->reference = 0;
531 		current_slot->nodes->requests = 0;
532 		current_slot->nodes->last_requests = 0;
533 		current_slot->nodes->tx = 0;
534 		current_slot->nodes->rx = 0;
535 		current_slot->nodes->death_mark = 0;
536 		current_slot->nodes->failcnt = 0;
537 		current_slot->nodes->modifier1 = usr->modifier1;
538 		current_slot->nodes->modifier2 = usr->modifier2;
539 		current_slot->nodes->cores = usr->cores;
540 		current_slot->nodes->load = usr->load;
541 		current_slot->nodes->weight = usr->weight;
542 		current_slot->nodes->unix_check = usr->unix_check;
543 		if (!current_slot->nodes->weight)
544 			current_slot->nodes->weight = 1;
545 		current_slot->nodes->wrr = 0;
546 		current_slot->nodes->pid = usr->pid;
547 		current_slot->nodes->uid = usr->uid;
548 		current_slot->nodes->gid = usr->gid;
549 		current_slot->nodes->notify[0] = 0;
550 		if (usr->notify_len > 0 && usr->notify_len < 102) {
551 			memcpy(current_slot->nodes->notify, usr->notify, usr->notify_len);
552 			current_slot->nodes->notify[usr->notify_len] = 0;
553 		}
554 		memcpy(current_slot->nodes->name, usr->address, usr->address_len);
555 		current_slot->nodes->last_check = uwsgi_now();
556 
557 		current_slot->nodes->next = NULL;
558 
559 		a_slot = slot[hash_key];
560 		while (a_slot) {
561 			old_slot = a_slot;
562 			a_slot = a_slot->next;
563 		}
564 
565 
566 		if (old_slot) {
567 			old_slot->next = current_slot;
568 		}
569 
570 		current_slot->prev = old_slot;
571 		current_slot->next = NULL;
572 
573 
574 		if (!slot[hash_key] || current_slot->prev == NULL) {
575 			slot[hash_key] = current_slot;
576 		}
577 
578 		uwsgi_log("[uwsgi-subscription for pid %d] new pool: %.*s (hash key: %d)\n", (int) uwsgi.mypid, usr->keylen, usr->key, current_slot->hash);
579 		uwsgi_log("[uwsgi-subscription for pid %d] %.*s => new node: %.*s\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address);
580 
581 		if (current_slot->nodes->notify[0]) {
582 			char buf[1024];
583 			int ret = snprintf(buf, 1024, "[subscription ack] %.*s => new node: %.*s", usr->keylen, usr->key, usr->address_len, usr->address);
584 			if (ret > 0 && ret < 1024)
585 				uwsgi_notify_msg(current_slot->nodes->notify, buf, ret);
586 		}
587 		return current_slot->nodes;
588 	}
589 
590 }
591 
send_subscription(int sfd,char * host,char * message,uint16_t message_size)592 static void send_subscription(int sfd, char *host, char *message, uint16_t message_size) {
593 
594 	int fd = sfd;
595 	struct sockaddr_in udp_addr;
596 	struct sockaddr_un un_addr;
597 	ssize_t ret;
598 
599 	char *udp_port = strchr(host, ':');
600 
601 	if (fd == -1) {
602 		if (udp_port) {
603 			fd = socket(AF_INET, SOCK_DGRAM, 0);
604 		}
605 		else {
606 			fd = socket(AF_UNIX, SOCK_DGRAM, 0);
607 		}
608 		if (fd < 0) {
609 			uwsgi_error("send_subscription()/socket()");
610 			return;
611 		}
612 		uwsgi_socket_nb(fd);
613 	}
614 	else if (fd == -2) {
615 		static int unix_fd = -1;
616 		static int inet_fd = -1;
617 		if (udp_port) {
618 			if (inet_fd == -1) {
619 				inet_fd = socket(AF_INET, SOCK_DGRAM, 0);
620 				if (inet_fd < 0) {
621 					uwsgi_error("send_subscription()/socket()");
622 					return;
623 				}
624 				uwsgi_socket_nb(inet_fd);
625 			}
626 			fd = inet_fd;
627 		}
628 		else {
629 			if (unix_fd == -1) {
630 				unix_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
631 				if (unix_fd < 0) {
632 					uwsgi_error("send_subscription()/socket()");
633 					return;
634 				}
635 				uwsgi_socket_nb(unix_fd);
636 			}
637 			fd = unix_fd;
638 		}
639 	}
640 
641 	if (udp_port) {
642 		udp_port[0] = 0;
643 		memset(&udp_addr, 0, sizeof(struct sockaddr_in));
644 		udp_addr.sin_family = AF_INET;
645 		udp_addr.sin_port = htons(atoi(udp_port + 1));
646 		udp_addr.sin_addr.s_addr = inet_addr(host);
647 		ret = sendto(fd, message, message_size, 0, (struct sockaddr *) &udp_addr, sizeof(udp_addr));
648 		udp_port[0] = ':';
649 	}
650 	else {
651 		memset(&un_addr, 0, sizeof(struct sockaddr_un));
652 		un_addr.sun_family = AF_UNIX;
653 		// use 102 as the magic number
654 		strncat(un_addr.sun_path, host, 102);
655 		if (uwsgi.subscriptions_use_credentials) {
656 			// could be useless as internally the socket could add them automagically
657 			ret = uwsgi_pass_cred2(fd, message, message_size, (struct sockaddr *) &un_addr, sizeof(un_addr));
658 		}
659 		else {
660 			ret = sendto(fd, message, message_size, 0, (struct sockaddr *) &un_addr, sizeof(un_addr));
661 		}
662 	}
663 
664 	if (ret < 0) {
665 		uwsgi_error("send_subscription()/sendto()");
666 	}
667 
668 	if (sfd == -1)
669 		close(fd);
670 }
671 
uwsgi_subscription_ub(char * key,size_t keysize,uint8_t modifier1,uint8_t modifier2,uint8_t cmd,char * socket_name,char * sign,char * sni_key,char * sni_crt,char * sni_ca)672 static struct uwsgi_buffer *uwsgi_subscription_ub(char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
673 	struct uwsgi_buffer *ub = uwsgi_buffer_new(4096);
674 
675 	// make space for uwsgi header
676 	ub->pos = 4;
677 
678 	if (uwsgi_buffer_append_keyval(ub, "key", 3, key, keysize))
679 		goto end;
680 	if (uwsgi_buffer_append_keyval(ub, "address", 7, socket_name, strlen(socket_name)))
681 		goto end;
682 
683 	if (uwsgi.subscribe_with_modifier1) {
684 		modifier1 = atoi(uwsgi.subscribe_with_modifier1);
685 	}
686 	if (uwsgi_buffer_append_keynum(ub, "modifier1", 9, modifier1))
687 		goto end;
688 	if (uwsgi_buffer_append_keynum(ub, "modifier2", 9, modifier2))
689 		goto end;
690 	if (uwsgi_buffer_append_keynum(ub, "cores", 5, uwsgi.numproc * uwsgi.cores))
691 		goto end;
692 	if (uwsgi_buffer_append_keynum(ub, "load", 4, uwsgi.shared->load))
693 		goto end;
694 	if (uwsgi.auto_weight) {
695 		if (uwsgi_buffer_append_keynum(ub, "weight", 6, uwsgi.numproc * uwsgi.cores))
696 			goto end;
697 	}
698 	else {
699 		if (uwsgi_buffer_append_keynum(ub, "weight", 6, uwsgi.weight))
700 			goto end;
701 	}
702 
703 	if (sni_key) {
704 		if (uwsgi_buffer_append_keyval(ub, "sni_key", 7, sni_key, strlen(sni_key)))
705 			goto end;
706 	}
707 
708 	if (sni_crt) {
709 		if (uwsgi_buffer_append_keyval(ub, "sni_crt", 7, sni_crt, strlen(sni_crt)))
710 			goto end;
711 	}
712 
713 	if (sni_ca) {
714 		if (uwsgi_buffer_append_keyval(ub, "sni_ca", 6, sni_ca, strlen(sni_ca)))
715 			goto end;
716 	}
717 
718 	if (uwsgi.subscription_notify_socket) {
719 		if (uwsgi_buffer_append_keyval(ub, "notify", 6, uwsgi.subscription_notify_socket, strlen(uwsgi.subscription_notify_socket)))
720 			goto end;
721 	}
722 	else if (uwsgi.notify_socket_fd > -1 && uwsgi.notify_socket) {
723 		if (uwsgi_buffer_append_keyval(ub, "notify", 6, uwsgi.notify_socket, strlen(uwsgi.notify_socket)))
724 			goto end;
725 	}
726 
727 #ifdef UWSGI_SSL
728 	if (sign) {
729 		if (uwsgi_buffer_append_keynum(ub, "unix", 4, (uwsgi_now() + (time_t) cmd)))
730 			goto end;
731 
732 		unsigned int signature_len = 0;
733 		char *signature = uwsgi_rsa_sign(sign, ub->buf + 4, ub->pos - 4, &signature_len);
734 		if (signature && signature_len > 0) {
735 			if (uwsgi_buffer_append_keyval(ub, "sign", 4, signature, signature_len)) {
736 				free(signature);
737 				goto end;
738 			}
739 			free(signature);
740 		}
741 	}
742 #endif
743 
744 	// add uwsgi header
745 	if (uwsgi_buffer_set_uh(ub, 224, cmd))
746 		goto end;
747 
748 	return ub;
749 
750 end:
751 	uwsgi_buffer_destroy(ub);
752 	return NULL;
753 }
754 
uwsgi_send_subscription_from_fd(int fd,char * udp_address,char * key,size_t keysize,uint8_t modifier1,uint8_t modifier2,uint8_t cmd,char * socket_name,char * sign,char * sni_key,char * sni_crt,char * sni_ca)755 void uwsgi_send_subscription_from_fd(int fd, char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
756 
757 	if (socket_name == NULL && !uwsgi.sockets)
758 		return;
759 
760 	if (!socket_name) {
761 		socket_name = uwsgi.sockets->name;
762 	}
763 
764 	struct uwsgi_buffer *ub = uwsgi_subscription_ub(key, keysize, modifier1, modifier2, cmd, socket_name, sign, sni_key, sni_crt, sni_ca);
765 
766 	if (!ub)
767 		return;
768 
769 	send_subscription(fd, udp_address, ub->buf, ub->pos);
770 	uwsgi_buffer_destroy(ub);
771 }
772 
773 
/*
	Convenience wrapper around uwsgi_send_subscription_from_fd(): it always passes -1
	as the file descriptor and forwards every other argument unchanged.
*/
void uwsgi_send_subscription(char *udp_address, char *key, size_t keysize, uint8_t modifier1, uint8_t modifier2, uint8_t cmd, char *socket_name, char *sign, char *sni_key, char *sni_crt, char *sni_ca) {
	uwsgi_send_subscription_from_fd(-1, udp_address, key, keysize, modifier1, modifier2, cmd, socket_name, sign, sni_key, sni_crt, sni_ca);
}
777 
778 #ifdef UWSGI_SSL
subscription_is_safe(struct uwsgi_subscribe_req * usr)779 static int subscription_is_safe(struct uwsgi_subscribe_req *usr) {
780 	struct uwsgi_string_list *usl = NULL;
781         uwsgi_foreach(usl, uwsgi.subscriptions_sign_skip_uid) {
782                 if (usl->custom == 0) {
783                         usl->custom = atoi(usl->value);
784                 }
785                 if (usr->uid > 0 && usr->uid == (uid_t) usl->custom) {
786                         return 1;
787                 }
788         }
789 	return 0;
790 }
subscription_new_sign_ctx(struct uwsgi_subscribe_slot * slot,struct uwsgi_subscribe_req * usr)791 static int subscription_new_sign_ctx(struct uwsgi_subscribe_slot *slot, struct uwsgi_subscribe_req *usr) {
792 	if (subscription_is_safe(usr)) return 1;
793 
794 	if (usr->sign_len == 0 || usr->base_len == 0)
795 		return 0;
796 
797 	if (usr->unix_check < (uwsgi_now() - (time_t) uwsgi.subscriptions_sign_check_tolerance)) {
798         	uwsgi_log("[uwsgi-subscription for pid %d] invalid (sniffed ?) packet sent for slot: %.*s node: %.*s unix_check: %lu\n", (int) uwsgi.mypid, usr->keylen, usr->key, usr->address_len, usr->address, (unsigned long) usr->unix_check);
799 		return 0;
800         }
801 
802 	char *keyfile = uwsgi_sanitize_cert_filename(uwsgi.subscriptions_sign_check_dir, usr->key, usr->keylen);
803 	FILE *kf = fopen(keyfile, "r");
804 	free(keyfile);
805 	if (!kf) return 0;
806 	slot->sign_public_key = PEM_read_PUBKEY(kf, NULL, NULL, NULL);
807 	fclose(kf);
808 	if (!slot->sign_public_key) {
809         	uwsgi_log("unable to load public key for %.*s\n", usr->keylen, usr->key);
810 		return 0;
811 	}
812 	slot->sign_ctx = EVP_MD_CTX_create();
813 	if (!slot->sign_ctx) {
814         	uwsgi_log("unable to initialize EVP context for %.*s\n", usr->keylen, usr->key);
815                 EVP_PKEY_free(slot->sign_public_key);
816 		return 0;
817 	}
818 
819 	if (!uwsgi_subscription_sign_check(slot, usr)) {
820 		EVP_PKEY_free(slot->sign_public_key);
821 		EVP_MD_CTX_destroy(slot->sign_ctx);
822 		return 0;
823 	}
824 
825 	return 1;
826 }
uwsgi_subscription_sign_check(struct uwsgi_subscribe_slot * slot,struct uwsgi_subscribe_req * usr)827 int uwsgi_subscription_sign_check(struct uwsgi_subscribe_slot *slot, struct uwsgi_subscribe_req *usr) {
828 	if (subscription_is_safe(usr)) return 1;
829 
830 	if (usr->sign_len == 0 || usr->base_len == 0)
831 		return 0;
832 
833 	if (!slot->sign_ctx) {
834 		if (!subscription_new_sign_ctx(slot, usr)) return 0;
835 	}
836 
837 	if (EVP_VerifyInit_ex(slot->sign_ctx, uwsgi.subscriptions_sign_check_md, NULL) == 0) {
838 		ERR_print_errors_fp(stderr);
839 		return 0;
840 	}
841 
842 	if (EVP_VerifyUpdate(slot->sign_ctx, usr->base, usr->base_len) == 0) {
843 		ERR_print_errors_fp(stderr);
844 		return 0;
845 	}
846 
847 	if (EVP_VerifyFinal(slot->sign_ctx, (unsigned char *) usr->sign, usr->sign_len, slot->sign_public_key) != 1) {
848 #ifdef UWSGI_DEBUG
849 		ERR_print_errors_fp(stderr);
850 #endif
851 		return 0;
852 	}
853 
854 
855 	return 1;
856 }
857 #endif
858 
uwsgi_no_subscriptions(struct uwsgi_subscribe_slot ** slot)859 int uwsgi_no_subscriptions(struct uwsgi_subscribe_slot **slot) {
860 	int i;
861 	for (i = 0; i < UMAX16; i++) {
862 		if (slot[i])
863 			return 0;
864 	}
865 	return 1;
866 }
867 
uwsgi_subscription_init_ht()868 struct uwsgi_subscribe_slot **uwsgi_subscription_init_ht() {
869 	if (!uwsgi.subscription_algo) {
870 		uwsgi_subscription_set_algo(NULL);
871 	}
872 	return uwsgi_calloc(sizeof(struct uwsgi_subscription_slot *) * UMAX16);
873 }
874 
uwsgi_subscribe(char * subscription,uint8_t cmd)875 void uwsgi_subscribe(char *subscription, uint8_t cmd) {
876 
877 	size_t subfile_size;
878 	size_t i;
879 	char *key = NULL;
880 	int keysize = 0;
881 	char *modifier1 = NULL;
882 	int modifier1_len = 0;
883 	char *socket_name = NULL;
884 	char *udp_address = subscription;
885 	char *udp_port = NULL;
886 	char *subscription_key = NULL;
887 	char *sign = NULL;
888 
889 	// check for explicit socket_name
890 	char *equal = strchr(subscription, '=');
891 	if (equal) {
892 		socket_name = subscription;
893 		if (socket_name[0] == '=') {
894 			equal = strchr(socket_name + 1, '=');
895 			if (!equal)
896 				return;
897 			*equal = '\0';
898 			struct uwsgi_socket *us = uwsgi_get_shared_socket_by_num(atoi(socket_name + 1));
899 			if (!us)
900 				return;
901 			socket_name = us->name;
902 		}
903 		*equal = '\0';
904 		udp_address = equal + 1;
905 	}
906 
907 	// check for unix socket
908 	if (udp_address[0] != '/') {
909 		udp_port = strchr(udp_address, ':');
910 		if (!udp_port) {
911 			if (equal)
912 				*equal = '=';
913 			return;
914 		}
915 		subscription_key = strchr(udp_port + 1, ':');
916 	}
917 	else {
918 		subscription_key = strchr(udp_address + 1, ':');
919 	}
920 
921 	if (!subscription_key) {
922 		if (equal)
923 			*equal = '=';
924 		return;
925 	}
926 
927 	udp_address = uwsgi_concat2n(udp_address, subscription_key - udp_address, "", 0);
928 
929 	if (subscription_key[1] == '@') {
930 		if (!uwsgi_file_exists(subscription_key + 2))
931 			goto clear;
932 		char *lines = uwsgi_open_and_read(subscription_key + 2, &subfile_size, 1, NULL);
933 		if (subfile_size > 0) {
934 			key = lines;
935 			for (i = 0; i < subfile_size; i++) {
936 				if (lines[i] == 0) {
937 					if (keysize > 0) {
938 						if (key[0] != '#' && key[0] != '\n') {
939 							modifier1 = strchr(key, ',');
940 							if (modifier1) {
941 								modifier1[0] = 0;
942 								modifier1++;
943 								modifier1_len = strlen(modifier1);
944 								keysize = strlen(key);
945 							}
946 							uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
947 							modifier1 = NULL;
948 							modifier1_len = 0;
949 						}
950 					}
951 					break;
952 				}
953 				else if (lines[i] == '\n') {
954 					if (keysize > 0) {
955 						if (key[0] != '#' && key[0] != '\n') {
956 							lines[i] = 0;
957 							modifier1 = strchr(key, ',');
958 							if (modifier1) {
959 								modifier1[0] = 0;
960 								modifier1++;
961 								modifier1_len = strlen(modifier1);
962 								keysize = strlen(key);
963 							}
964 							uwsgi_send_subscription(udp_address, key, keysize, uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
965 							modifier1 = NULL;
966 							modifier1_len = 0;
967 							lines[i] = '\n';
968 						}
969 					}
970 					key = lines + i + 1;
971 					keysize = 0;
972 					continue;
973 				}
974 				keysize++;
975 			}
976 		}
977 		free(lines);
978 	}
979 	else {
980 		modifier1 = strchr(subscription_key + 1, ',');
981 		if (modifier1) {
982 			modifier1[0] = 0;
983 			modifier1++;
984 
985 			sign = strchr(modifier1 + 1, ',');
986 			if (sign) {
987 				*sign = 0;
988 				sign++;
989 			}
990 			modifier1_len = strlen(modifier1);
991 		}
992 
993 		uwsgi_send_subscription(udp_address, subscription_key + 1, strlen(subscription_key + 1), uwsgi_str_num(modifier1, modifier1_len), 0, cmd, socket_name, sign, NULL, NULL, NULL);
994 		if (modifier1)
995 			modifier1[-1] = ',';
996 		if (sign)
997 			sign[-1] = ',';
998 	}
999 
1000 clear:
1001 	if (equal)
1002 		*equal = '=';
1003 	free(udp_address);
1004 
1005 }
1006 
uwsgi_subscribe2(char * arg,uint8_t cmd)1007 void uwsgi_subscribe2(char *arg, uint8_t cmd) {
1008 
1009 	char *s2_server = NULL;
1010 	char *s2_key = NULL;
1011 	char *s2_socket = NULL;
1012 	char *s2_addr = NULL;
1013 	char *s2_weight = NULL;
1014 	char *s2_sign = NULL;
1015 	char *s2_modifier1 = NULL;
1016 	char *s2_modifier2 = NULL;
1017 	char *s2_check = NULL;
1018 	char *s2_sni_key = NULL;
1019 	char *s2_sni_crt = NULL;
1020 	char *s2_sni_ca = NULL;
1021 
1022 	if (uwsgi_kvlist_parse(arg, strlen(arg), ',', '=', "server", &s2_server, "key", &s2_key, "socket", &s2_socket, "addr", &s2_addr, "weight", &s2_weight, "modifier1", &s2_modifier1, "modifier2", &s2_modifier2, "sign", &s2_sign, "check", &s2_check, "sni_key", &s2_sni_key, "sni_crt", &s2_sni_crt, "sni_ca", &s2_sni_ca, NULL)) {
1023 		return;
1024 	}
1025 
1026 	if (!s2_server || !s2_key)
1027 		goto end;
1028 
1029 	if (s2_check) {
1030 		if (uwsgi_file_exists(s2_check))
1031 			goto end;
1032 	}
1033 
1034 	if (s2_weight) {
1035 		uwsgi.weight = atoi(s2_weight);
1036 	}
1037 
1038 	if (s2_socket) {
1039 		struct uwsgi_socket *us = uwsgi_get_socket_by_num(atoi(s2_socket));
1040 		if (us) {
1041 			if (s2_addr) {
1042 				free(s2_addr);
1043 			}
1044 			s2_addr = uwsgi_str(us->name);
1045 		}
1046 	}
1047 
1048 	uint8_t modifier1 = 0;
1049 	uint8_t modifier2 = 0;
1050 
1051 	if (s2_modifier1) {
1052 		modifier1 = atoi(s2_modifier1);
1053 	}
1054 
1055 	if (s2_modifier2) {
1056 		modifier2 = atoi(s2_modifier2);
1057 	}
1058 
1059 	uwsgi_send_subscription(s2_server, s2_key, strlen(s2_key), modifier1, modifier2, cmd, s2_addr, s2_sign, s2_sni_key, s2_sni_crt, s2_sni_ca);
1060 end:
1061 	if (s2_server)
1062 		free(s2_server);
1063 	if (s2_key)
1064 		free(s2_key);
1065 	if (s2_socket)
1066 		free(s2_socket);
1067 	if (s2_addr)
1068 		free(s2_addr);
1069 	if (s2_weight)
1070 		free(s2_weight);
1071 	if (s2_modifier1)
1072 		free(s2_modifier1);
1073 	if (s2_modifier2)
1074 		free(s2_modifier2);
1075 	if (s2_sign)
1076 		free(s2_sign);
1077 	if (s2_check)
1078 		free(s2_check);
1079 	if (s2_sni_crt)
1080 		free(s2_sni_crt);
1081 	if (s2_sni_key)
1082 		free(s2_sni_key);
1083 	if (s2_sni_ca)
1084 		free(s2_sni_ca);
1085 }
1086 
uwsgi_subscribe_all(uint8_t cmd,int verbose)1087 void uwsgi_subscribe_all(uint8_t cmd, int verbose) {
1088 
1089 	if (uwsgi.subscriptions_blocked)
1090 		return;
1091 	// -- subscribe
1092 	struct uwsgi_string_list *subscriptions = uwsgi.subscriptions;
1093 	while (subscriptions) {
1094 		if (verbose) {
1095 			uwsgi_log("%s %s\n", cmd ? "unsubscribing from" : "subscribing to", subscriptions->value);
1096 		}
1097 		uwsgi_subscribe(subscriptions->value, cmd);
1098 		subscriptions = subscriptions->next;
1099 	}
1100 
1101 	// --subscribe2
1102 	subscriptions = uwsgi.subscriptions2;
1103 	while (subscriptions) {
1104 		if (verbose) {
1105 			uwsgi_log("%s %s\n", cmd ? "unsubscribing from" : "subscribing to", subscriptions->value);
1106 		}
1107 		uwsgi_subscribe2(subscriptions->value, cmd);
1108 		subscriptions = subscriptions->next;
1109 	}
1110 
1111 }
1112