1 #include "uwsgi.h"
2 
3 extern struct uwsgi_server uwsgi;
4 #define cache_item(x) (struct uwsgi_cache_item *) (((char *)uc->items) + ((sizeof(struct uwsgi_cache_item)+uc->keysize) * x))
5 
6 // block bitmap manager
7 
8 /* how the cache bitmap works:
9 
10 	a bitmap is a shared memory area allocated when requested by the user with --cache2
11 
12 	Each block maps to a bit in the bitmap. If the corresponding bit is cleared,
13 	the block is usable; otherwise the block scanner will search for the next one.
14 
15 	Objects can only be placed on consecutive blocks; fragmentation is not allowed.
16 
17 	To increase scan performance, a 64-bit pointer to the last used bit + 1 is held.
18 
19 	To search for free blocks you run
20 
21 	uint64_t uwsgi_cache_find_free_blocks(struct uwsgi_cache *uc, uint64_t need)
22 
23 	where need is the size of the object in bytes
24 
25 */
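/*
	illustrative example (not part of the original source): with blocksize = 65536
	and need = 100000, needed_blocks = 2 (100000 / 65536 rounded up). The returned
	value is the index of the first of the two contiguous free blocks, and the
	object's data is then stored at uc->data + (first_block * uc->blocksize).
*/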
26 
27 static void cache_full(struct uwsgi_cache *uc) {
28 	uint64_t i;
29 	int clear_cache = uc->clear_on_full;
30 
31 	if (!uc->ignore_full) {
32         	if (uc->purge_lru)
33                 	uwsgi_log("LRU item will be purged from cache \"%s\"\n", uc->name);
34                 else
35                 	uwsgi_log("*** DANGER cache \"%s\" is FULL !!! ***\n", uc->name);
36 	}
37 
38         uc->full++;
39 
40         if (uc->purge_lru && uc->lru_head)
41         	uwsgi_cache_del2(uc, NULL, 0, uc->lru_head, UWSGI_CACHE_FLAG_LOCAL);
42 
43 	// we do not need locking here !
44 	if (uc->sweep_on_full) {
45 		uint64_t removed = 0;
46 		uint64_t now = (uint64_t) uwsgi_now();
47 		if (uc->next_scan <= now) {
48 			uc->next_scan = now + uc->sweep_on_full;
49 			for (i = 1; i < uc->max_items; i++) {
50 				struct uwsgi_cache_item *uci = cache_item(i);
51 				if (uci->expires > 0 && uci->expires <= now) {
52 					if (!uwsgi_cache_del2(uc, NULL, 0, i, 0)) {
53 						removed++;
54 					}
55 				}
56 			}
57 		}
58 		if (removed) {
59 			clear_cache = 0;
60 		}
61 	}
62 
63 	if (clear_cache) {
64                 for (i = 1; i < uc->max_items; i++) {
65                 	uwsgi_cache_del2(uc, NULL, 0, i, 0);
66                 }
67 	}
68 }
69 
70 static uint64_t uwsgi_cache_find_free_blocks(struct uwsgi_cache *uc, uint64_t need) {
71 	// how many blocks do we need?
72 	uint64_t needed_blocks = need/uc->blocksize;
73 	if (need % uc->blocksize > 0) needed_blocks++;
74 
75 	// which is the first free bit?
76 	uint64_t bitmap_byte = 0;
77 	uint8_t bitmap_bit = 0;
78 
79 	if (uc->blocks_bitmap_pos > 0) {
80 		bitmap_byte = uc->blocks_bitmap_pos/8;
81 		bitmap_bit = uc->blocks_bitmap_pos % 8;
82 	}
83 
84 	// ok we now have the start position, let's search for contiguous blocks
85 	uint8_t *bitmap = uc->blocks_bitmap;
86 	uint64_t base = 0xffffffffffffffffLLU;
87 	uint8_t base_bit = 0;
88 	uint64_t j;
89 	uint64_t found = 0;
90 	uint64_t need_to_scan = uc->blocks_bitmap_size;
91 	// we make an additional round for the corner case of a single-byte map not starting from 0
92 	if (bitmap_bit > 0) need_to_scan++;
93 	j = bitmap_byte;
94 	//uwsgi_log("start scanning %llu bytes starting from %llu need: %llu\n", (unsigned long long) need_to_scan, (unsigned long long) bitmap_byte, (unsigned long long) needed_blocks);
95 	while(need_to_scan) {
96 		uint8_t num = bitmap[j];
97 		uint8_t i;
98 		uint8_t bit_pos = 0;
99 		if (j == bitmap_byte) {
100 			i = 1 << (7-bitmap_bit);
101 			bit_pos = bitmap_bit;
102 		}
103 		else {
104 			i = 1 <<7;
105 		}
106 		while(i > 0) {
107 			// used block
108                 	if (num & i) {
109                                 found = 0;
110                                 base = 0xffffffffffffffffLLU;
111                                 base_bit = 0;
112                         }
113 			// free block
114                         else {
115                                 if (base == 0xffffffffffffffffLLU ) {
116                                         base = j;
117 					base_bit = bit_pos;
118                                 }
119                                 found++;
120                                 if (found == needed_blocks) {
121 #ifdef UWSGI_DEBUG
122                                         printf("found %llu consecutive bit starting from byte %llu\n", (unsigned long long) found, (unsigned long long) base);
123 #endif
124 					return ((base*8) + base_bit);
125                                 }
126                         }
127                         i >>= 1;
128 			bit_pos++;
129                 }
130 		j++;
131 		need_to_scan--;
132 		// wrap the scan around (a run of free blocks cannot span the end of the bitmap)
133 		if (j >= uc->blocks_bitmap_size) {
134 			j = 0;
135 			found = 0;
136 			base = 0xffffffffffffffffLLU;
137 			base_bit = 0;
138 			// we use bitmap_bit only at the first round
139 			bitmap_bit = 0;
140 		}
141 	}
142 
143 
144 	// no more free blocks
145 	return 0xffffffffffffffffLLU;
146 }
147 
148 static uint64_t cache_mark_blocks(struct uwsgi_cache *uc, uint64_t index, uint64_t len) {
149 	uint64_t needed_blocks = len/uc->blocksize;
150 	if (len % uc->blocksize > 0) needed_blocks++;
151 
152 	uint64_t first_byte = index/8;
153 	uint8_t first_byte_bit = index % 8;
154 	// offsets start at 0, so the actual last bit is index + needed_blocks - 1
155 	uint64_t last_byte = (index + needed_blocks - 1) / 8;
156 	uint8_t last_byte_bit = (index + needed_blocks - 1) % 8;
157 
158 	uint64_t needed_bytes = (last_byte - first_byte) + 1;
159 
160 	//uwsgi_log("%llu %u %llu %u\n", first_byte, first_byte_bit, last_byte, last_byte_bit);
161 
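	/*
	   worked example (illustrative, bit 0 being the most significant bit of a byte):
	   index = 10, needed_blocks = 3 -> first_byte = 1, first_byte_bit = 2,
	   last_byte = 1, last_byte_bit = 4, needed_bytes = 1.
	   mask = 0xff >> 2 = 0x3f, then >>= 3 and <<= 3 gives 0x38,
	   i.e. bits 2..4 of byte 1 are set, marking blocks 10, 11 and 12 as used.
	*/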
162 	uint8_t mask = 0xff >> first_byte_bit;
163 
164 	if (needed_bytes == 1) {
165 		// kinda hacky, but it does the job
166 		mask >>= (7 - last_byte_bit);
167 		mask <<= (7 - last_byte_bit);
168 	}
169 
170 	uc->blocks_bitmap[first_byte] |= mask;
171 
172 	if (needed_bytes > 1) {
173 		mask = 0xff << (7 - last_byte_bit);
174 		uc->blocks_bitmap[last_byte] |= mask;
175 	}
176 
177 	if (needed_bytes > 2) {
178 		uint8_t *ptr = &uc->blocks_bitmap[first_byte+1];
179 		memset(ptr, 0xff, needed_bytes-2);
180 	}
181 	return needed_blocks;
182 }
183 
184 static void cache_unmark_blocks(struct uwsgi_cache *uc, uint64_t index, uint64_t len) {
185 	uint64_t needed_blocks = len/uc->blocksize;
186         if (len % uc->blocksize > 0) needed_blocks++;
187 
188         uint64_t first_byte = index/8;
189         uint8_t first_byte_bit = index % 8;
190         // offsets start at 0, so the actual last bit is index + needed_blocks - 1
191         uint64_t last_byte = (index + needed_blocks - 1)/8;
192         uint8_t last_byte_bit = (index + needed_blocks - 1) % 8;
193 
194 	uint64_t needed_bytes = (last_byte - first_byte) + 1;
195 
196 	uint8_t mask = 0xff >> first_byte_bit;
197 
198 	if (needed_bytes == 1) {
199                 // kinda hacky, but it does the job
200                 mask >>= (7 - last_byte_bit);
201                 mask <<= (7 - last_byte_bit);
202         }
203 
204         // AND is used here (0&0 = 0, 1&0 = 0, 0&1 = 0, 1&1 = 1):
205     	// a 0 bit clears (unmarks) a block, a 1 bit leaves it untouched,
206     	// and our mask has 1s on the blocks to unmark, so we invert it
207         uc->blocks_bitmap[first_byte] &= ~mask;
208 
209         if (needed_bytes > 1) {
210                 mask = 0xff << (7 - last_byte_bit);
211                 uc->blocks_bitmap[last_byte] &= ~mask;
212         }
213 
214         if (needed_bytes > 2) {
215                 uint8_t *ptr = &uc->blocks_bitmap[first_byte+1];
216                 memset(ptr, 0, needed_bytes-2);
217         }
218 }
219 
220 static void cache_send_udp_command(struct uwsgi_cache *, char *, uint16_t, char *, uint16_t, uint64_t, uint8_t);
221 
222 static void cache_sync_hook(char *k, uint16_t kl, char *v, uint16_t vl, void *data) {
223 	struct uwsgi_cache *uc = (struct uwsgi_cache *) data;
224 	if (!uwsgi_strncmp(k, kl, "items", 5)) {
225 		size_t num = uwsgi_str_num(v, vl);
226 		if (num != uc->max_items) {
227 			uwsgi_log("[cache-sync] invalid cache size, expected %llu received %llu\n", (unsigned long long) uc->max_items, (unsigned long long) num);
228 			exit(1);
229 		}
230 	}
231 	if (!uwsgi_strncmp(k, kl, "blocksize", 9)) {
232 		size_t num = uwsgi_str_num(v, vl);
233 		if (num != uc->blocksize) {
234 			uwsgi_log("[cache-sync] invalid cache block size, expected %llu received %llu\n", (unsigned long long) uc->blocksize, (unsigned long long) num);
235 			exit(1);
236 		}
237 	}
238 }
239 
240 static void uwsgi_cache_add_items(struct uwsgi_cache *uc) {
241 	struct uwsgi_string_list *usl = uwsgi.add_cache_item;
242 	while(usl) {
243 		char *space = strchr(usl->value, ' ');
244 		char *key = usl->value;
245 		uint16_t key_len;
246 		if (space) {
247 			// entry targeted at another cache? skip it
248 			if (uwsgi_strncmp(uc->name, uc->name_len, usl->value, space-usl->value)) {
249 				goto next;
250 			}
251 			key = space+1;
252 		}
253 		char *value = strchr(key, '=');
254 		if (!value) {
255 			uwsgi_log("[cache] unable to store item %s\n", usl->value);
256 			goto next;
257 		}
258 		key_len = value - key;
259 		value++;
260 		uint64_t len = (usl->value + usl->len) - value;
261 		uwsgi_wlock(uc->lock);
262 		if (!uwsgi_cache_set2(uc, key, key_len, value, len, 0, 0)) {
263 			uwsgi_log("[cache] stored \"%.*s\" in \"%s\"\n", key_len, key, uc->name);
264 		}
265 		else {
266 			uwsgi_log("[cache-error] unable to store \"%.*s\" in \"%s\"\n", key_len, key, uc->name);
267 		}
268 		uwsgi_rwunlock(uc->lock);
269 next:
270 		usl = usl->next;
271 	}
272 }
273 
274 static void uwsgi_cache_load_files(struct uwsgi_cache *uc) {
275 
276 	struct uwsgi_string_list *usl = uwsgi.load_file_in_cache;
277 	while(usl) {
278 		size_t len = 0;
279 		char *value = NULL;
280 		char *key = usl->value;
281 		uint16_t key_len = usl->len;
282 		char *space = strchr(usl->value, ' ');
283 		if (space) {
284 			// entry targeted at another cache? skip it
285 			if (uwsgi_strncmp(uc->name, uc->name_len, usl->value, space-usl->value)) {
286 				goto next;
287 			}
288 			key = space+1;
289 			key_len = usl->len - ((space-usl->value)+1);
290 		}
291 		value = uwsgi_open_and_read(key, &len, 0, NULL);
292 		if (value) {
293 			uwsgi_wlock(uc->lock);
294 			if (!uwsgi_cache_set2(uc, key, key_len, value, len, 0, 0)) {
295 				uwsgi_log("[cache] stored \"%.*s\" in \"%s\"\n", key_len, key, uc->name);
296 			}
297 			else {
298 				uwsgi_log("[cache-error] unable to store \"%.*s\" in \"%s\"\n", key_len, key, uc->name);
299 			}
300 			uwsgi_rwunlock(uc->lock);
301 			free(value);
302 		}
303 		else {
304 			uwsgi_log("[cache-error] unable to read file \"%.*s\"\n", key_len, key);
305 		}
306 next:
307 		usl = usl->next;
308 	}
309 
310 #ifdef UWSGI_ZLIB
311 	usl = uwsgi.load_file_in_cache_gzip;
312         while(usl) {
313                 size_t len = 0;
314                 char *value = NULL;
315                 char *key = usl->value;
316                 uint16_t key_len = usl->len;
317                 char *space = strchr(usl->value, ' ');
318                 if (space) {
319                         // entry targeted at another cache? skip it
320                         if (uwsgi_strncmp(uc->name, uc->name_len, usl->value, space-usl->value)) {
321                                 goto next2;
322                         }
323                         key = space+1;
324                         key_len = usl->len - ((space-usl->value)+1);
325                 }
326                 value = uwsgi_open_and_read(key, &len, 0, NULL);
327                 if (value) {
328 			struct uwsgi_buffer *gzipped = uwsgi_gzip(value, len);
329 			if (gzipped) {
330                         	uwsgi_wlock(uc->lock);
331                         	if (!uwsgi_cache_set2(uc, key, key_len, gzipped->buf, gzipped->len, 0, 0)) {
332                                 	uwsgi_log("[cache-gzip] stored \"%.*s\" in \"%s\"\n", key_len, key, uc->name);
333                         	}
334                         	uwsgi_rwunlock(uc->lock);
335 				uwsgi_buffer_destroy(gzipped);
336 			}
337                         free(value);
338                 }
339 next2:
340                 usl = usl->next;
341         }
342 #endif
343 }
344 
345 
346 
347 void uwsgi_cache_init(struct uwsgi_cache *uc) {
348 
349 	uc->hashtable = uwsgi_calloc_shared(sizeof(uint64_t) * uc->hashsize);
350 	uc->unused_blocks_stack = uwsgi_calloc_shared(sizeof(uint64_t) * uc->max_items);
351 	uc->unused_blocks_stack_ptr = 0;
352 	uc->filesize = ( (sizeof(struct uwsgi_cache_item)+uc->keysize) * uc->max_items) + (uc->blocksize * uc->blocks);
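	/*
	   illustrative layout of the shared mapping computed above:
	   [ (sizeof(struct uwsgi_cache_item) + keysize) * max_items ]  item headers, each followed by its inline key
	   [ blocksize * blocks ]                                       data area (uc->data will point at its start)
	*/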
353 
354 	uint64_t i;
355 	for (i = 1; i < uc->max_items; i++) {
356         	uc->unused_blocks_stack_ptr++;
357                 uc->unused_blocks_stack[uc->unused_blocks_stack_ptr] = i;
358         }
359 
360 	if (uc->use_blocks_bitmap) {
361 		uc->blocks_bitmap_size = uc->blocks/8;
362                 uint8_t m = uc->blocks % 8;
363 		if (m > 0) uc->blocks_bitmap_size++;
364 		uc->blocks_bitmap = uwsgi_calloc_shared(uc->blocks_bitmap_size);
365 		if (m > 0) {
366 			uc->blocks_bitmap[uc->blocks_bitmap_size-1] = 0xff >> m;
367 		}
368 	}
369 
370 	//uwsgi.cache_items = (struct uwsgi_cache_item *) mmap(NULL, sizeof(struct uwsgi_cache_item) * uwsgi.cache_max_items, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0);
371 	if (uc->store) {
372 		int cache_fd;
373 		struct stat cst;
374 
375         if (uc->store_delete && !stat(uc->store, &cst) && ((size_t) cst.st_size != uc->filesize || !S_ISREG(cst.st_mode))) {
376             uwsgi_log("Removing invalid cache store file: %s\n", uc->store);
377             if (unlink(uc->store) != 0) {
378                 uwsgi_log("Cannot remove invalid cache store file: %s\n", uc->store);
379                 exit(1);
380             }
381         }
382 
383 		if (stat(uc->store, &cst)) {
384 			uwsgi_log("creating a new cache store file: %s\n", uc->store);
385 			cache_fd = open(uc->store, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
386 			if (cache_fd >= 0) {
387 				// extend the new store file to the full cache size
388 				if (ftruncate(cache_fd, uc->filesize)) {
389 					uwsgi_log("ftruncate()");
390 					exit(1);
391 				}
392 			}
393 		}
394 		else {
395 			if ((size_t) cst.st_size != uc->filesize || !S_ISREG(cst.st_mode)) {
396 				uwsgi_log("invalid cache store file. Please remove it or fix cache blocksize/items to match its size\n");
397 				exit(1);
398 			}
399 			cache_fd = open(uc->store, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
400 			uwsgi_log("recovered cache from backing store file: %s\n", uc->store);
401 		}
402 
403 		if (cache_fd < 0) {
404 			uwsgi_error_open(uc->store);
405 			exit(1);
406 		}
407 		uc->items = (struct uwsgi_cache_item *) mmap(NULL, uc->filesize, PROT_READ | PROT_WRITE, MAP_SHARED, cache_fd, 0);
408 		if (uc->items == MAP_FAILED) {
409 			uwsgi_error("uwsgi_cache_init()/mmap() [with store]");
410 			exit(1);
411 		}
412 
413 		uwsgi_cache_fix(uc);
414 		close(cache_fd);
415 	}
416 	else {
417 		uc->items = (struct uwsgi_cache_item *) mmap(NULL, uc->filesize, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0);
418 		if (uc->items == MAP_FAILED) {
419 			uwsgi_error("uwsgi_cache_init()/mmap()");
420 			exit(1);
421 		}
422 		uint64_t i;
423 		for (i = 0; i < uc->max_items; i++) {
424 			// here we only need to clear the item header
425 			memset(cache_item(i), 0, sizeof(struct uwsgi_cache_item));
426 		}
427 	}
428 
429 	uc->data = ((char *)uc->items) + ((sizeof(struct uwsgi_cache_item)+uc->keysize) * uc->max_items);
430 
431 	if (uc->name) {
432 		// can't free that until shutdown
433 		char *lock_name = uwsgi_concat2("cache_", uc->name);
434 		uc->lock = uwsgi_rwlock_init(lock_name);
435 	}
436 	else {
437 		uc->lock = uwsgi_rwlock_init("cache");
438 	}
439 
440 	uwsgi_log("*** Cache \"%s\" initialized: %lluMB (key: %llu bytes, keys: %llu bytes, data: %llu bytes, bitmap: %llu bytes) preallocated ***\n",
441 			uc->name,
442 			(unsigned long long) uc->filesize / (1024 * 1024),
443 			(unsigned long long) sizeof(struct uwsgi_cache_item)+uc->keysize,
444 			(unsigned long long) ((sizeof(struct uwsgi_cache_item)+uc->keysize) * uc->max_items), (unsigned long long) (uc->blocksize * uc->blocks),
445 			(unsigned long long) uc->blocks_bitmap_size);
446 
447 	uwsgi_cache_setup_nodes(uc);
448 
449 	uc->udp_node_socket = socket(AF_INET, SOCK_DGRAM, 0);
450 	if (uc->udp_node_socket < 0) {
451 		uwsgi_error("[cache-udp-node] socket()");
452 		exit(1);
453 	}
454 	uwsgi_socket_nb(uc->udp_node_socket);
455 
456 	uwsgi_cache_sync_from_nodes(uc);
457 
458 	uwsgi_cache_load_files(uc);
459 
460 	uwsgi_cache_add_items(uc);
461 
462 }
463 
464 static uint64_t check_lazy(struct uwsgi_cache *uc, struct uwsgi_cache_item *uci, uint64_t slot) {
465 	if (!uci->expires || !uc->lazy_expire) return slot;
466 	uint64_t now = (uint64_t) uwsgi_now();
467 	// expired ?
468 	if (uci->expires <= now) {
469 		uwsgi_cache_del2(uc, NULL, 0, slot, UWSGI_CACHE_FLAG_LOCAL);
470 		return 0;
471 	}
472 	return slot;
473 }
474 
475 static uint64_t uwsgi_cache_get_index(struct uwsgi_cache *uc, char *key, uint16_t keylen) {
476 
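	// returns the slot (item index) holding the key, or 0 when the key is not found;
	// slot 0 is reserved and never used to store items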
477 	uint32_t hash = uc->hash->func(key, keylen);
478 	uint32_t hash_key = hash % uc->hashsize;
479 
480 	uint64_t slot = uc->hashtable[hash_key];
481 
482 	// optimization: empty hash bucket, no item can match
483 	if (slot == 0) return 0;
484 
485 	//uwsgi_log("hash_key = %lu slot = %llu\n", hash_key, (unsigned long long) slot);
486 
487 	struct uwsgi_cache_item *uci = cache_item(slot);
488 	uint64_t rounds = 0;
489 
490 	// first round
491 	if (uci->hash % uc->hashsize != hash_key)
492 		return 0;
493 	if (uci->hash != hash)
494 		goto cycle;
495 	if (uci->keysize != keylen)
496 		goto cycle;
497 	if (memcmp(uci->key, key, keylen))
498 		goto cycle;
499 
500 	return check_lazy(uc, uci, slot);
501 
502 cycle:
503 	while (uci->next) {
504 		slot = uci->next;
505 		uci = cache_item(slot);
506 		rounds++;
507 		if (rounds > uc->max_items) {
508 			uwsgi_log("ALARM !!! cache-loop (and potential deadlock) detected slot = %lu prev = %lu next = %lu\n", slot, uci->prev, uci->next);
509 			// terrible case: the whole uWSGI stack can deadlock, leaving only the master alive
510 			// if the master is available, trigger a brutal reload
511 			if (uwsgi.master_process) {
512 				kill(uwsgi.workers[0].pid, SIGTERM);
513 			}
514 			// otherwise kill the current worker (could be pretty useless...)
515 			else {
516 				exit(1);
517 			}
518 		}
519 		if (uci->hash != hash)
520 			continue;
521 		if (uci->keysize != keylen)
522 			continue;
523 		if (!memcmp(uci->key, key, keylen)) {
524 			return check_lazy(uc, uci, slot);
525 		}
526 	}
527 
528 	return 0;
529 }
530 
531 uint32_t uwsgi_cache_exists2(struct uwsgi_cache *uc, char *key, uint16_t keylen) {
532 
533 	return uwsgi_cache_get_index(uc, key, keylen);
534 }
535 
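/*
	LRU bookkeeping (used when purge_lru is enabled): items form a doubly-linked list
	through their lru_prev/lru_next fields. uc->lru_head is the least recently used
	item (the one purged first when the cache is full), uc->lru_tail the most recently
	used one; gets and updates move an item back to the tail.
*/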
536 static void lru_remove_item(struct uwsgi_cache *uc, uint64_t index)
537 {
538 	struct uwsgi_cache_item *prev, *next, *curr = cache_item(index);
539 
540 	if (curr->lru_next) {
541 		next = cache_item(curr->lru_next);
542 		next->lru_prev = curr->lru_prev;
543 	} else
544 		uc->lru_tail = curr->lru_prev;
545 
546 	if (curr->lru_prev) {
547 		prev = cache_item(curr->lru_prev);
548 		prev->lru_next = curr->lru_next;
549 	} else
550 		uc->lru_head = curr->lru_next;
551 }
552 
553 static void lru_add_item(struct uwsgi_cache *uc, uint64_t index)
554 {
555 	struct uwsgi_cache_item *prev, *curr = cache_item(index);
556 
557 	if (uc->lru_tail) {
558 		prev = cache_item(uc->lru_tail);
559 		prev->lru_next = index;
560 	} else
561 		uc->lru_head = index;
562 
563 	curr->lru_next = 0;
564 	curr->lru_prev = uc->lru_tail;
565 	uc->lru_tail = index;
566 }
567 
568 char *uwsgi_cache_get2(struct uwsgi_cache *uc, char *key, uint16_t keylen, uint64_t * valsize) {
569 
570 	uint64_t index = uwsgi_cache_get_index(uc, key, keylen);
571 
572 	if (index) {
573 		struct uwsgi_cache_item *uci = cache_item(index);
574 		if (uci->flags & UWSGI_CACHE_FLAG_UNGETTABLE)
575 			return NULL;
576 		*valsize = uci->valsize;
577 		if (uc->purge_lru) {
578 			lru_remove_item(uc, index);
579 			lru_add_item(uc, index);
580 		}
581 		uci->hits++;
582 		uc->hits++;
583 		return uc->data + (uci->first_block * uc->blocksize);
584 	}
585 
586 	uc->miss++;
587 
588 	return NULL;
589 }
590 
591 int64_t uwsgi_cache_num2(struct uwsgi_cache *uc, char *key, uint16_t keylen) {
592 
593         uint64_t index = uwsgi_cache_get_index(uc, key, keylen);
594 
595         if (index) {
596                 struct uwsgi_cache_item *uci = cache_item(index);
597 		if (uci->flags & UWSGI_CACHE_FLAG_UNGETTABLE)
598                         return 0;
599                 uci->hits++;
600                 uc->hits++;
601 		int64_t *num = (int64_t *) (uc->data + (uci->first_block * uc->blocksize));
602 		return *num;
603         }
604 
605         uc->miss++;
606 	return 0;
607 }
608 
609 char *uwsgi_cache_get3(struct uwsgi_cache *uc, char *key, uint16_t keylen, uint64_t * valsize, uint64_t *expires) {
610 
611         uint64_t index = uwsgi_cache_get_index(uc, key, keylen);
612 
613         if (index) {
614                 struct uwsgi_cache_item *uci = cache_item(index);
615                 if (uci->flags & UWSGI_CACHE_FLAG_UNGETTABLE)
616                         return NULL;
617                 *valsize = uci->valsize;
618 		if (expires)
619 			*expires = uci->expires;
620 		if (uc->purge_lru) {
621 			lru_remove_item(uc, index);
622 			lru_add_item(uc, index);
623 		}
624                 uci->hits++;
625                 uc->hits++;
626                 return uc->data + (uci->first_block * uc->blocksize);
627         }
628 
629         uc->miss++;
630 
631         return NULL;
632 }
633 
634 char *uwsgi_cache_get4(struct uwsgi_cache *uc, char *key, uint16_t keylen, uint64_t * valsize, uint64_t *hits) {
635 
636         uint64_t index = uwsgi_cache_get_index(uc, key, keylen);
637 
638         if (index) {
639                 struct uwsgi_cache_item *uci = cache_item(index);
640                 if (uci->flags & UWSGI_CACHE_FLAG_UNGETTABLE)
641                         return NULL;
642                 *valsize = uci->valsize;
643                 if (hits)
644                         *hits = uci->hits;
645                 uci->hits++;
646                 uc->hits++;
647                 return uc->data + (uci->first_block * uc->blocksize);
648         }
649 
650         uc->miss++;
651 
652         return NULL;
653 }
654 
655 
656 int uwsgi_cache_del2(struct uwsgi_cache *uc, char *key, uint16_t keylen, uint64_t index, uint16_t flags) {
657 
658 
659 	struct uwsgi_cache_item *uci;
660 	int ret = -1;
661 
662 	if (!index) index = uwsgi_cache_get_index(uc, key, keylen);
663 
664 	if (index) {
665 		uci = cache_item(index);
666 		if (uci->keysize > 0) {
667 			// unmark blocks
668 			if (uc->blocks_bitmap) cache_unmark_blocks(uc, uci->first_block, uci->valsize);
669 			// put back the block in unused stack
670 			uc->unused_blocks_stack_ptr++;
671 			uc->unused_blocks_stack[uc->unused_blocks_stack_ptr] = index;
672 
673 			// unlink prev and next (if any)
674 			if (uci->prev) {
675                         	struct uwsgi_cache_item *ucii = cache_item(uci->prev);
676                         	ucii->next = uci->next;
677                 	}
678                 	else {
679                         	// set next as the new entry point (could be 0)
680                         	uc->hashtable[uci->hash % uc->hashsize] = uci->next;
681                 	}
682 
683                 	if (uci->next) {
684                         	struct uwsgi_cache_item *ucii = cache_item(uci->next);
685                         	ucii->prev = uci->prev;
686                 	}
687 
688                 	if (!uci->prev && !uci->next) {
689                         	// reset hashtable entry
690                         	uc->hashtable[uci->hash % uc->hashsize] = 0;
691                 	}
692 
693 			if (uc->purge_lru)
694 				lru_remove_item(uc, index);
695 
696 			uc->n_items--;
697 		}
698 
699 		ret = 0;
700 
701 		uci->keysize = 0;
702 		uci->valsize = 0;
703 		uci->hash = 0;
704 		uci->prev = 0;
705 		uci->next = 0;
706 		uci->expires = 0;
707 
708 		if (uc->use_last_modified) {
709 			uc->last_modified_at = uwsgi_now();
710 		}
711 	}
712 
713 	if (uc->nodes && ret == 0 && !(flags & UWSGI_CACHE_FLAG_LOCAL)) {
714                 cache_send_udp_command(uc, key, keylen, NULL, 0, 0, 11);
715         }
716 
717 	return ret;
718 }
719 
720 void uwsgi_cache_fix(struct uwsgi_cache *uc) {
721 
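	// rebuild the volatile metadata (hashtable entry points, unused_blocks_stack, next_scan)
	// after mapping an existing persistent store file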
722 	uint64_t i;
723 	unsigned long long restored = 0;
724 	uint64_t next_scan = 0;
725 
726 	// reset unused blocks
727 	uc->unused_blocks_stack_ptr = 0;
728 
729 	for (i = 1; i < uc->max_items; i++) {
730 		// valid record ?
731 		struct uwsgi_cache_item *uci = cache_item(i);
732 		if (uci->keysize) {
733 			if (!uci->prev) {
734 				// put value in hash_table
735 				uc->hashtable[uci->hash % uc->hashsize] = i;
736 			}
737 			if (uci->expires && (!next_scan || next_scan > uci->expires)) {
738 				next_scan = uci->expires;
739 			}
740 			restored++;
741 		}
742 		else {
743 			// put this record in unused stack
744 			uc->unused_blocks_stack_ptr++;
745 			uc->unused_blocks_stack[uc->unused_blocks_stack_ptr] = i;
746 		}
747 	}
748 
749 	uc->next_scan = next_scan;
750 	uc->n_items = restored;
751 	uwsgi_log("[uwsgi-cache] restored %llu items\n", uc->n_items);
752 }
753 
754 int uwsgi_cache_set2(struct uwsgi_cache *uc, char *key, uint16_t keylen, char *val, uint64_t vallen, uint64_t expires, uint64_t flags) {
755 
756 	uint64_t index = 0, last_index = 0;
757 
758 	struct uwsgi_cache_item *uci, *ucii;
759 
760 	// used to reset key allocation in bitmap mode
761 
762 	int ret = -1;
763 	time_t now = 0;
764 
765 	if (!keylen || !vallen)
766 		return -1;
767 
768 	if (keylen > uc->keysize)
769 		return -1;
770 
771 	if (vallen > uc->max_item_size) return -1;
772 
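	// values used with UWSGI_CACHE_FLAG_MATH are a single int64_t, hence the 8-byte check below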
773 	if ((flags & UWSGI_CACHE_FLAG_MATH) && vallen != 8) return -1;
774 
775 	//uwsgi_log("putting cache data in key %.*s %d\n", keylen, key, vallen);
776 	index = uwsgi_cache_get_index(uc, key, keylen);
777 	if (!index) {
778 		if (!uc->unused_blocks_stack_ptr) {
779 			cache_full(uc);
780 			if (!uc->unused_blocks_stack_ptr)
781 				goto end;
782 		}
783 
784 		index = uc->unused_blocks_stack[uc->unused_blocks_stack_ptr];
785 		uc->unused_blocks_stack_ptr--;
786 
787 		uci = cache_item(index);
788 		if (!uc->blocks_bitmap) {
789 			uci->first_block = index;
790 		}
791 		else {
792 			uci->first_block = uwsgi_cache_find_free_blocks(uc, vallen);
793 			if (uci->first_block == 0xffffffffffffffffLLU) {
794 				uc->unused_blocks_stack_ptr++;
795 				cache_full(uc);
796                                 goto end;
797 			}
798 			// mark used blocks;
799 			uint64_t needed_blocks = cache_mark_blocks(uc, uci->first_block, vallen);
800 			// optimize the scan
801 			if (uci->first_block + needed_blocks >= uc->blocks) {
802                         	uc->blocks_bitmap_pos = 0;
803                         }
804                         else {
805                         	uc->blocks_bitmap_pos = uci->first_block + needed_blocks;
806                         }
807 		}
808 		if (uc->purge_lru)
809 			lru_add_item(uc, index);
810 		else if (expires && !(flags & UWSGI_CACHE_FLAG_ABSEXPIRE)) {
811 			now = uwsgi_now();
812 			expires += now;
813 			if (!uc->next_scan || uc->next_scan > expires)
814 				uc->next_scan = expires;
815 		}
816 		uci->expires = expires;
817 		uci->hash = uc->hash->func(key, keylen);
818 		uci->hits = 0;
819 		uci->flags = flags;
820 		memcpy(uci->key, key, keylen);
821 
822 		if ( !(flags & UWSGI_CACHE_FLAG_MATH)) {
823 			memcpy(((char *) uc->data) + (uci->first_block * uc->blocksize), val, vallen);
824 		}
825 		// ok math operations here
826 		else {
827 			int64_t *num = (int64_t *)(((char *) uc->data) + (uci->first_block * uc->blocksize));
828 			*num = uc->math_initial;
829 			int64_t *delta = (int64_t *) val;
830 			if (flags & UWSGI_CACHE_FLAG_INC) {
831 				*num += *delta;
832 			}
833 			else if (flags & UWSGI_CACHE_FLAG_DEC) {
834 				*num -= *delta;
835 			}
836 			else if (flags & UWSGI_CACHE_FLAG_MUL) {
837 				*num *= *delta;
838 			}
839 			else if (flags & UWSGI_CACHE_FLAG_DIV) {
840 				if (*delta == 0) {
841 					*num = 0;
842 				}
843 				else {
844 					*num /= *delta;
845 				}
846 			}
847 		}
848 
849 		// set this as late as possible (to reduce the risk of races)
850 
851 		uci->valsize = vallen;
852 		uci->keysize = keylen;
853 		ret = 0;
854 		// now put the value in the hashtable
855 		uint32_t slot = uci->hash % uc->hashsize;
856 		// reset values
857 		uci->prev = 0;
858 		uci->next = 0;
859 
860 		last_index = uc->hashtable[slot];
861 		if (last_index == 0) {
862 			uc->hashtable[slot] = index;
863 		}
864 		else {
865 			// append to first available next
866 			ucii = cache_item(last_index);
867 			while (ucii->next) {
868 				last_index = ucii->next;
869 				ucii = cache_item(last_index);
870 			}
871 			ucii->next = index;
872 			uci->prev = last_index;
873 		}
874 
875 		uc->n_items++ ;
876 	}
877 	else if (flags & UWSGI_CACHE_FLAG_UPDATE) {
878 		uci = cache_item(index);
879 		if (!(flags & UWSGI_CACHE_FLAG_FIXEXPIRE)) {
880 			if (uc->purge_lru) {
881 				lru_remove_item(uc, index);
882 				lru_add_item(uc, index);
883 			} else if (expires && !(flags & UWSGI_CACHE_FLAG_ABSEXPIRE)) {
884 				now = uwsgi_now();
885 				expires += now;
886 				if (!uc->next_scan || uc->next_scan > expires)
887 					uc->next_scan = expires;
888 			}
889 			uci->expires = expires;
890 		}
891 		if (uc->blocks_bitmap) {
892 			// we have a special case here, as we need to find a new series of free blocks
893 			uint64_t old_first_block = uci->first_block;
894 			uci->first_block = uwsgi_cache_find_free_blocks(uc, vallen);
895                         if (uci->first_block == 0xffffffffffffffffLLU) {
896 				uci->first_block = old_first_block;
897 				cache_full(uc);
898                                 goto end;
899                         }
900                         // mark used blocks;
901                         uint64_t needed_blocks = cache_mark_blocks(uc, uci->first_block, vallen);
902                         // optimize the scan
903                         if (uci->first_block + needed_blocks >= uc->blocks) {
904                                 uc->blocks_bitmap_pos = 0;
905                         }
906                         else {
907                                 uc->blocks_bitmap_pos = uci->first_block + needed_blocks;
908                         }
909 			// unmark the old blocks
910 			cache_unmark_blocks(uc, old_first_block, uci->valsize);
911 		}
912 		if ( !(flags & UWSGI_CACHE_FLAG_MATH)) {
913 			memcpy(((char *) uc->data) + (uci->first_block * uc->blocksize), val, vallen);
914 		}
915 		else {
916 			int64_t *num = (int64_t *)(((char *) uc->data) + (uci->first_block * uc->blocksize));
917                         int64_t *delta = (int64_t *) val;
918                         if (flags & UWSGI_CACHE_FLAG_INC) {
919                                 *num += *delta;
920                         }
921                         else if (flags & UWSGI_CACHE_FLAG_DEC) {
922                                 *num -= *delta;
923                         }
924                         else if (flags & UWSGI_CACHE_FLAG_MUL) {
925                                 *num *= *delta;
926                         }
927                         else if (flags & UWSGI_CACHE_FLAG_DIV) {
928                                 if (*delta == 0) {
929                                         *num = 0;
930                                 }
931                                 else {
932                                         *num /= *delta;
933                                 }
934                         }
935 		}
936 		uci->valsize = vallen;
937 		ret = 0;
938 	}
939 
940 	if (uc->use_last_modified) {
941 		uc->last_modified_at = (now ? now : uwsgi_now());
942 	}
943 
944 	if (uc->nodes && ret == 0 && !(flags & UWSGI_CACHE_FLAG_LOCAL)) {
945 		cache_send_udp_command(uc, key, keylen, val, vallen, expires, 10);
946 	}
947 
948 
949 end:
950 	return ret;
951 
952 }
953 
954 
955 static void cache_send_udp_command(struct uwsgi_cache *uc, char *key, uint16_t keylen, char *val, uint16_t vallen, uint64_t expires, uint8_t cmd) {
956 
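		/*
		   datagram layout built below (length prefixes are 16-bit little-endian):
		   [uwsgi header: modifier1 = 111, pktsize, modifier2 = cmd]
		   [keylen][key]
		   and, only for cmd 10 (set/update):
		   [vallen][value][length of expires string][expires as a decimal string]
		*/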
957 		struct uwsgi_header uh;
958 		uint8_t u_k[2];
959 		uint8_t u_v[2];
960 		uint8_t u_e[2];
961 		uint16_t vallen16 = vallen;
962 		struct iovec iov[7];
963 		struct msghdr mh;
964 
965 		memset(&mh, 0, sizeof(struct msghdr));
966 		mh.msg_iov = iov;
967 		mh.msg_iovlen = 3;
968 
969 		if (cmd == 10) {
970 			mh.msg_iovlen = 7;
971 		}
972 
973 		iov[0].iov_base = &uh;
974 		iov[0].iov_len = 4;
975 
976 		u_k[0] = (uint8_t) (keylen & 0xff);
977         	u_k[1] = (uint8_t) ((keylen >> 8) & 0xff);
978 
979 		iov[1].iov_base = u_k;
980 		iov[1].iov_len = 2;
981 
982 		iov[2].iov_base = key;
983 		iov[2].iov_len = keylen;
984 
985 		uh.pktsize = 2 + keylen;
986 
987 		if (cmd == 10) {
988 			u_v[0] = (uint8_t) (vallen16 & 0xff);
989         		u_v[1] = (uint8_t) ((vallen16 >> 8) & 0xff);
990 
991 			iov[3].iov_base = u_v;
992 			iov[3].iov_len = 2;
993 
994 			iov[4].iov_base = val;
995 			iov[4].iov_len = vallen16;
996 
997 			char es[sizeof(UMAX64_STR) + 1];
998         		uint16_t es_size = uwsgi_long2str2n(expires, es, sizeof(UMAX64_STR));
999 
1000 			u_e[0] = (uint8_t) (es_size & 0xff);
1001         		u_e[1] = (uint8_t) ((es_size >> 8) & 0xff);
1002 
1003 			iov[5].iov_base = u_e;
1004                 	iov[5].iov_len = 2;
1005 
1006                 	iov[6].iov_base = es;
1007                 	iov[6].iov_len = es_size;
1008 
1009 			uh.pktsize += 2 + vallen16 + 2 + es_size;
1010 		}
1011 
1012 		uh.modifier1 = 111;
1013 		uh.modifier2 = cmd;
1014 
1015 		struct uwsgi_string_list *usl = uc->nodes;
1016 		while(usl) {
1017 			mh.msg_name = usl->custom_ptr;
1018 			mh.msg_namelen = usl->custom;
1019 			if (sendmsg(uc->udp_node_socket, &mh, 0) <= 0) {
1020 				uwsgi_error("[cache-udp-node] sendmsg()");
1021 			}
1022 			usl = usl->next;
1023 		}
1024 
1025 }
1026 
1027 void *cache_udp_server_loop(void *ucache) {
1028         // block all signals
1029         sigset_t smask;
1030         sigfillset(&smask);
1031         pthread_sigmask(SIG_BLOCK, &smask, NULL);
1032 
1033 	struct uwsgi_cache *uc = (struct uwsgi_cache *) ucache;
1034 
1035         int queue = event_queue_init();
1036         struct uwsgi_string_list *usl = uc->udp_servers;
1037         while(usl) {
1038                 if (strchr(usl->value, ':')) {
1039                         int fd = bind_to_udp(usl->value, 0, 0);
1040                         if (fd < 0) {
1041                                 uwsgi_log("[cache-udp-server] cannot bind to %s\n", usl->value);
1042                                 exit(1);
1043                         }
1044                         uwsgi_socket_nb(fd);
1045                         event_queue_add_fd_read(queue, fd);
1046                         uwsgi_log("*** udp server for cache \"%s\" running on %s ***\n", uc->name, usl->value);
1047                 }
1048                 usl = usl->next;
1049         }
1050 
1051         // allocate 64k chunk to receive messages
1052         char *buf = uwsgi_malloc(UMAX16);
1053 
1054 	for(;;) {
1055                 uint16_t pktsize = 0, ss = 0;
1056                 int interesting_fd = -1;
1057                 int rlen = event_queue_wait(queue, -1, &interesting_fd);
1058                 if (rlen <= 0) continue;
1059                 if (interesting_fd < 0) continue;
1060                 ssize_t len = read(interesting_fd, buf, UMAX16);
1061                 if (len <= 7) {
1062                         uwsgi_error("[cache-udp-server] read()");
1063                 }
1064                 if (buf[0] != 111) continue;
1065                 memcpy(&pktsize, buf+1, 2);
1066                 if (pktsize != len-4) continue;
1067 
1068                 memcpy(&ss, buf + 4, 2);
1069                 if (4+ss > pktsize) continue;
1070                 uint16_t keylen = ss;
1071                 char *key = buf + 6;
1072 
1073                 // cache set/update
1074                 if (buf[3] == 10) {
1075                         if (keylen + 2 + 2 > pktsize) continue;
1076                         memcpy(&ss, buf + 6 + keylen, 2);
1077                         if (4+keylen+ss > pktsize) continue;
1078                         uint16_t vallen = ss;
1079                         char *val = buf + 8 + keylen;
1080                         uint64_t expires = 0;
1081                         if (2 + keylen + 2 + vallen + 2 < pktsize) {
1082                                 memcpy(&ss, buf + 8 + keylen + vallen , 2);
1083                                 if (6+keylen+vallen+ss > pktsize) continue;
1084                                 expires = uwsgi_str_num(buf + 10 + keylen+vallen, ss);
1085                         }
1086                         uwsgi_wlock(uc->lock);
1087                         if (uwsgi_cache_set2(uc, key, keylen, val, vallen, expires, UWSGI_CACHE_FLAG_UPDATE|UWSGI_CACHE_FLAG_LOCAL|UWSGI_CACHE_FLAG_ABSEXPIRE)) {
1088                                 uwsgi_log("[cache-udp-server] unable to update cache\n");
1089                         }
1090                         uwsgi_rwunlock(uc->lock);
1091                 }
1092                 // cache del
1093                 else if (buf[3] == 11) {
1094                         uwsgi_wlock(uc->lock);
1095                         if (uwsgi_cache_del2(uc, key, keylen, 0, UWSGI_CACHE_FLAG_LOCAL)) {
1096                                 uwsgi_log("[cache-udp-server] unable to delete from cache\n");
1097                         }
1098                         uwsgi_rwunlock(uc->lock);
1099                 }
1100         }
1101 
1102         return NULL;
1103 }
1104 
1105 static uint64_t cache_sweeper_free_items(struct uwsgi_cache *uc) {
1106 	uint64_t i;
1107 	uint64_t freed_items = 0;
1108 
1109 	if (uc->no_expire || uc->purge_lru || uc->lazy_expire)
1110 		return 0;
1111 
1112 	uwsgi_rlock(uc->lock);
1113 	if (!uc->next_scan || uc->next_scan > (uint64_t)uwsgi.current_time) {
1114 		uwsgi_rwunlock(uc->lock);
1115 		return 0;
1116 	}
1117 	uwsgi_rwunlock(uc->lock);
1118 
1119 	// skip the first slot
1120 	for (i = 1; i < uc->max_items; i++) {
1121 		struct uwsgi_cache_item *uci = cache_item(i);
1122 
1123 		uwsgi_wlock(uc->lock);
1124 		// we reset next scan time first, then we find the least
1125 		// expiration time from those that are NOT expired yet.
1126 		if (i == 1)
1127 			uc->next_scan = 0;
1128 
1129 		if (uci->expires) {
1130 			if (uci->expires <= (uint64_t)uwsgi.current_time) {
1131 				uwsgi_cache_del2(uc, NULL, 0, i, UWSGI_CACHE_FLAG_LOCAL);
1132 				freed_items++;
1133 			} else if (!uc->next_scan || uc->next_scan > uci->expires) {
1134 				uc->next_scan = uci->expires;
1135 			}
1136 		}
1137 		uwsgi_rwunlock(uc->lock);
1138 	}
1139 
1140 	return freed_items;
1141 }
1142 
1143 static void *cache_sweeper_loop(void *ucache) {
1144 
1145         // block all signals
1146         sigset_t smask;
1147         sigfillset(&smask);
1148         pthread_sigmask(SIG_BLOCK, &smask, NULL);
1149 
1150         if (!uwsgi.cache_expire_freq)
1151                 uwsgi.cache_expire_freq = 3;
1152 
1153         // remove expired cache items TODO use rb_tree timeouts
1154         for (;;) {
1155 		struct uwsgi_cache *uc;
1156 
1157 		for (uc = (struct uwsgi_cache *)ucache; uc; uc = uc->next) {
1158 			uint64_t freed_items = cache_sweeper_free_items(uc);
1159 			if (uwsgi.cache_report_freed_items && freed_items)
1160 				uwsgi_log("freed %llu items for cache \"%s\"\n", (unsigned long long)freed_items, uc->name);
1161 		}
1162 
1163 		sleep(uwsgi.cache_expire_freq);
1164         }
1165 
1166         return NULL;
1167 }
1168 
1169 void uwsgi_cache_sync_all() {
1170 
1171 	struct uwsgi_cache *uc = uwsgi.caches;
1172 	while(uc) {
1173 		if (uc->store && (uwsgi.master_cycles == 0 || (uc->store_sync > 0 && (uwsgi.master_cycles % uc->store_sync) == 0))) {
1174                 	if (msync(uc->items, uc->filesize, MS_ASYNC)) {
1175                         	uwsgi_error("uwsgi_cache_sync_all()/msync()");
1176                         }
1177 		}
1178 		uc = uc->next;
1179 	}
1180 }
1181 
1182 void uwsgi_cache_start_sweepers() {
1183 	struct uwsgi_cache *uc = uwsgi.caches;
1184 
1185 	if (uwsgi.cache_no_expire)
1186 		return;
1187 
1188 	int need_to_run = 0;
1189 	while(uc) {
1190 		if (!uc->no_expire && !uc->purge_lru && !uc->lazy_expire) {
1191 			need_to_run = 1;
1192 			break;
1193 		}
1194 		uc = uc->next;
1195         }
1196 
1197 	if (!need_to_run) return;
1198 
1199 	pthread_t cache_sweeper;
1200         if (pthread_create(&cache_sweeper, NULL, cache_sweeper_loop, uwsgi.caches)) {
1201         	uwsgi_error("uwsgi_cache_start_sweepers()/pthread_create()");
1202                 uwsgi_log("unable to run the cache sweeper!!!\n");
1203 		return;
1204 	}
1205         uwsgi_log("cache sweeper thread enabled\n");
1206 }
1207 
1208 void uwsgi_cache_start_sync_servers() {
1209 
1210 	struct uwsgi_cache *uc = uwsgi.caches;
1211 	while(uc) {
1212 		if (!uc->udp_servers) goto next;
1213 		pthread_t cache_udp_server;
1214                 if (pthread_create(&cache_udp_server, NULL, cache_udp_server_loop, (void *) uc)) {
1215                         uwsgi_error("pthread_create()");
1216                         uwsgi_log("unable to run the cache udp server !!!\n");
1217                 }
1218                 else {
1219                         uwsgi_log("udp server thread enabled for cache \"%s\"\n", uc->name);
1220                 }
1221 next:
1222 		uc = uc->next;
1223         }
1224 }
1225 
1226 struct uwsgi_cache *uwsgi_cache_create(char *arg) {
1227 	struct uwsgi_cache *old_uc = NULL, *uc = uwsgi.caches;
1228 	while(uc) {
1229 		old_uc = uc;
1230 		uc = uc->next;
1231 	}
1232 
1233 	uc = uwsgi_calloc_shared(sizeof(struct uwsgi_cache));
1234 	if (old_uc) {
1235 		old_uc->next = uc;
1236 	}
1237 	else {
1238 		uwsgi.caches = uc;
1239 	}
1240 
1241 	// default (old-style) cache?
1242 	if (!arg) {
1243 		uc->name = "default";
1244 		uc->name_len = strlen(uc->name);
1245 		uc->blocksize = uwsgi.cache_blocksize;
1246 		if (!uc->blocksize) uc->blocksize = UMAX16;
1247 		uc->max_item_size = uc->blocksize;
1248 		uc->max_items = uwsgi.cache_max_items;
1249 		uc->blocks = uwsgi.cache_max_items;
1250 		uc->keysize = 2048;
1251 		uc->hashsize = UMAX16;
1252 		uc->hash = uwsgi_hash_algo_get("djb33x");
1253 		uc->store = uwsgi.cache_store;
1254 		uc->nodes = uwsgi.cache_udp_node;
1255 		uc->udp_servers = uwsgi.cache_udp_server;
1256 		uc->store_sync = uwsgi.cache_store_sync;
1257 		uc->use_last_modified = (uint8_t) uwsgi.cache_use_last_modified;
1258 
1259 		if (uwsgi.cache_sync) {
1260 			uwsgi_string_new_list(&uc->sync_nodes, uwsgi.cache_sync);
1261 		}
1262 	}
1263 	else {
1264 		char *c_name = NULL;
1265 		char *c_max_items = NULL;
1266 		char *c_blocksize = NULL;
1267 		char *c_blocks = NULL;
1268 		char *c_hash = NULL;
1269 		char *c_hashsize = NULL;
1270 		char *c_keysize = NULL;
1271 		char *c_store = NULL;
1272 		char *c_store_sync = NULL;
1273 		char *c_store_delete = NULL;
1274 		char *c_nodes = NULL;
1275 		char *c_sync = NULL;
1276 		char *c_udp_servers = NULL;
1277 		char *c_bitmap = NULL;
1278 		char *c_use_last_modified = NULL;
1279 		char *c_math_initial = NULL;
1280 		char *c_ignore_full = NULL;
1281 		char *c_purge_lru = NULL;
1282 		char *c_lazy_expire = NULL;
1283 		char *c_sweep_on_full = NULL;
1284 		char *c_clear_on_full = NULL;
1285 		char *c_no_expire = NULL;
1286 
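		/*
		   an illustrative cache definition parsed by the kvlist below, e.g.:
		   --cache2 name=mycache,items=1000,blocksize=4096,blocks=2000,bitmap=1,purge_lru=1
		*/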
1287 		if (uwsgi_kvlist_parse(arg, strlen(arg), ',', '=',
1288                         "name", &c_name,
1289                         "max_items", &c_max_items,
1290                         "maxitems", &c_max_items,
1291                         "items", &c_max_items,
1292                         "blocksize", &c_blocksize,
1293                         "blocks", &c_blocks,
1294                         "hash", &c_hash,
1295                         "hashsize", &c_hashsize,
1296                         "hash_size", &c_hashsize,
1297                         "keysize", &c_keysize,
1298                         "key_size", &c_keysize,
1299                         "store", &c_store,
1300                         "store_sync", &c_store_sync,
1301                         "storesync", &c_store_sync,
1302                         "store_delete", &c_store_delete,
1303                         "storedelete", &c_store_delete,
1304                         "node", &c_nodes,
1305                         "nodes", &c_nodes,
1306                         "sync", &c_sync,
1307                         "udp", &c_udp_servers,
1308                         "udp_servers", &c_udp_servers,
1309                         "udp_server", &c_udp_servers,
1310                         "udpservers", &c_udp_servers,
1311                         "udpserver", &c_udp_servers,
1312                         "bitmap", &c_bitmap,
1313                         "lastmod", &c_use_last_modified,
1314                         "math_initial", &c_math_initial,
1315                         "ignore_full", &c_ignore_full,
1316 			"purge_lru", &c_purge_lru,
1317 			"lru", &c_purge_lru,
1318 			"lazy_expire", &c_lazy_expire,
1319 			"lazy", &c_lazy_expire,
1320 			"sweep_on_full", &c_sweep_on_full,
1321 			"clear_on_full", &c_clear_on_full,
1322 			"no_expire", &c_no_expire,
1323                 	NULL)) {
1324 			uwsgi_log("unable to parse cache definition\n");
1325 			exit(1);
1326         	}
1327 		if (!c_name) {
1328 			uwsgi_log("you have to specify a cache name\n");
1329 			exit(1);
1330 		}
1331 		if (!c_max_items) {
1332 			uwsgi_log("you have to specify the maximum number of cache items\n");
1333 			exit(1);
1334 		}
1335 
1336 		uc->name = c_name;
1337 		uc->name_len = strlen(c_name);
1338 		uc->max_items = uwsgi_n64(c_max_items);
1339 		if (!uc->max_items) {
1340 			uwsgi_log("you have to specify the maximum number of cache items\n");
1341 			exit(1);
1342 		}
1343 
1344 		// defaults
1345 		uc->blocks = uc->max_items;
1346 		uc->blocksize = UMAX16;
1347 		uc->keysize = 2048;
1348 		uc->hashsize = UMAX16;
1349 		uc->hash = uwsgi_hash_algo_get("djb33x");
1350 
1351 		// customize
1352 		if (c_blocksize) uc->blocksize = uwsgi_n64(c_blocksize);
1353 		if (!uc->blocksize) { uwsgi_log("invalid cache blocksize for \"%s\"\n", uc->name); exit(1); }
1354 		// set the true max size of an item
1355 		uc->max_item_size = uc->blocksize;
1356 
1357 		if (c_blocks) uc->blocks = uwsgi_n64(c_blocks);
1358 		if (!uc->blocks) { uwsgi_log("invalid cache blocks for \"%s\"\n", uc->name); exit(1); }
1359 		if (c_hash) uc->hash = uwsgi_hash_algo_get(c_hash);
1360 		if (!uc->hash) { uwsgi_log("invalid cache hash for \"%s\"\n", uc->name); exit(1); }
1361 		if (c_hashsize) uc->hashsize = uwsgi_n64(c_hashsize);
1362 		if (!uc->hashsize) { uwsgi_log("invalid cache hashsize for \"%s\"\n", uc->name); exit(1); }
1363 		if (c_keysize) uc->keysize = uwsgi_n64(c_keysize);
1364 		if (!uc->keysize || uc->keysize >= UMAX16) { uwsgi_log("invalid cache keysize for \"%s\"\n", uc->name); exit(1); }
1365 		if (c_bitmap) {
1366 			uc->use_blocks_bitmap = 1;
1367 			uc->max_item_size = uc->blocksize * uc->blocks;
1368 		}
1369 		if (c_use_last_modified) uc->use_last_modified = 1;
1370 		if (c_ignore_full) uc->ignore_full = 1;
1371 
1372 		if (c_store_delete) uc->store_delete = 1;
1373 
1374 		if (c_math_initial) uc->math_initial = strtol(c_math_initial, NULL, 10);
1375 
1376 		if (c_lazy_expire) uc->lazy_expire = 1;
1377 		if (c_sweep_on_full) {
1378 			uc->sweep_on_full = uwsgi_n64(c_sweep_on_full);
1379 		}
1380 		if (c_clear_on_full) uc->clear_on_full = 1;
1381 		if (c_no_expire) uc->no_expire = 1;
1382 
1383 		uc->store_sync = uwsgi.cache_store_sync;
1384 		if (c_store_sync) { uc->store_sync = uwsgi_n64(c_store_sync); }
1385 
1386 		if (uc->blocks < uc->max_items) {
1387 			uwsgi_log("invalid number of cache blocks for \"%s\", must be higher than max_items (%llu)\n", uc->name, uc->max_items);
1388 			exit(1);
1389 		}
1390 
1391 		uc->store = c_store;
1392 
1393 		if (c_nodes) {
1394 			char *p, *ctx = NULL;
1395 			uwsgi_foreach_token(c_nodes, ";", p, ctx) {
1396 				uwsgi_string_new_list(&uc->nodes, p);
1397 			}
1398 		}
1399 
1400 		if (c_sync) {
1401 			char *p, *ctx = NULL;
1402 			uwsgi_foreach_token(c_sync, ";", p, ctx) {
1403                                 uwsgi_string_new_list(&uc->sync_nodes, p);
1404                         }
1405 		}
1406 
1407 		if (c_udp_servers) {
1408 			char *p, *ctx = NULL;
1409                         uwsgi_foreach_token(c_udp_servers, ";", p, ctx) {
1410                                 uwsgi_string_new_list(&uc->udp_servers, p);
1411                         }
1412                 }
1413 
1414 		if (c_purge_lru)
1415 			uc->purge_lru = 1;
1416 	}
1417 
1418 	uwsgi_cache_init(uc);
1419 	return uc;
1420 }
1421 
1422 struct uwsgi_cache *uwsgi_cache_by_name(char *name) {
1423 	struct uwsgi_cache *uc = uwsgi.caches;
1424 	if (!name || *name == 0) {
1425 		return uwsgi.caches;
1426 	}
1427 	while(uc) {
1428 		if (uc->name && !strcmp(uc->name, name)) {
1429 			return uc;
1430 		}
1431 		uc = uc->next;
1432 	}
1433 	return NULL;
1434 }
1435 
1436 struct uwsgi_cache *uwsgi_cache_by_namelen(char *name, uint16_t len) {
1437         struct uwsgi_cache *uc = uwsgi.caches;
1438         if (!name || *name == 0) {
1439                 return uwsgi.caches;
1440         }
1441         while(uc) {
1442                 if (uc->name && !uwsgi_strncmp(uc->name, uc->name_len, name, len)) {
1443                         return uc;
1444                 }
1445                 uc = uc->next;
1446         }
1447         return NULL;
1448 }
1449 
1450 void uwsgi_cache_create_all() {
1451 
1452 	if (uwsgi.cache_setup) return;
1453 
1454 	// register embedded hash algorithms
1455         uwsgi_hash_algo_register_all();
1456 
1457         // setup default cache
1458         if (uwsgi.cache_max_items > 0) {
1459                 uwsgi_cache_create(NULL);
1460         }
1461 
1462         // setup new generation caches
1463         struct uwsgi_string_list *usl = uwsgi.cache2;
1464         while(usl) {
1465                 uwsgi_cache_create(usl->value);
1466                 usl = usl->next;
1467         }
1468 
1469 	uwsgi.cache_setup = 1;
1470 }
1471 
1472 /*
1473  * uWSGI cache magic functions. They can be used by plugins to easily access local and remote caches.
1474  *
1475  * They generate (when needed) a new memory buffer. Locking is managed automatically.
1476  *
1477  * You have to free the returned memory !!!
1478  *
1479  */
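/*
 * a minimal usage sketch (illustrative; "mycache" is a hypothetical cache name):
 *
 *	uint64_t vallen = 0, expires = 0;
 *	char *value = uwsgi_cache_magic_get("foo", 3, &vallen, &expires, "mycache");
 *	if (value) {
 *		// ... use value / vallen ...
 *		free(value);
 *	}
 */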
1480 
1481 void uwsgi_cache_magic_context_hook(char *key, uint16_t key_len, char *value, uint16_t vallen, void *data) {
1482 	struct uwsgi_cache_magic_context *ucmc = (struct uwsgi_cache_magic_context *) data;
1483 
1484 	if (!uwsgi_strncmp(key, key_len, "cmd", 3)) {
1485 		ucmc->cmd = value;
1486 		ucmc->cmd_len = vallen;
1487 		return;
1488 	}
1489 
1490 	if (!uwsgi_strncmp(key, key_len, "key", 3)) {
1491                 ucmc->key = value;
1492                 ucmc->key_len = vallen;
1493                 return;
1494         }
1495 
1496 	if (!uwsgi_strncmp(key, key_len, "expires", 7)) {
1497                 ucmc->expires = uwsgi_str_num(value, vallen);
1498                 return;
1499         }
1500 
1501 	if (!uwsgi_strncmp(key, key_len, "size", 4)) {
1502                 ucmc->size = uwsgi_str_num(value, vallen);
1503                 return;
1504         }
1505 
1506 	if (!uwsgi_strncmp(key, key_len, "cache", 5)) {
1507                 ucmc->cache = value;
1508                 ucmc->cache_len = vallen;
1509                 return;
1510         }
1511 
1512 	if (!uwsgi_strncmp(key, key_len, "status", 6)) {
1513                 ucmc->status = value;
1514                 ucmc->status_len = vallen;
1515                 return;
1516         }
1517 }
1518 
1519 static struct uwsgi_buffer *uwsgi_cache_prepare_magic_get(char *cache_name, uint16_t cache_name_len, char *key, uint16_t key_len) {
1520 	struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
1521 	ub->pos = 4;
1522 
1523 	if (uwsgi_buffer_append_keyval(ub, "cmd", 3, "get", 3)) goto error;
1524 	if (uwsgi_buffer_append_keyval(ub, "key", 3, key, key_len)) goto error;
1525 	if (cache_name) {
1526 		if (uwsgi_buffer_append_keyval(ub, "cache", 5, cache_name, cache_name_len)) goto error;
1527 	}
1528 
1529 	return ub;
1530 error:
1531 	uwsgi_buffer_destroy(ub);
1532 	return NULL;
1533 }
1534 
1535 struct uwsgi_buffer *uwsgi_cache_prepare_magic_exists(char *cache_name, uint16_t cache_name_len, char *key, uint16_t key_len) {
1536         struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
1537 	ub->pos = 4;
1538 
1539         if (uwsgi_buffer_append_keyval(ub, "cmd", 3, "exists", 6)) goto error;
1540         if (uwsgi_buffer_append_keyval(ub, "key", 3, key, key_len)) goto error;
1541         if (cache_name) {
1542                 if (uwsgi_buffer_append_keyval(ub, "cache", 5, cache_name, cache_name_len)) goto error;
1543         }
1544 
1545         return ub;
1546 error:
1547         uwsgi_buffer_destroy(ub);
1548         return NULL;
1549 }
1550 
1551 struct uwsgi_buffer *uwsgi_cache_prepare_magic_del(char *cache_name, uint16_t cache_name_len, char *key, uint16_t key_len) {
1552         struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
1553 	ub->pos = 4;
1554 
1555         if (uwsgi_buffer_append_keyval(ub, "cmd", 3, "del", 3)) goto error;
1556         if (uwsgi_buffer_append_keyval(ub, "key", 3, key, key_len)) goto error;
1557         if (cache_name) {
1558                 if (uwsgi_buffer_append_keyval(ub, "cache", 5, cache_name, cache_name_len)) goto error;
1559         }
1560 
1561         return ub;
1562 error:
1563         uwsgi_buffer_destroy(ub);
1564         return NULL;
1565 }
1566 
1567 struct uwsgi_buffer *uwsgi_cache_prepare_magic_clear(char *cache_name, uint16_t cache_name_len) {
1568         struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
1569         ub->pos = 4;
1570 
1571         if (uwsgi_buffer_append_keyval(ub, "cmd", 3, "clear", 5)) goto error;
1572         if (cache_name) {
1573                 if (uwsgi_buffer_append_keyval(ub, "cache", 5, cache_name, cache_name_len)) goto error;
1574         }
1575 
1576         return ub;
1577 error:
1578         uwsgi_buffer_destroy(ub);
1579         return NULL;
1580 }
1581 
1582 
1583 struct uwsgi_buffer *uwsgi_cache_prepare_magic_set(char *cache_name, uint16_t cache_name_len, char *key, uint16_t key_len, uint64_t len, uint64_t expires) {
1584         struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
1585 	ub->pos = 4;
1586 
1587         if (uwsgi_buffer_append_keyval(ub, "cmd", 3, "set", 3)) goto error;
1588         if (uwsgi_buffer_append_keyval(ub, "key", 3, key, key_len)) goto error;
1589         if (uwsgi_buffer_append_keynum(ub, "size", 4, len)) goto error;
1590 	if (expires > 0) {
1591 		if (uwsgi_buffer_append_keynum(ub, "expires", 7, expires)) goto error;
1592 	}
1594         if (cache_name) {
1595                 if (uwsgi_buffer_append_keyval(ub, "cache", 5, cache_name, cache_name_len)) goto error;
1596         }
1597 
1598         return ub;
1599 error:
1600         uwsgi_buffer_destroy(ub);
1601         return NULL;
1602 }
1603 
1604 struct uwsgi_buffer *uwsgi_cache_prepare_magic_update(char *cache_name, uint16_t cache_name_len, char *key, uint16_t key_len, uint64_t len, uint64_t expires) {
1605         struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
1606 	ub->pos = 4;
1607 
1608         if (uwsgi_buffer_append_keyval(ub, "cmd", 3, "update", 6)) goto error;
1609         if (uwsgi_buffer_append_keyval(ub, "key", 3, key, key_len)) goto error;
1610         if (uwsgi_buffer_append_keynum(ub, "size", 4, len)) goto error;
1611         if (expires > 0) {
1612                 if (uwsgi_buffer_append_keynum(ub, "expires", 7, expires)) goto error;
1613         }
1615         if (cache_name) {
1616                 if (uwsgi_buffer_append_keyval(ub, "cache", 5, cache_name, cache_name_len)) goto error;
1617         }
1618 
1619         return ub;
1620 error:
1621         uwsgi_buffer_destroy(ub);
1622         return NULL;
1623 }
1624 
1625 static int cache_magic_send_and_manage(int fd, struct uwsgi_buffer *ub, char *stream, uint64_t stream_len, int timeout, struct uwsgi_cache_magic_context *ucmc) {
1626 	if (uwsgi_buffer_set_uh(ub, 111, 17)) return -1;
1627 
1628 	if (stream) {
1629 		if (uwsgi_buffer_append(ub, stream, stream_len)) return -1;
1630 	}
1631 
1632 	if (uwsgi_write_true_nb(fd, ub->buf, ub->pos, timeout)) return -1;
1633 
1634 	// now wait for the response, reusing the request buffer
1635 	// NOTE: reusing a uwsgi_buffer this way basically destroys it (it can still be safely freed)
1636 	size_t rlen = ub->pos;
1637 	if (uwsgi_read_with_realloc(fd, &ub->buf, &rlen, timeout, NULL, NULL)) return -1;
1638 	// fix the buffer position so it reflects the size of the response
1639 	ub->pos = rlen;
1640 
1641 	// now we have a uwsgi dictionary with all of the options needed, let's parse it
1642 	memset(ucmc, 0, sizeof(struct uwsgi_cache_magic_context));
1643 	if (uwsgi_hooked_parse(ub->buf, rlen, uwsgi_cache_magic_context_hook, ucmc)) return -1;
1644 	return 0;
1645 }
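/*
 * Descriptive note (an assumption based on the parser above, not taken from
 * the original source): the peer is expected to answer with another uwsgi
 * packet whose body is a dictionary along the lines of
 *
 *   "status" => "ok", "size" => "1024", "expires" => "1400000000"
 *
 * uwsgi_hooked_parse() feeds each pair to uwsgi_cache_magic_context_hook(),
 * which fills the uwsgi_cache_magic_context fields (status, size, expires,
 * ...) checked by the callers below.
 */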
1646 
1647 char *uwsgi_cache_magic_get(char *key, uint16_t keylen, uint64_t *vallen, uint64_t *expires, char *cache) {
1648 	struct uwsgi_cache_magic_context ucmc;
1649 	struct uwsgi_cache *uc = NULL;
1650 	char *cache_server = NULL;
1651 	char *cache_name = NULL;
1652 	uint16_t cache_name_len = 0;
1653 	if (cache) {
1654 		char *at = strchr(cache, '@');
1655 		if (!at) {
1656 			uc = uwsgi_cache_by_name(cache);
1657 		}
1658 		else {
1659 			cache_server = at + 1;
1660 			cache_name = cache;
1661 			cache_name_len = at - cache;
1662 		}
1663 	}
1664 	// use default (local) cache
1665 	else {
1666 		uc = uwsgi.caches;
1667 	}
1668 
1669 	// we have a local cache !!!
1670 	if (uc) {
1671 		if (uc->purge_lru)
1672 			uwsgi_wlock(uc->lock);
1673 		else
1674 			uwsgi_rlock(uc->lock);
1675 		char *value = uwsgi_cache_get3(uc, key, keylen, vallen, expires);
1676 		if (!value) {
1677 			uwsgi_rwunlock(uc->lock);
1678 			return NULL;
1679 		}
1680 		char *buf = uwsgi_malloc(*vallen);
1681 		memcpy(buf, value, *vallen);
1682 		uwsgi_rwunlock(uc->lock);
1683 		return buf;
1684 	}
1685 
1686 	// we have a remote one
1687 	if (cache_server) {
1688 		int fd = uwsgi_connect(cache_server, 0, 1);
1689 		if (fd < 0) return NULL;
1690 
1691 		int ret = uwsgi.wait_write_hook(fd, uwsgi.socket_timeout);
1692 		if (ret <= 0) {
1693 			close(fd);
1694 			return NULL;
1695 		}
1696 
1697 		struct uwsgi_buffer *ub = uwsgi_cache_prepare_magic_get(cache_name, cache_name_len, key, keylen);
1698 		if (!ub) {
1699 			close(fd);
1700 			return NULL;
1701 		}
1702 
1703 		if (cache_magic_send_and_manage(fd, ub, NULL, 0, uwsgi.socket_timeout, &ucmc)) {
1704 			close(fd);
1705                         uwsgi_buffer_destroy(ub);
1706                         return NULL;
1707 		}
1708 
1709 		if (uwsgi_strncmp(ucmc.status, ucmc.status_len, "ok", 2)) {
1710 			close(fd);
1711                         uwsgi_buffer_destroy(ub);
1712                         return NULL;
1713 		}
1714 
1715 		if (ucmc.size == 0) {
1716 			close(fd);
1717                         uwsgi_buffer_destroy(ub);
1718                         return NULL;
1719 		}
1720 
1721 		// enlarge our buffer if it is not big enough for the value
1722 		if (ucmc.size > ub->pos) {
1723 			char *tmp_buf = realloc(ub->buf, ucmc.size);
1724 			if (!tmp_buf) {
1725 				uwsgi_error("uwsgi_cache_magic_get()/realloc()");
1726 				close(fd);
1727 				uwsgi_buffer_destroy(ub);
1728 				return NULL;
1729 			}
1730 			ub->buf = tmp_buf;
1731 		}
1732 
1733 		// read the raw value from the socket
1734 		if (uwsgi_read_whole_true_nb(fd, ub->buf, ucmc.size, uwsgi.socket_timeout)) {
1735 			close(fd);
1736 			uwsgi_buffer_destroy(ub);
1737 			return NULL;
1738 		}
1739 
1740 		// now the magic: we detach the internal buffer and hand it to the caller
1741 		close(fd);
1742 		char *value = ub->buf;
1743 		ub->buf = NULL;
1744 		uwsgi_buffer_destroy(ub);
1745 		*vallen = ucmc.size;
1746 		if (expires) {
1747 			*expires = ucmc.expires;
1748 		}
1749 		return value;
1750 
1751 	}
1752 
1753 	return NULL;
1754 }
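/*
 * Usage sketch (not part of the original source). The "cache" argument
 * selects where to look: NULL means the default local cache, "name" a local
 * cache by name, and "name@host:port" a remote cache server. The cache name,
 * address and key below are made up.
 */
#if 0
static void example_magic_get(void) {
	uint64_t vallen = 0;
	uint64_t expires = 0;
	char *value = uwsgi_cache_magic_get("hello", 5, &vallen, &expires, "mycache@127.0.0.1:4040");
	if (value) {
		uwsgi_log("got %llu bytes (expires: %llu)\n", (unsigned long long) vallen, (unsigned long long) expires);
		free(value);
	}
}
#endif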
1755 
1756 int uwsgi_cache_magic_exists(char *key, uint16_t keylen, char *cache) {
1757         struct uwsgi_cache_magic_context ucmc;
1758         struct uwsgi_cache *uc = NULL;
1759         char *cache_server = NULL;
1760         char *cache_name = NULL;
1761         uint16_t cache_name_len = 0;
1762         if (cache) {
1763                 char *at = strchr(cache, '@');
1764                 if (!at) {
1765                         uc = uwsgi_cache_by_name(cache);
1766                 }
1767                 else {
1768                         cache_server = at + 1;
1769                         cache_name = cache;
1770                         cache_name_len = at - cache;
1771                 }
1772         }
1773         // use default (local) cache
1774         else {
1775                 uc = uwsgi.caches;
1776         }
1777 
1778         // we have a local cache !!!
1779         if (uc) {
1780                 uwsgi_rlock(uc->lock);
1781                 if (!uwsgi_cache_exists2(uc, key, keylen)) {
1782                         uwsgi_rwunlock(uc->lock);
1783                         return 0;
1784                 }
1785 		uwsgi_rwunlock(uc->lock);
1786 		return 1;
1787         }
1788 
1789 	// we have a remote one
1790         if (cache_server) {
1791                 int fd = uwsgi_connect(cache_server, 0, 1);
1792                 if (fd < 0) return 0;
1793 
1794                 int ret = uwsgi.wait_write_hook(fd, uwsgi.socket_timeout);
1795                 if (ret <= 0) {
1796                         close(fd);
1797 			return 0;
1798                 }
1799 
1800                 struct uwsgi_buffer *ub = uwsgi_cache_prepare_magic_exists(cache_name, cache_name_len, key, keylen);
1801                 if (!ub) {
1802                         close(fd);
1803 			return 0;
1804                 }
1805 
1806                 if (cache_magic_send_and_manage(fd, ub, NULL, 0, uwsgi.socket_timeout, &ucmc)) {
1807                         close(fd);
1808                         uwsgi_buffer_destroy(ub);
1809 			return 0;
1810                 }
1811 
1812                 if (uwsgi_strncmp(ucmc.status, ucmc.status_len, "ok", 2)) {
1813                         close(fd);
1814                         uwsgi_buffer_destroy(ub);
1815 			return 0;
1816                 }
1817 
1818 		close(fd);
1819 		uwsgi_buffer_destroy(ub);
1820 		return 1;
1821         }
1822 
1823         return 0;
1824 }
1825 
1826 
1827 int uwsgi_cache_magic_set(char *key, uint16_t keylen, char *value, uint64_t vallen, uint64_t expires, uint64_t flags, char *cache) {
1828 	struct uwsgi_cache_magic_context ucmc;
1829         struct uwsgi_cache *uc = NULL;
1830         char *cache_server = NULL;
1831         char *cache_name = NULL;
1832         uint16_t cache_name_len = 0;
1833 
1834         if (cache) {
1835                 char *at = strchr(cache, '@');
1836                 if (!at) {
1837                         uc = uwsgi_cache_by_name(cache);
1838                 }
1839                 else {
1840                         cache_server = at + 1;
1841                         cache_name = cache;
1842                         cache_name_len = at - cache;
1843                 }
1844         }
1845 	// use default (local) cache
1846 	else {
1847                 uc = uwsgi.caches;
1848         }
1849 
1850 	// we have a local cache !!!
1851 	if (uc) {
1852                 uwsgi_wlock(uc->lock);
1853                 int ret = uwsgi_cache_set2(uc, key, keylen, value, vallen, expires, flags);
1854                 uwsgi_rwunlock(uc->lock);
1855 		return ret;
1856         }
1857 
1858 	// we have a remote one
1859 	if (cache_server) {
1860 		int fd = uwsgi_connect(cache_server, 0, 1);
1861                 if (fd < 0) return -1;
1862 
1863                 int ret = uwsgi.wait_write_hook(fd, uwsgi.socket_timeout);
1864                 if (ret <= 0) {
1865                         close(fd);
1866                         return -1;
1867                 }
1868 
1869 		struct uwsgi_buffer *ub = NULL;
1870 		if (flags & UWSGI_CACHE_FLAG_UPDATE) {
1871                 	ub = uwsgi_cache_prepare_magic_update(cache_name, cache_name_len, key, keylen, vallen, expires);
1872 		}
1873 		else {
1874                 	ub = uwsgi_cache_prepare_magic_set(cache_name, cache_name_len, key, keylen, vallen, expires);
1875 		}
1876                 if (!ub) {
1877                         close(fd);
1878                         return -1;
1879                 }
1880 
1881                 if (cache_magic_send_and_manage(fd, ub, value, vallen, uwsgi.socket_timeout, &ucmc)) {
1882                         close(fd);
1883                         uwsgi_buffer_destroy(ub);
1884                         return -1;
1885                 }
1886 
1887                 if (uwsgi_strncmp(ucmc.status, ucmc.status_len, "ok", 2)) {
1888                         close(fd);
1889                         uwsgi_buffer_destroy(ub);
1890                         return -1;
1891                 }
1892 
1893 		close(fd);
1894 		uwsgi_buffer_destroy(ub);
1895 		return 0;
1896 
1897         }
1898 
1899         return -1;
1900 
1901 }
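/*
 * Usage sketch (not part of the original source): storing and then
 * overwriting an item. UWSGI_CACHE_FLAG_UPDATE switches the remote command
 * from "set" to "update" and is passed through to uwsgi_cache_set2() for
 * local caches. Key, value and cache name are made up; 300 is meant as the
 * expiry in seconds.
 */
#if 0
static void example_magic_set(void) {
	if (uwsgi_cache_magic_set("hello", 5, "world", 5, 300, 0, "mycache")) {
		uwsgi_log("cache set failed\n");
		return;
	}
	// overwrite the existing item
	if (uwsgi_cache_magic_set("hello", 5, "world!", 6, 300, UWSGI_CACHE_FLAG_UPDATE, "mycache")) {
		uwsgi_log("cache update failed\n");
	}
}
#endif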
1902 
1903 int uwsgi_cache_magic_del(char *key, uint16_t keylen, char *cache) {
1904 
1905 	struct uwsgi_cache_magic_context ucmc;
1906         struct uwsgi_cache *uc = NULL;
1907         char *cache_server = NULL;
1908         char *cache_name = NULL;
1909         uint16_t cache_name_len = 0;
1910         if (cache) {
1911                 char *at = strchr(cache, '@');
1912                 if (!at) {
1913                         uc = uwsgi_cache_by_name(cache);
1914                 }
1915                 else {
1916                         cache_server = at + 1;
1917                         cache_name = cache;
1918                         cache_name_len = at - cache;
1919                 }
1920         }
1921         // use default (local) cache
1922         else {
1923                 uc = uwsgi.caches;
1924         }
1925 
1926         // we have a local cache !!!
1927         if (uc) {
1928                 uwsgi_wlock(uc->lock);
1929                 if (uwsgi_cache_del2(uc, key, keylen, 0, 0)) {
1930                         uwsgi_rwunlock(uc->lock);
1931                         return -1;
1932                 }
1933                 uwsgi_rwunlock(uc->lock);
1934                 return 0;
1935         }
1936 
1937         // we have a remote one
1938         if (cache_server) {
1939                 int fd = uwsgi_connect(cache_server, 0, 1);
1940                 if (fd < 0) return -1;
1941 
1942                 int ret = uwsgi.wait_write_hook(fd, uwsgi.socket_timeout);
1943                 if (ret <= 0) {
1944                         close(fd);
1945                         return -1;
1946                 }
1947 
1948                 struct uwsgi_buffer *ub = uwsgi_cache_prepare_magic_del(cache_name, cache_name_len, key, keylen);
1949                 if (!ub) {
1950                         close(fd);
1951                         return -1;
1952                 }
1953 
1954                 if (cache_magic_send_and_manage(fd, ub, NULL, 0, uwsgi.socket_timeout, &ucmc)) {
1955                         close(fd);
1956                         uwsgi_buffer_destroy(ub);
1957                         return -1;
1958                 }
1959 
1960                 if (uwsgi_strncmp(ucmc.status, ucmc.status_len, "ok", 2)) {
1961                         close(fd);
1962                         uwsgi_buffer_destroy(ub);
1963                         return -1;
1964                 }
1965 
1966 		close(fd);
1967 		uwsgi_buffer_destroy(ub);
1968                 return 0;
1969         }
1970 
1971         return -1;
1972 
1973 }
1974 
1975 int uwsgi_cache_magic_clear(char *cache) {
1976 
1977         struct uwsgi_cache_magic_context ucmc;
1978         struct uwsgi_cache *uc = NULL;
1979         char *cache_server = NULL;
1980         char *cache_name = NULL;
1981         uint16_t cache_name_len = 0;
1982         if (cache) {
1983                 char *at = strchr(cache, '@');
1984                 if (!at) {
1985                         uc = uwsgi_cache_by_name(cache);
1986                 }
1987                 else {
1988                         cache_server = at + 1;
1989                         cache_name = cache;
1990                         cache_name_len = at - cache;
1991                 }
1992         }
1993         // use default (local) cache
1994         else {
1995                 uc = uwsgi.caches;
1996         }
1997 
1998         // we have a local cache !!!
1999         if (uc) {
2000 		uint64_t i;
2001                 uwsgi_wlock(uc->lock);
2002 		for (i = 1; i < uc->max_items; i++) {
2003                 	if (uwsgi_cache_del2(uc, NULL, 0, i, 0)) {
2004                         	uwsgi_rwunlock(uc->lock);
2005                         	return -1;
2006                 	}
2007 		}
2008                 uwsgi_rwunlock(uc->lock);
2009                 return 0;
2010         }
2011 
2012         // we have a remote one
2013         if (cache_server) {
2014                 int fd = uwsgi_connect(cache_server, 0, 1);
2015                 if (fd < 0) return -1;
2016 
2017                 int ret = uwsgi.wait_write_hook(fd, uwsgi.socket_timeout);
2018                 if (ret <= 0) {
2019                         close(fd);
2020                         return -1;
2021                 }
2022 
2023                 struct uwsgi_buffer *ub = uwsgi_cache_prepare_magic_clear(cache_name, cache_name_len);
2024                 if (!ub) {
2025                         close(fd);
2026                         return -1;
2027                 }
2028 
2029                 if (cache_magic_send_and_manage(fd, ub, NULL, 0, uwsgi.socket_timeout, &ucmc)) {
2030                         close(fd);
2031                         uwsgi_buffer_destroy(ub);
2032                         return -1;
2033                 }
2034 
2035                 if (uwsgi_strncmp(ucmc.status, ucmc.status_len, "ok", 2)) {
2036                         close(fd);
2037                         uwsgi_buffer_destroy(ub);
2038                         return -1;
2039                 }
2040 
2041 		close(fd);
2042 		uwsgi_buffer_destroy(ub);
2043                 return 0;
2044         }
2045 
2046         return -1;
2047 
2048 }
2049 
2050 
2051 void uwsgi_cache_sync_from_nodes(struct uwsgi_cache *uc) {
2052 	struct uwsgi_string_list *usl = uc->sync_nodes;
2053 	while(usl) {
2054 		uwsgi_log("[cache-sync] getting cache dump from %s ...\n", usl->value);
2055 		int fd = uwsgi_connect(usl->value, 0, 0);
2056 		if (fd < 0) {
2057 			uwsgi_log("[cache-sync] unable to connect to the cache server\n");
2058 			goto next;
2059 		}
2060 
2061 		struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size + uc->filesize);
2062 		ub->pos = 4;
2063 		if (uc->name && uwsgi_buffer_append(ub, uc->name, uc->name_len)) {
2064 			uwsgi_buffer_destroy(ub);
2065 			close(fd);
2066 			goto next;
2067 		}
2068 
2069 		if (uwsgi_buffer_set_uh(ub, 111, 6)) {
2070 			uwsgi_buffer_destroy(ub);
2071 			close(fd);
2072 			goto next;
2073 		}
2074 
2075 		if (uwsgi_write_nb(fd, ub->buf, ub->pos, uwsgi.socket_timeout)) {
2076 			uwsgi_buffer_destroy(ub);
2077 			uwsgi_log("[cache-sync] unable to write to the cache server\n");
2078 			close(fd);
2079 			goto next;
2080 		}
2081 
2082 		size_t rlen = ub->pos;
2083 		if (uwsgi_read_with_realloc(fd, &ub->buf, &rlen, uwsgi.socket_timeout, NULL, NULL)) {
2084 			uwsgi_buffer_destroy(ub);
2085 			uwsgi_log("[cache-sync] unable to read from the cache server\n");
2086 			close(fd);
2087 			goto next;
2088 		}
2089 
2090 		uwsgi_hooked_parse(ub->buf, rlen, cache_sync_hook, uc);
2091 
2092 		if (uwsgi_read_nb(fd, (char *) uc->items, uc->filesize, uwsgi.socket_timeout)) {
2093 			uwsgi_buffer_destroy(ub);
2094 			close(fd);
2095                         uwsgi_log("[cache-sync] unable to read from the cache server\n");
2096 			goto next;
2097                 }
2098 
2099 		// reset the hashtable
2100 		memset(uc->hashtable, 0, sizeof(uint64_t) * uc->hashsize);
2101 		// re-fill the hashtable
2102                 uwsgi_cache_fix(uc);
2103 
2104 		uwsgi_buffer_destroy(ub);
2105 		close(fd);
2106 		break;
2107 next:
2108 		if (!usl->next) {
2109 			exit(1);
2110 		}
2111 		uwsgi_log("[cache-sync] trying with the next sync node...\n");
2112 		usl = usl->next;
2113 	}
2114 }
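/*
 * Descriptive note (not part of the original source): the sync request built
 * above is a uwsgi packet with modifier1 111 and modifier2 6 whose body is
 * just the cache name; the node answers with a dictionary (parsed by
 * cache_sync_hook) followed by a raw copy of its items area, which is read
 * straight into uc->items before the local hashtable is rebuilt with
 * uwsgi_cache_fix().
 */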
2115 
2116 
2117 void uwsgi_cache_setup_nodes(struct uwsgi_cache *uc) {
2118 	struct uwsgi_string_list *usl = uc->nodes;
2119 	while(usl) {
2120 		char *port = strchr(usl->value, ':');
2121 		if (!port) {
2122 			uwsgi_log("[cache-udp-node] invalid udp address: %s\n", usl->value);
2123 			exit(1);
2124 		}
2125 		// no need to zero the memory, socket_to_in_addr will do that
2126 		struct sockaddr_in *sin = uwsgi_malloc(sizeof(struct sockaddr_in));
2127 		usl->custom = socket_to_in_addr(usl->value, port, 0, sin);
2128 		usl->custom_ptr = sin;
2129 		uwsgi_log("added udp node %s for cache \"%s\"\n", usl->value, uc->name);
2130 		usl = usl->next;
2131 	}
2132 }
2133 
2134 struct uwsgi_cache_item *uwsgi_cache_keys(struct uwsgi_cache *uc, uint64_t *pos, struct uwsgi_cache_item **uci) {
2135 
2136 	// security check
2137 	if (*pos >= uc->hashsize) return NULL;
2138 	// iterate hashtable
2139 	uint64_t orig_pos = *pos;
2140 	for(;*pos<uc->hashsize;(*pos)++) {
2141 		// get the cache slot
2142 		uint64_t slot = uc->hashtable[*pos];
2143 		if (*pos == orig_pos && *uci) {
2144 			slot = (*uci)->next;
2145 		}
2146 		if (slot == 0) continue;
2147 
2148 		*uci = cache_item(slot);
2149 		return *uci;
2150 	}
2151 
2152 	(*pos)++;
2153 	return NULL;
2154 }
2155 
2156 void uwsgi_cache_rlock(struct uwsgi_cache *uc) {
2157 	uwsgi_rlock(uc->lock);
2158 }
2159 
2160 void uwsgi_cache_rwunlock(struct uwsgi_cache *uc) {
2161 	uwsgi_rwunlock(uc->lock);
2162 }
2163 
2164 char *uwsgi_cache_item_key(struct uwsgi_cache_item *uci) {
2165 	return uci->key;
2166 }
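
/*
 * Usage sketch (not part of the original source): walking all keys of a
 * local cache with the iteration helpers above. The item field "keysize" is
 * assumed to hold the length of the stored key.
 */
#if 0
static void example_walk_keys(struct uwsgi_cache *uc) {
	uint64_t pos = 0;
	struct uwsgi_cache_item *uci = NULL;
	uwsgi_cache_rlock(uc);
	while (uwsgi_cache_keys(uc, &pos, &uci)) {
		uwsgi_log("key: %.*s\n", (int) uci->keysize, uwsgi_cache_item_key(uci));
	}
	uwsgi_cache_rwunlock(uc);
}
#endif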
2167