1 #include "uwsgi.h"
2
3 extern struct uwsgi_server uwsgi;
4 #define cache_item(x) (struct uwsgi_cache_item *) (((char *)uc->items) + ((sizeof(struct uwsgi_cache_item)+uc->keysize) * x))
5
6 // block bitmap manager
7
8 /* how the cache bitmap works:
9
10 a bitmap is a shared memory area allocated when requested by the user with --cache2
11
12 Each block maps to a bit in the bitmap. If the corresponding bit is cleared
13 the block is usable otherwise the block scanner will search for the next one.
14
15 Object can be placed only on consecutive blocks, fragmentation is not allowed.
16
17 To increase the scan performance, a 64bit pointer to the last used bit + 1 is hold
18
19 To search for free blocks you run
20
21 uint64_t uwsgi_cache_find_free_block(struct uwsgi_cache *uc, size_t need)
22
23 where need is the size of the object
24
25 */
26
cache_full(struct uwsgi_cache * uc)27 static void cache_full(struct uwsgi_cache *uc) {
28 uint64_t i;
29 int clear_cache = uc->clear_on_full;
30
31 if (!uc->ignore_full) {
32 if (uc->purge_lru)
33 uwsgi_log("LRU item will be purged from cache \"%s\"\n", uc->name);
34 else
35 uwsgi_log("*** DANGER cache \"%s\" is FULL !!! ***\n", uc->name);
36 }
37
38 uc->full++;
39
40 if (uc->purge_lru && uc->lru_head)
41 uwsgi_cache_del2(uc, NULL, 0, uc->lru_head, UWSGI_CACHE_FLAG_LOCAL);
42
43 // we do not need locking here !
44 if (uc->sweep_on_full) {
45 uint64_t removed = 0;
46 uint64_t now = (uint64_t) uwsgi_now();
47 if (uc->next_scan <= now) {
48 uc->next_scan = now + uc->sweep_on_full;
49 for (i = 1; i < uc->max_items; i++) {
50 struct uwsgi_cache_item *uci = cache_item(i);
51 if (uci->expires > 0 && uci->expires <= now) {
52 if (!uwsgi_cache_del2(uc, NULL, 0, i, 0)) {
53 removed++;
54 }
55 }
56 }
57 }
58 if (removed) {
59 clear_cache = 0;
60 }
61 }
62
63 if (clear_cache) {
64 for (i = 1; i < uc->max_items; i++) {
65 uwsgi_cache_del2(uc, NULL, 0, i, 0);
66 }
67 }
68 }
69
uwsgi_cache_find_free_blocks(struct uwsgi_cache * uc,uint64_t need)70 static uint64_t uwsgi_cache_find_free_blocks(struct uwsgi_cache *uc, uint64_t need) {
71 // how many blocks we need ?
72 uint64_t needed_blocks = need/uc->blocksize;
73 if (need % uc->blocksize > 0) needed_blocks++;
74
75 // which is the first free bit?
76 uint64_t bitmap_byte = 0;
77 uint8_t bitmap_bit = 0;
78
79 if (uc->blocks_bitmap_pos > 0) {
80 bitmap_byte = uc->blocks_bitmap_pos/8;
81 bitmap_bit = uc->blocks_bitmap_pos % 8;
82 }
83
84 // ok we now have the start position, let's search for contiguous blocks
85 uint8_t *bitmap = uc->blocks_bitmap;
86 uint64_t base = 0xffffffffffffffffLLU;
87 uint8_t base_bit = 0;
88 uint64_t j;
89 uint64_t found = 0;
90 uint64_t need_to_scan = uc->blocks_bitmap_size;
91 // we make an addition round for the corner case of a single byte map not starting from 0
92 if (bitmap_bit > 0) need_to_scan++;
93 j = bitmap_byte;
94 //uwsgi_log("start scanning %llu bytes starting from %llu need: %llu\n", (unsigned long long) need_to_scan, (unsigned long long) bitmap_byte, (unsigned long long) needed_blocks);
95 while(need_to_scan) {
96 uint8_t num = bitmap[j];
97 uint8_t i;
98 uint8_t bit_pos = 0;
99 if (j == bitmap_byte) {
100 i = 1 << (7-bitmap_bit);
101 bit_pos = bitmap_bit;
102 }
103 else {
104 i = 1 <<7;
105 }
106 while(i > 0) {
107 // used block
108 if (num & i) {
109 found = 0;
110 base = 0xffffffffffffffffLLU;
111 base_bit = 0;
112 }
113 // free block
114 else {
115 if (base == 0xffffffffffffffffLLU ) {
116 base = j;
117 base_bit = bit_pos;
118 }
119 found++;
120 if (found == needed_blocks) {
121 #ifdef UWSGI_DEBUG
122 printf("found %llu consecutive bit starting from byte %llu\n", (unsigned long long) found, (unsigned long long) base);
123 #endif
124 return ((base*8) + base_bit);
125 }
126 }
127 i >>= 1;
128 bit_pos++;
129 }
130 j++;
131 need_to_scan--;
132 // check for overlap (that is not supported)
133 if (j >= uc->blocks_bitmap_size) {
134 j = 0;
135 found = 0;
136 base = 0xffffffffffffffffLLU;
137 base_bit = 0;
138 // we use bitmap_bit only at the first round
139 bitmap_bit = 0;
140 }
141 }
142
143
144 // no more free blocks
145 return 0xffffffffffffffffLLU;
146 }
147
cache_mark_blocks(struct uwsgi_cache * uc,uint64_t index,uint64_t len)148 static uint64_t cache_mark_blocks(struct uwsgi_cache *uc, uint64_t index, uint64_t len) {
149 uint64_t needed_blocks = len/uc->blocksize;
150 if (len % uc->blocksize > 0) needed_blocks++;
151
152 uint64_t first_byte = index/8;
153 uint8_t first_byte_bit = index % 8;
154 // offset starts with 0, so actual last bit is index + needed_blocks - 1
155 uint64_t last_byte = (index + needed_blocks - 1) / 8;
156 uint8_t last_byte_bit = (index + needed_blocks - 1) % 8;
157
158 uint64_t needed_bytes = (last_byte - first_byte) + 1;
159
160 //uwsgi_log("%llu %u %llu %u\n", first_byte, first_byte_bit, last_byte, last_byte_bit);
161
162 uint8_t mask = 0xff >> first_byte_bit;
163
164 if (needed_bytes == 1) {
165 // kinda hacky, but it does the job
166 mask >>= (7 - last_byte_bit);
167 mask <<= (7 - last_byte_bit);
168 }
169
170 uc->blocks_bitmap[first_byte] |= mask;
171
172 if (needed_bytes > 1) {
173 mask = 0xff << (7 - last_byte_bit);
174 uc->blocks_bitmap[last_byte] |= mask;
175 }
176
177 if (needed_bytes > 2) {
178 uint8_t *ptr = &uc->blocks_bitmap[first_byte+1];
179 memset(ptr, 0xff, needed_bytes-2);
180 }
181 return needed_blocks;
182 }
183
cache_unmark_blocks(struct uwsgi_cache * uc,uint64_t index,uint64_t len)184 static void cache_unmark_blocks(struct uwsgi_cache *uc, uint64_t index, uint64_t len) {
185 uint64_t needed_blocks = len/uc->blocksize;
186 if (len % uc->blocksize > 0) needed_blocks++;
187
188 uint64_t first_byte = index/8;
189 uint8_t first_byte_bit = index % 8;
190 // offset starts with 0, so actual last bit is index + needed_blocks - 1
191 uint64_t last_byte = (index + needed_blocks - 1)/8;
192 uint8_t last_byte_bit = (index + needed_blocks - 1) % 8;
193
194 uint64_t needed_bytes = (last_byte - first_byte) + 1;
195
196 uint8_t mask = 0xff >> first_byte_bit;
197
198 if (needed_bytes == 1) {
199 // kinda hacky, but it does the job
200 mask >>= (7 - last_byte_bit);
201 mask <<= (7 - last_byte_bit);
202 }
203
204 // here we use AND (0+0 = 0 | 1+0 = 0 | 0+1 = 0| 1+1 = 1)
205 // 0 in mask means "unmark", 1 in mask means "do not change"
206 // so we need to invert the mask
207 uc->blocks_bitmap[first_byte] &= ~mask;
208
209 if (needed_bytes > 1) {
210 mask = 0xff << (7 - last_byte_bit);
211 uc->blocks_bitmap[last_byte] &= ~mask;
212 }
213
214 if (needed_bytes > 2) {
215 uint8_t *ptr = &uc->blocks_bitmap[first_byte+1];
216 memset(ptr, 0, needed_bytes-2);
217 }
218 }
219
220 static void cache_send_udp_command(struct uwsgi_cache *, char *, uint16_t, char *, uint16_t, uint64_t, uint8_t);
221
cache_sync_hook(char * k,uint16_t kl,char * v,uint16_t vl,void * data)222 static void cache_sync_hook(char *k, uint16_t kl, char *v, uint16_t vl, void *data) {
223 struct uwsgi_cache *uc = (struct uwsgi_cache *) data;
224 if (!uwsgi_strncmp(k, kl, "items", 5)) {
225 size_t num = uwsgi_str_num(v, vl);
226 if (num != uc->max_items) {
227 uwsgi_log("[cache-sync] invalid cache size, expected %llu received %llu\n", (unsigned long long) uc->max_items, (unsigned long long) num);
228 exit(1);
229 }
230 }
231 if (!uwsgi_strncmp(k, kl, "blocksize", 9)) {
232 size_t num = uwsgi_str_num(v, vl);
233 if (num != uc->blocksize) {
234 uwsgi_log("[cache-sync] invalid cache block size, expected %llu received %llu\n", (unsigned long long) uc->blocksize, (unsigned long long) num);
235 exit(1);
236 }
237 }
238 }
239
uwsgi_cache_add_items(struct uwsgi_cache * uc)240 static void uwsgi_cache_add_items(struct uwsgi_cache *uc) {
241 struct uwsgi_string_list *usl = uwsgi.add_cache_item;
242 while(usl) {
243 char *space = strchr(usl->value, ' ');
244 char *key = usl->value;
245 uint16_t key_len;
246 if (space) {
247 // need to skip ?
248 if (uwsgi_strncmp(uc->name, uc->name_len, usl->value, space-usl->value)) {
249 goto next;
250 }
251 key = space+1;
252 }
253 char *value = strchr(key, '=');
254 if (!value) {
255 uwsgi_log("[cache] unable to store item %s\n", usl->value);
256 goto next;
257 }
258 key_len = value - key;
259 value++;
260 uint64_t len = (usl->value + usl->len) - value;
261 uwsgi_wlock(uc->lock);
262 if (!uwsgi_cache_set2(uc, key, key_len, value, len, 0, 0)) {
263 uwsgi_log("[cache] stored \"%.*s\" in \"%s\"\n", key_len, key, uc->name);
264 }
265 else {
266 uwsgi_log("[cache-error] unable to store \"%.*s\" in \"%s\"\n", key_len, key, uc->name);
267 }
268 uwsgi_rwunlock(uc->lock);
269 next:
270 usl = usl->next;
271 }
272 }
273
uwsgi_cache_load_files(struct uwsgi_cache * uc)274 static void uwsgi_cache_load_files(struct uwsgi_cache *uc) {
275
276 struct uwsgi_string_list *usl = uwsgi.load_file_in_cache;
277 while(usl) {
278 size_t len = 0;
279 char *value = NULL;
280 char *key = usl->value;
281 uint16_t key_len = usl->len;
282 char *space = strchr(usl->value, ' ');
283 if (space) {
284 // need to skip ?
285 if (uwsgi_strncmp(uc->name, uc->name_len, usl->value, space-usl->value)) {
286 goto next;
287 }
288 key = space+1;
289 key_len = usl->len - ((space-usl->value)+1);
290 }
291 value = uwsgi_open_and_read(key, &len, 0, NULL);
292 if (value) {
293 uwsgi_wlock(uc->lock);
294 if (!uwsgi_cache_set2(uc, key, key_len, value, len, 0, 0)) {
295 uwsgi_log("[cache] stored \"%.*s\" in \"%s\"\n", key_len, key, uc->name);
296 }
297 else {
298 uwsgi_log("[cache-error] unable to store \"%.*s\" in \"%s\"\n", key_len, key, uc->name);
299 }
300 uwsgi_rwunlock(uc->lock);
301 free(value);
302 }
303 else {
304 uwsgi_log("[cache-error] unable to read file \"%.*s\"\n", key_len, key);
305 }
306 next:
307 usl = usl->next;
308 }
309
310 #ifdef UWSGI_ZLIB
311 usl = uwsgi.load_file_in_cache_gzip;
312 while(usl) {
313 size_t len = 0;
314 char *value = NULL;
315 char *key = usl->value;
316 uint16_t key_len = usl->len;
317 char *space = strchr(usl->value, ' ');
318 if (space) {
319 // need to skip ?
320 if (uwsgi_strncmp(uc->name, uc->name_len, usl->value, space-usl->value)) {
321 goto next2;
322 }
323 key = space+1;
324 key_len = usl->len - ((space-usl->value)+1);
325 }
326 value = uwsgi_open_and_read(key, &len, 0, NULL);
327 if (value) {
328 struct uwsgi_buffer *gzipped = uwsgi_gzip(value, len);
329 if (gzipped) {
330 uwsgi_wlock(uc->lock);
331 if (!uwsgi_cache_set2(uc, key, key_len, gzipped->buf, gzipped->len, 0, 0)) {
332 uwsgi_log("[cache-gzip] stored \"%.*s\" in \"%s\"\n", key_len, key, uc->name);
333 }
334 uwsgi_rwunlock(uc->lock);
335 uwsgi_buffer_destroy(gzipped);
336 }
337 free(value);
338 }
339 next2:
340 usl = usl->next;
341 }
342 #endif
343 }
344
345
346
uwsgi_cache_init(struct uwsgi_cache * uc)347 void uwsgi_cache_init(struct uwsgi_cache *uc) {
348
349 uc->hashtable = uwsgi_calloc_shared(sizeof(uint64_t) * uc->hashsize);
350 uc->unused_blocks_stack = uwsgi_calloc_shared(sizeof(uint64_t) * uc->max_items);
351 uc->unused_blocks_stack_ptr = 0;
352 uc->filesize = ( (sizeof(struct uwsgi_cache_item)+uc->keysize) * uc->max_items) + (uc->blocksize * uc->blocks);
353
354 uint64_t i;
355 for (i = 1; i < uc->max_items; i++) {
356 uc->unused_blocks_stack_ptr++;
357 uc->unused_blocks_stack[uc->unused_blocks_stack_ptr] = i;
358 }
359
360 if (uc->use_blocks_bitmap) {
361 uc->blocks_bitmap_size = uc->blocks/8;
362 uint8_t m = uc->blocks % 8;
363 if (m > 0) uc->blocks_bitmap_size++;
364 uc->blocks_bitmap = uwsgi_calloc_shared(uc->blocks_bitmap_size);
365 if (m > 0) {
366 uc->blocks_bitmap[uc->blocks_bitmap_size-1] = 0xff >> m;
367 }
368 }
369
370 //uwsgi.cache_items = (struct uwsgi_cache_item *) mmap(NULL, sizeof(struct uwsgi_cache_item) * uwsgi.cache_max_items, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0);
371 if (uc->store) {
372 int cache_fd;
373 struct stat cst;
374
375 if (uc->store_delete && !stat(uc->store, &cst) && ((size_t) cst.st_size != uc->filesize || !S_ISREG(cst.st_mode))) {
376 uwsgi_log("Removing invalid cache store file: %s\n", uc->store);
377 if (unlink(uc->store) != 0) {
378 uwsgi_log("Cannot remove invalid cache store file: %s\n", uc->store);
379 exit(1);
380 }
381 }
382
383 if (stat(uc->store, &cst)) {
384 uwsgi_log("creating a new cache store file: %s\n", uc->store);
385 cache_fd = open(uc->store, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
386 if (cache_fd >= 0) {
387 // fill the caching store
388 if (ftruncate(cache_fd, uc->filesize)) {
389 uwsgi_log("ftruncate()");
390 exit(1);
391 }
392 }
393 }
394 else {
395 if ((size_t) cst.st_size != uc->filesize || !S_ISREG(cst.st_mode)) {
396 uwsgi_log("invalid cache store file. Please remove it or fix cache blocksize/items to match its size\n");
397 exit(1);
398 }
399 cache_fd = open(uc->store, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR);
400 uwsgi_log("recovered cache from backing store file: %s\n", uc->store);
401 }
402
403 if (cache_fd < 0) {
404 uwsgi_error_open(uc->store);
405 exit(1);
406 }
407 uc->items = (struct uwsgi_cache_item *) mmap(NULL, uc->filesize, PROT_READ | PROT_WRITE, MAP_SHARED, cache_fd, 0);
408 if (uc->items == MAP_FAILED) {
409 uwsgi_error("uwsgi_cache_init()/mmap() [with store]");
410 exit(1);
411 }
412
413 uwsgi_cache_fix(uc);
414 close(cache_fd);
415 }
416 else {
417 uc->items = (struct uwsgi_cache_item *) mmap(NULL, uc->filesize, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANON, -1, 0);
418 if (uc->items == MAP_FAILED) {
419 uwsgi_error("uwsgi_cache_init()/mmap()");
420 exit(1);
421 }
422 uint64_t i;
423 for (i = 0; i < uc->max_items; i++) {
424 // here we only need to clear the item header
425 memset(cache_item(i), 0, sizeof(struct uwsgi_cache_item));
426 }
427 }
428
429 uc->data = ((char *)uc->items) + ((sizeof(struct uwsgi_cache_item)+uc->keysize) * uc->max_items);
430
431 if (uc->name) {
432 // can't free that until shutdown
433 char *lock_name = uwsgi_concat2("cache_", uc->name);
434 uc->lock = uwsgi_rwlock_init(lock_name);
435 }
436 else {
437 uc->lock = uwsgi_rwlock_init("cache");
438 }
439
440 uwsgi_log("*** Cache \"%s\" initialized: %lluMB (key: %llu bytes, keys: %llu bytes, data: %llu bytes, bitmap: %llu bytes) preallocated ***\n",
441 uc->name,
442 (unsigned long long) uc->filesize / (1024 * 1024),
443 (unsigned long long) sizeof(struct uwsgi_cache_item)+uc->keysize,
444 (unsigned long long) ((sizeof(struct uwsgi_cache_item)+uc->keysize) * uc->max_items), (unsigned long long) (uc->blocksize * uc->blocks),
445 (unsigned long long) uc->blocks_bitmap_size);
446
447 uwsgi_cache_setup_nodes(uc);
448
449 uc->udp_node_socket = socket(AF_INET, SOCK_DGRAM, 0);
450 if (uc->udp_node_socket < 0) {
451 uwsgi_error("[cache-udp-node] socket()");
452 exit(1);
453 }
454 uwsgi_socket_nb(uc->udp_node_socket);
455
456 uwsgi_cache_sync_from_nodes(uc);
457
458 uwsgi_cache_load_files(uc);
459
460 uwsgi_cache_add_items(uc);
461
462 }
463
check_lazy(struct uwsgi_cache * uc,struct uwsgi_cache_item * uci,uint64_t slot)464 static uint64_t check_lazy(struct uwsgi_cache *uc, struct uwsgi_cache_item *uci, uint64_t slot) {
465 if (!uci->expires || !uc->lazy_expire) return slot;
466 uint64_t now = (uint64_t) uwsgi_now();
467 // expired ?
468 if (uci->expires <= now) {
469 uwsgi_cache_del2(uc, NULL, 0, slot, UWSGI_CACHE_FLAG_LOCAL);
470 return 0;
471 }
472 return slot;
473 }
474
uwsgi_cache_get_index(struct uwsgi_cache * uc,char * key,uint16_t keylen)475 static uint64_t uwsgi_cache_get_index(struct uwsgi_cache *uc, char *key, uint16_t keylen) {
476
477 uint32_t hash = uc->hash->func(key, keylen);
478 uint32_t hash_key = hash % uc->hashsize;
479
480 uint64_t slot = uc->hashtable[hash_key];
481
482 // optimization
483 if (slot == 0) return 0;
484
485 //uwsgi_log("hash_key = %lu slot = %llu\n", hash_key, (unsigned long long) slot);
486
487 struct uwsgi_cache_item *uci = cache_item(slot);
488 uint64_t rounds = 0;
489
490 // first round
491 if (uci->hash % uc->hashsize != hash_key)
492 return 0;
493 if (uci->hash != hash)
494 goto cycle;
495 if (uci->keysize != keylen)
496 goto cycle;
497 if (memcmp(uci->key, key, keylen))
498 goto cycle;
499
500 return check_lazy(uc, uci, slot);
501
502 cycle:
503 while (uci->next) {
504 slot = uci->next;
505 uci = cache_item(slot);
506 rounds++;
507 if (rounds > uc->max_items) {
508 uwsgi_log("ALARM !!! cache-loop (and potential deadlock) detected slot = %lu prev = %lu next = %lu\n", slot, uci->prev, uci->next);
509 // terrible case: the whole uWSGI stack can deadlock, leaving only the master alive
510 // if the master is avalable, trigger a brutal reload
511 if (uwsgi.master_process) {
512 kill(uwsgi.workers[0].pid, SIGTERM);
513 }
514 // otherwise kill the current worker (could be pretty useless...)
515 else {
516 exit(1);
517 }
518 }
519 if (uci->hash != hash)
520 continue;
521 if (uci->keysize != keylen)
522 continue;
523 if (!memcmp(uci->key, key, keylen)) {
524 return check_lazy(uc, uci, slot);
525 }
526 }
527
528 return 0;
529 }
530
uwsgi_cache_exists2(struct uwsgi_cache * uc,char * key,uint16_t keylen)531 uint32_t uwsgi_cache_exists2(struct uwsgi_cache *uc, char *key, uint16_t keylen) {
532
533 return uwsgi_cache_get_index(uc, key, keylen);
534 }
535
lru_remove_item(struct uwsgi_cache * uc,uint64_t index)536 static void lru_remove_item(struct uwsgi_cache *uc, uint64_t index)
537 {
538 struct uwsgi_cache_item *prev, *next, *curr = cache_item(index);
539
540 if (curr->lru_next) {
541 next = cache_item(curr->lru_next);
542 next->lru_prev = curr->lru_prev;
543 } else
544 uc->lru_tail = curr->lru_prev;
545
546 if (curr->lru_prev) {
547 prev = cache_item(curr->lru_prev);
548 prev->lru_next = curr->lru_next;
549 } else
550 uc->lru_head = curr->lru_next;
551 }
552
lru_add_item(struct uwsgi_cache * uc,uint64_t index)553 static void lru_add_item(struct uwsgi_cache *uc, uint64_t index)
554 {
555 struct uwsgi_cache_item *prev, *curr = cache_item(index);
556
557 if (uc->lru_tail) {
558 prev = cache_item(uc->lru_tail);
559 prev->lru_next = index;
560 } else
561 uc->lru_head = index;
562
563 curr->lru_next = 0;
564 curr->lru_prev = uc->lru_tail;
565 uc->lru_tail = index;
566 }
567
uwsgi_cache_get2(struct uwsgi_cache * uc,char * key,uint16_t keylen,uint64_t * valsize)568 char *uwsgi_cache_get2(struct uwsgi_cache *uc, char *key, uint16_t keylen, uint64_t * valsize) {
569
570 uint64_t index = uwsgi_cache_get_index(uc, key, keylen);
571
572 if (index) {
573 struct uwsgi_cache_item *uci = cache_item(index);
574 if (uci->flags & UWSGI_CACHE_FLAG_UNGETTABLE)
575 return NULL;
576 *valsize = uci->valsize;
577 if (uc->purge_lru) {
578 lru_remove_item(uc, index);
579 lru_add_item(uc, index);
580 }
581 uci->hits++;
582 uc->hits++;
583 return uc->data + (uci->first_block * uc->blocksize);
584 }
585
586 uc->miss++;
587
588 return NULL;
589 }
590
uwsgi_cache_num2(struct uwsgi_cache * uc,char * key,uint16_t keylen)591 int64_t uwsgi_cache_num2(struct uwsgi_cache *uc, char *key, uint16_t keylen) {
592
593 uint64_t index = uwsgi_cache_get_index(uc, key, keylen);
594
595 if (index) {
596 struct uwsgi_cache_item *uci = cache_item(index);
597 if (uci->flags & UWSGI_CACHE_FLAG_UNGETTABLE)
598 return 0;
599 uci->hits++;
600 uc->hits++;
601 int64_t *num = (int64_t *) (uc->data + (uci->first_block * uc->blocksize));
602 return *num;
603 }
604
605 uc->miss++;
606 return 0;
607 }
608
uwsgi_cache_get3(struct uwsgi_cache * uc,char * key,uint16_t keylen,uint64_t * valsize,uint64_t * expires)609 char *uwsgi_cache_get3(struct uwsgi_cache *uc, char *key, uint16_t keylen, uint64_t * valsize, uint64_t *expires) {
610
611 uint64_t index = uwsgi_cache_get_index(uc, key, keylen);
612
613 if (index) {
614 struct uwsgi_cache_item *uci = cache_item(index);
615 if (uci->flags & UWSGI_CACHE_FLAG_UNGETTABLE)
616 return NULL;
617 *valsize = uci->valsize;
618 if (expires)
619 *expires = uci->expires;
620 if (uc->purge_lru) {
621 lru_remove_item(uc, index);
622 lru_add_item(uc, index);
623 }
624 uci->hits++;
625 uc->hits++;
626 return uc->data + (uci->first_block * uc->blocksize);
627 }
628
629 uc->miss++;
630
631 return NULL;
632 }
633
uwsgi_cache_get4(struct uwsgi_cache * uc,char * key,uint16_t keylen,uint64_t * valsize,uint64_t * hits)634 char *uwsgi_cache_get4(struct uwsgi_cache *uc, char *key, uint16_t keylen, uint64_t * valsize, uint64_t *hits) {
635
636 uint64_t index = uwsgi_cache_get_index(uc, key, keylen);
637
638 if (index) {
639 struct uwsgi_cache_item *uci = cache_item(index);
640 if (uci->flags & UWSGI_CACHE_FLAG_UNGETTABLE)
641 return NULL;
642 *valsize = uci->valsize;
643 if (hits)
644 *hits = uci->hits;
645 uci->hits++;
646 uc->hits++;
647 return uc->data + (uci->first_block * uc->blocksize);
648 }
649
650 uc->miss++;
651
652 return NULL;
653 }
654
655
uwsgi_cache_del2(struct uwsgi_cache * uc,char * key,uint16_t keylen,uint64_t index,uint16_t flags)656 int uwsgi_cache_del2(struct uwsgi_cache *uc, char *key, uint16_t keylen, uint64_t index, uint16_t flags) {
657
658
659 struct uwsgi_cache_item *uci;
660 int ret = -1;
661
662 if (!index) index = uwsgi_cache_get_index(uc, key, keylen);
663
664 if (index) {
665 uci = cache_item(index);
666 if (uci->keysize > 0) {
667 // unmark blocks
668 if (uc->blocks_bitmap) cache_unmark_blocks(uc, uci->first_block, uci->valsize);
669 // put back the block in unused stack
670 uc->unused_blocks_stack_ptr++;
671 uc->unused_blocks_stack[uc->unused_blocks_stack_ptr] = index;
672
673 // unlink prev and next (if any)
674 if (uci->prev) {
675 struct uwsgi_cache_item *ucii = cache_item(uci->prev);
676 ucii->next = uci->next;
677 }
678 else {
679 // set next as the new entry point (could be 0)
680 uc->hashtable[uci->hash % uc->hashsize] = uci->next;
681 }
682
683 if (uci->next) {
684 struct uwsgi_cache_item *ucii = cache_item(uci->next);
685 ucii->prev = uci->prev;
686 }
687
688 if (!uci->prev && !uci->next) {
689 // reset hashtable entry
690 uc->hashtable[uci->hash % uc->hashsize] = 0;
691 }
692
693 if (uc->purge_lru)
694 lru_remove_item(uc, index);
695
696 uc->n_items--;
697 }
698
699 ret = 0;
700
701 uci->keysize = 0;
702 uci->valsize = 0;
703 uci->hash = 0;
704 uci->prev = 0;
705 uci->next = 0;
706 uci->expires = 0;
707
708 if (uc->use_last_modified) {
709 uc->last_modified_at = uwsgi_now();
710 }
711 }
712
713 if (uc->nodes && ret == 0 && !(flags & UWSGI_CACHE_FLAG_LOCAL)) {
714 cache_send_udp_command(uc, key, keylen, NULL, 0, 0, 11);
715 }
716
717 return ret;
718 }
719
uwsgi_cache_fix(struct uwsgi_cache * uc)720 void uwsgi_cache_fix(struct uwsgi_cache *uc) {
721
722 uint64_t i;
723 unsigned long long restored = 0;
724 uint64_t next_scan = 0;
725
726 // reset unused blocks
727 uc->unused_blocks_stack_ptr = 0;
728
729 for (i = 1; i < uc->max_items; i++) {
730 // valid record ?
731 struct uwsgi_cache_item *uci = cache_item(i);
732 if (uci->keysize) {
733 if (!uci->prev) {
734 // put value in hash_table
735 uc->hashtable[uci->hash % uc->hashsize] = i;
736 }
737 if (uci->expires && (!next_scan || next_scan > uci->expires)) {
738 next_scan = uci->expires;
739 }
740 restored++;
741 }
742 else {
743 // put this record in unused stack
744 uc->unused_blocks_stack_ptr++;
745 uc->unused_blocks_stack[uc->unused_blocks_stack_ptr] = i;
746 }
747 }
748
749 uc->next_scan = next_scan;
750 uc->n_items = restored;
751 uwsgi_log("[uwsgi-cache] restored %llu items\n", uc->n_items);
752 }
753
uwsgi_cache_set2(struct uwsgi_cache * uc,char * key,uint16_t keylen,char * val,uint64_t vallen,uint64_t expires,uint64_t flags)754 int uwsgi_cache_set2(struct uwsgi_cache *uc, char *key, uint16_t keylen, char *val, uint64_t vallen, uint64_t expires, uint64_t flags) {
755
756 uint64_t index = 0, last_index = 0;
757
758 struct uwsgi_cache_item *uci, *ucii;
759
760 // used to reset key allocation in bitmap mode
761
762 int ret = -1;
763 time_t now = 0;
764
765 if (!keylen || !vallen)
766 return -1;
767
768 if (keylen > uc->keysize)
769 return -1;
770
771 if (vallen > uc->max_item_size) return -1;
772
773 if ((flags & UWSGI_CACHE_FLAG_MATH) && vallen != 8) return -1;
774
775 //uwsgi_log("putting cache data in key %.*s %d\n", keylen, key, vallen);
776 index = uwsgi_cache_get_index(uc, key, keylen);
777 if (!index) {
778 if (!uc->unused_blocks_stack_ptr) {
779 cache_full(uc);
780 if (!uc->unused_blocks_stack_ptr)
781 goto end;
782 }
783
784 index = uc->unused_blocks_stack[uc->unused_blocks_stack_ptr];
785 uc->unused_blocks_stack_ptr--;
786
787 uci = cache_item(index);
788 if (!uc->blocks_bitmap) {
789 uci->first_block = index;
790 }
791 else {
792 uci->first_block = uwsgi_cache_find_free_blocks(uc, vallen);
793 if (uci->first_block == 0xffffffffffffffffLLU) {
794 uc->unused_blocks_stack_ptr++;
795 cache_full(uc);
796 goto end;
797 }
798 // mark used blocks;
799 uint64_t needed_blocks = cache_mark_blocks(uc, uci->first_block, vallen);
800 // optimize the scan
801 if (uci->first_block + needed_blocks >= uc->blocks) {
802 uc->blocks_bitmap_pos = 0;
803 }
804 else {
805 uc->blocks_bitmap_pos = uci->first_block + needed_blocks;
806 }
807 }
808 if (uc->purge_lru)
809 lru_add_item(uc, index);
810 else if (expires && !(flags & UWSGI_CACHE_FLAG_ABSEXPIRE)) {
811 now = uwsgi_now();
812 expires += now;
813 if (!uc->next_scan || uc->next_scan > expires)
814 uc->next_scan = expires;
815 }
816 uci->expires = expires;
817 uci->hash = uc->hash->func(key, keylen);
818 uci->hits = 0;
819 uci->flags = flags;
820 memcpy(uci->key, key, keylen);
821
822 if ( !(flags & UWSGI_CACHE_FLAG_MATH)) {
823 memcpy(((char *) uc->data) + (uci->first_block * uc->blocksize), val, vallen);
824 }
825 // ok math operations here
826 else {
827 int64_t *num = (int64_t *)(((char *) uc->data) + (uci->first_block * uc->blocksize));
828 *num = uc->math_initial;
829 int64_t *delta = (int64_t *) val;
830 if (flags & UWSGI_CACHE_FLAG_INC) {
831 *num += *delta;
832 }
833 else if (flags & UWSGI_CACHE_FLAG_DEC) {
834 *num -= *delta;
835 }
836 else if (flags & UWSGI_CACHE_FLAG_MUL) {
837 *num *= *delta;
838 }
839 else if (flags & UWSGI_CACHE_FLAG_DIV) {
840 if (*delta == 0) {
841 *num = 0;
842 }
843 else {
844 *num /= *delta;
845 }
846 }
847 }
848
849 // set this as late as possibile (to reduce races risk)
850
851 uci->valsize = vallen;
852 uci->keysize = keylen;
853 ret = 0;
854 // now put the value in the hashtable
855 uint32_t slot = uci->hash % uc->hashsize;
856 // reset values
857 uci->prev = 0;
858 uci->next = 0;
859
860 last_index = uc->hashtable[slot];
861 if (last_index == 0) {
862 uc->hashtable[slot] = index;
863 }
864 else {
865 // append to first available next
866 ucii = cache_item(last_index);
867 while (ucii->next) {
868 last_index = ucii->next;
869 ucii = cache_item(last_index);
870 }
871 ucii->next = index;
872 uci->prev = last_index;
873 }
874
875 uc->n_items++ ;
876 }
877 else if (flags & UWSGI_CACHE_FLAG_UPDATE) {
878 uci = cache_item(index);
879 if (!(flags & UWSGI_CACHE_FLAG_FIXEXPIRE)) {
880 if (uc->purge_lru) {
881 lru_remove_item(uc, index);
882 lru_add_item(uc, index);
883 } else if (expires && !(flags & UWSGI_CACHE_FLAG_ABSEXPIRE)) {
884 now = uwsgi_now();
885 expires += now;
886 if (!uc->next_scan || uc->next_scan > expires)
887 uc->next_scan = expires;
888 }
889 uci->expires = expires;
890 }
891 if (uc->blocks_bitmap) {
892 // we have a special case here, as we need to find a new series of free blocks
893 uint64_t old_first_block = uci->first_block;
894 uci->first_block = uwsgi_cache_find_free_blocks(uc, vallen);
895 if (uci->first_block == 0xffffffffffffffffLLU) {
896 uci->first_block = old_first_block;
897 cache_full(uc);
898 goto end;
899 }
900 // mark used blocks;
901 uint64_t needed_blocks = cache_mark_blocks(uc, uci->first_block, vallen);
902 // optimize the scan
903 if (uci->first_block + needed_blocks >= uc->blocks) {
904 uc->blocks_bitmap_pos = 0;
905 }
906 else {
907 uc->blocks_bitmap_pos = uci->first_block + needed_blocks;
908 }
909 // unmark the old blocks
910 cache_unmark_blocks(uc, old_first_block, uci->valsize);
911 }
912 if ( !(flags & UWSGI_CACHE_FLAG_MATH)) {
913 memcpy(((char *) uc->data) + (uci->first_block * uc->blocksize), val, vallen);
914 }
915 else {
916 int64_t *num = (int64_t *)(((char *) uc->data) + (uci->first_block * uc->blocksize));
917 int64_t *delta = (int64_t *) val;
918 if (flags & UWSGI_CACHE_FLAG_INC) {
919 *num += *delta;
920 }
921 else if (flags & UWSGI_CACHE_FLAG_DEC) {
922 *num -= *delta;
923 }
924 else if (flags & UWSGI_CACHE_FLAG_MUL) {
925 *num *= *delta;
926 }
927 else if (flags & UWSGI_CACHE_FLAG_DIV) {
928 if (*delta == 0) {
929 *num = 0;
930 }
931 else {
932 *num /= *delta;
933 }
934 }
935 }
936 uci->valsize = vallen;
937 ret = 0;
938 }
939
940 if (uc->use_last_modified) {
941 uc->last_modified_at = (now ? now : uwsgi_now());
942 }
943
944 if (uc->nodes && ret == 0 && !(flags & UWSGI_CACHE_FLAG_LOCAL)) {
945 cache_send_udp_command(uc, key, keylen, val, vallen, expires, 10);
946 }
947
948
949 end:
950 return ret;
951
952 }
953
954
cache_send_udp_command(struct uwsgi_cache * uc,char * key,uint16_t keylen,char * val,uint16_t vallen,uint64_t expires,uint8_t cmd)955 static void cache_send_udp_command(struct uwsgi_cache *uc, char *key, uint16_t keylen, char *val, uint16_t vallen, uint64_t expires, uint8_t cmd) {
956
957 struct uwsgi_header uh;
958 uint8_t u_k[2];
959 uint8_t u_v[2];
960 uint8_t u_e[2];
961 uint16_t vallen16 = vallen;
962 struct iovec iov[7];
963 struct msghdr mh;
964
965 memset(&mh, 0, sizeof(struct msghdr));
966 mh.msg_iov = iov;
967 mh.msg_iovlen = 3;
968
969 if (cmd == 10) {
970 mh.msg_iovlen = 7;
971 }
972
973 iov[0].iov_base = &uh;
974 iov[0].iov_len = 4;
975
976 u_k[0] = (uint8_t) (keylen & 0xff);
977 u_k[1] = (uint8_t) ((keylen >> 8) & 0xff);
978
979 iov[1].iov_base = u_k;
980 iov[1].iov_len = 2;
981
982 iov[2].iov_base = key;
983 iov[2].iov_len = keylen;
984
985 uh.pktsize = 2 + keylen;
986
987 if (cmd == 10) {
988 u_v[0] = (uint8_t) (vallen16 & 0xff);
989 u_v[1] = (uint8_t) ((vallen16 >> 8) & 0xff);
990
991 iov[3].iov_base = u_v;
992 iov[3].iov_len = 2;
993
994 iov[4].iov_base = val;
995 iov[4].iov_len = vallen16;
996
997 char es[sizeof(UMAX64_STR) + 1];
998 uint16_t es_size = uwsgi_long2str2n(expires, es, sizeof(UMAX64_STR));
999
1000 u_e[0] = (uint8_t) (es_size & 0xff);
1001 u_e[1] = (uint8_t) ((es_size >> 8) & 0xff);
1002
1003 iov[5].iov_base = u_e;
1004 iov[5].iov_len = 2;
1005
1006 iov[6].iov_base = es;
1007 iov[6].iov_len = es_size;
1008
1009 uh.pktsize += 2 + vallen16 + 2 + es_size;
1010 }
1011
1012 uh.modifier1 = 111;
1013 uh.modifier2 = cmd;
1014
1015 struct uwsgi_string_list *usl = uc->nodes;
1016 while(usl) {
1017 mh.msg_name = usl->custom_ptr;
1018 mh.msg_namelen = usl->custom;
1019 if (sendmsg(uc->udp_node_socket, &mh, 0) <= 0) {
1020 uwsgi_error("[cache-udp-node] sendmsg()");
1021 }
1022 usl = usl->next;
1023 }
1024
1025 }
1026
cache_udp_server_loop(void * ucache)1027 void *cache_udp_server_loop(void *ucache) {
1028 // block all signals
1029 sigset_t smask;
1030 sigfillset(&smask);
1031 pthread_sigmask(SIG_BLOCK, &smask, NULL);
1032
1033 struct uwsgi_cache *uc = (struct uwsgi_cache *) ucache;
1034
1035 int queue = event_queue_init();
1036 struct uwsgi_string_list *usl = uc->udp_servers;
1037 while(usl) {
1038 if (strchr(usl->value, ':')) {
1039 int fd = bind_to_udp(usl->value, 0, 0);
1040 if (fd < 0) {
1041 uwsgi_log("[cache-udp-server] cannot bind to %s\n", usl->value);
1042 exit(1);
1043 }
1044 uwsgi_socket_nb(fd);
1045 event_queue_add_fd_read(queue, fd);
1046 uwsgi_log("*** udp server for cache \"%s\" running on %s ***\n", uc->name, usl->value);
1047 }
1048 usl = usl->next;
1049 }
1050
1051 // allocate 64k chunk to receive messages
1052 char *buf = uwsgi_malloc(UMAX16);
1053
1054 for(;;) {
1055 uint16_t pktsize = 0, ss = 0;
1056 int interesting_fd = -1;
1057 int rlen = event_queue_wait(queue, -1, &interesting_fd);
1058 if (rlen <= 0) continue;
1059 if (interesting_fd < 0) continue;
1060 ssize_t len = read(interesting_fd, buf, UMAX16);
1061 if (len <= 7) {
1062 uwsgi_error("[cache-udp-server] read()");
1063 }
1064 if (buf[0] != 111) continue;
1065 memcpy(&pktsize, buf+1, 2);
1066 if (pktsize != len-4) continue;
1067
1068 memcpy(&ss, buf + 4, 2);
1069 if (4+ss > pktsize) continue;
1070 uint16_t keylen = ss;
1071 char *key = buf + 6;
1072
1073 // cache set/update
1074 if (buf[3] == 10) {
1075 if (keylen + 2 + 2 > pktsize) continue;
1076 memcpy(&ss, buf + 6 + keylen, 2);
1077 if (4+keylen+ss > pktsize) continue;
1078 uint16_t vallen = ss;
1079 char *val = buf + 8 + keylen;
1080 uint64_t expires = 0;
1081 if (2 + keylen + 2 + vallen + 2 < pktsize) {
1082 memcpy(&ss, buf + 8 + keylen + vallen , 2);
1083 if (6+keylen+vallen+ss > pktsize) continue;
1084 expires = uwsgi_str_num(buf + 10 + keylen+vallen, ss);
1085 }
1086 uwsgi_wlock(uc->lock);
1087 if (uwsgi_cache_set2(uc, key, keylen, val, vallen, expires, UWSGI_CACHE_FLAG_UPDATE|UWSGI_CACHE_FLAG_LOCAL|UWSGI_CACHE_FLAG_ABSEXPIRE)) {
1088 uwsgi_log("[cache-udp-server] unable to update cache\n");
1089 }
1090 uwsgi_rwunlock(uc->lock);
1091 }
1092 // cache del
1093 else if (buf[3] == 11) {
1094 uwsgi_wlock(uc->lock);
1095 if (uwsgi_cache_del2(uc, key, keylen, 0, UWSGI_CACHE_FLAG_LOCAL)) {
1096 uwsgi_log("[cache-udp-server] unable to update cache\n");
1097 }
1098 uwsgi_rwunlock(uc->lock);
1099 }
1100 }
1101
1102 return NULL;
1103 }
1104
cache_sweeper_free_items(struct uwsgi_cache * uc)1105 static uint64_t cache_sweeper_free_items(struct uwsgi_cache *uc) {
1106 uint64_t i;
1107 uint64_t freed_items = 0;
1108
1109 if (uc->no_expire || uc->purge_lru || uc->lazy_expire)
1110 return 0;
1111
1112 uwsgi_rlock(uc->lock);
1113 if (!uc->next_scan || uc->next_scan > (uint64_t)uwsgi.current_time) {
1114 uwsgi_rwunlock(uc->lock);
1115 return 0;
1116 }
1117 uwsgi_rwunlock(uc->lock);
1118
1119 // skip the first slot
1120 for (i = 1; i < uc->max_items; i++) {
1121 struct uwsgi_cache_item *uci = cache_item(i);
1122
1123 uwsgi_wlock(uc->lock);
1124 // we reset next scan time first, then we find the least
1125 // expiration time from those that are NOT expired yet.
1126 if (i == 1)
1127 uc->next_scan = 0;
1128
1129 if (uci->expires) {
1130 if (uci->expires <= (uint64_t)uwsgi.current_time) {
1131 uwsgi_cache_del2(uc, NULL, 0, i, UWSGI_CACHE_FLAG_LOCAL);
1132 freed_items++;
1133 } else if (!uc->next_scan || uc->next_scan > uci->expires) {
1134 uc->next_scan = uci->expires;
1135 }
1136 }
1137 uwsgi_rwunlock(uc->lock);
1138 }
1139
1140 return freed_items;
1141 }
1142
cache_sweeper_loop(void * ucache)1143 static void *cache_sweeper_loop(void *ucache) {
1144
1145 // block all signals
1146 sigset_t smask;
1147 sigfillset(&smask);
1148 pthread_sigmask(SIG_BLOCK, &smask, NULL);
1149
1150 if (!uwsgi.cache_expire_freq)
1151 uwsgi.cache_expire_freq = 3;
1152
1153 // remove expired cache items TODO use rb_tree timeouts
1154 for (;;) {
1155 struct uwsgi_cache *uc;
1156
1157 for (uc = (struct uwsgi_cache *)ucache; uc; uc = uc->next) {
1158 uint64_t freed_items = cache_sweeper_free_items(uc);
1159 if (uwsgi.cache_report_freed_items && freed_items)
1160 uwsgi_log("freed %llu items for cache \"%s\"\n", (unsigned long long)freed_items, uc->name);
1161 }
1162
1163 sleep(uwsgi.cache_expire_freq);
1164 }
1165
1166 return NULL;
1167 }
1168
uwsgi_cache_sync_all()1169 void uwsgi_cache_sync_all() {
1170
1171 struct uwsgi_cache *uc = uwsgi.caches;
1172 while(uc) {
1173 if (uc->store && (uwsgi.master_cycles == 0 || (uc->store_sync > 0 && (uwsgi.master_cycles % uc->store_sync) == 0))) {
1174 if (msync(uc->items, uc->filesize, MS_ASYNC)) {
1175 uwsgi_error("uwsgi_cache_sync_all()/msync()");
1176 }
1177 }
1178 uc = uc->next;
1179 }
1180 }
1181
uwsgi_cache_start_sweepers()1182 void uwsgi_cache_start_sweepers() {
1183 struct uwsgi_cache *uc = uwsgi.caches;
1184
1185 if (uwsgi.cache_no_expire)
1186 return;
1187
1188 int need_to_run = 0;
1189 while(uc) {
1190 if (!uc->no_expire && !uc->purge_lru && !uc->lazy_expire) {
1191 need_to_run = 1;
1192 break;
1193 }
1194 uc = uc->next;
1195 }
1196
1197 if (!need_to_run) return;
1198
1199 pthread_t cache_sweeper;
1200 if (pthread_create(&cache_sweeper, NULL, cache_sweeper_loop, uwsgi.caches)) {
1201 uwsgi_error("uwsgi_cache_start_sweepers()/pthread_create()");
1202 uwsgi_log("unable to run the cache sweeper!!!\n");
1203 return;
1204 }
1205 uwsgi_log("cache sweeper thread enabled\n");
1206 }
1207
uwsgi_cache_start_sync_servers()1208 void uwsgi_cache_start_sync_servers() {
1209
1210 struct uwsgi_cache *uc = uwsgi.caches;
1211 while(uc) {
1212 if (!uc->udp_servers) goto next;
1213 pthread_t cache_udp_server;
1214 if (pthread_create(&cache_udp_server, NULL, cache_udp_server_loop, (void *) uc)) {
1215 uwsgi_error("pthread_create()");
1216 uwsgi_log("unable to run the cache udp server !!!\n");
1217 }
1218 else {
1219 uwsgi_log("udp server thread enabled for cache \"%s\"\n", uc->name);
1220 }
1221 next:
1222 uc = uc->next;
1223 }
1224 }
1225
uwsgi_cache_create(char * arg)1226 struct uwsgi_cache *uwsgi_cache_create(char *arg) {
1227 struct uwsgi_cache *old_uc = NULL, *uc = uwsgi.caches;
1228 while(uc) {
1229 old_uc = uc;
1230 uc = uc->next;
1231 }
1232
1233 uc = uwsgi_calloc_shared(sizeof(struct uwsgi_cache));
1234 if (old_uc) {
1235 old_uc->next = uc;
1236 }
1237 else {
1238 uwsgi.caches = uc;
1239 }
1240
1241 // default (old-stye) cache ?
1242 if (!arg) {
1243 uc->name = "default";
1244 uc->name_len = strlen(uc->name);
1245 uc->blocksize = uwsgi.cache_blocksize;
1246 if (!uc->blocksize) uc->blocksize = UMAX16;
1247 uc->max_item_size = uc->blocksize;
1248 uc->max_items = uwsgi.cache_max_items;
1249 uc->blocks = uwsgi.cache_max_items;
1250 uc->keysize = 2048;
1251 uc->hashsize = UMAX16;
1252 uc->hash = uwsgi_hash_algo_get("djb33x");
1253 uc->store = uwsgi.cache_store;
1254 uc->nodes = uwsgi.cache_udp_node;
1255 uc->udp_servers = uwsgi.cache_udp_server;
1256 uc->store_sync = uwsgi.cache_store_sync;
1257 uc->use_last_modified = (uint8_t) uwsgi.cache_use_last_modified;
1258
1259 if (uwsgi.cache_sync) {
1260 uwsgi_string_new_list(&uc->sync_nodes, uwsgi.cache_sync);
1261 }
1262 }
1263 else {
1264 char *c_name = NULL;
1265 char *c_max_items = NULL;
1266 char *c_blocksize = NULL;
1267 char *c_blocks = NULL;
1268 char *c_hash = NULL;
1269 char *c_hashsize = NULL;
1270 char *c_keysize = NULL;
1271 char *c_store = NULL;
1272 char *c_store_sync = NULL;
1273 char *c_store_delete = NULL;
1274 char *c_nodes = NULL;
1275 char *c_sync = NULL;
1276 char *c_udp_servers = NULL;
1277 char *c_bitmap = NULL;
1278 char *c_use_last_modified = NULL;
1279 char *c_math_initial = NULL;
1280 char *c_ignore_full = NULL;
1281 char *c_purge_lru = NULL;
1282 char *c_lazy_expire = NULL;
1283 char *c_sweep_on_full = NULL;
1284 char *c_clear_on_full = NULL;
1285 char *c_no_expire = NULL;
1286
1287 if (uwsgi_kvlist_parse(arg, strlen(arg), ',', '=',
1288 "name", &c_name,
1289 "max_items", &c_max_items,
1290 "maxitems", &c_max_items,
1291 "items", &c_max_items,
1292 "blocksize", &c_blocksize,
1293 "blocks", &c_blocks,
1294 "hash", &c_hash,
1295 "hashsize", &c_hashsize,
1296 "hash_size", &c_hashsize,
1297 "keysize", &c_keysize,
1298 "key_size", &c_keysize,
1299 "store", &c_store,
1300 "store_sync", &c_store_sync,
1301 "storesync", &c_store_sync,
1302 "store_delete", &c_store_delete,
1303 "storedelete", &c_store_delete,
1304 "node", &c_nodes,
1305 "nodes", &c_nodes,
1306 "sync", &c_sync,
1307 "udp", &c_udp_servers,
1308 "udp_servers", &c_udp_servers,
1309 "udp_server", &c_udp_servers,
1310 "udpservers", &c_udp_servers,
1311 "udpserver", &c_udp_servers,
1312 "bitmap", &c_bitmap,
1313 "lastmod", &c_use_last_modified,
1314 "math_initial", &c_math_initial,
1315 "ignore_full", &c_ignore_full,
1316 "purge_lru", &c_purge_lru,
1317 "lru", &c_purge_lru,
1318 "lazy_expire", &c_lazy_expire,
1319 "lazy", &c_lazy_expire,
1320 "sweep_on_full", &c_sweep_on_full,
1321 "clear_on_full", &c_clear_on_full,
1322 "no_expire", &c_no_expire,
1323 NULL)) {
1324 uwsgi_log("unable to parse cache definition\n");
1325 exit(1);
1326 }
1327 if (!c_name) {
1328 uwsgi_log("you have to specify a cache name\n");
1329 exit(1);
1330 }
1331 if (!c_max_items) {
1332 uwsgi_log("you have to specify the maximum number of cache items\n");
1333 exit(1);
1334 }
1335
1336 uc->name = c_name;
1337 uc->name_len = strlen(c_name);
1338 uc->max_items = uwsgi_n64(c_max_items);
1339 if (!uc->max_items) {
1340 uwsgi_log("you have to specify the maximum number of cache items\n");
1341 exit(1);
1342 }
1343
1344 // defaults
1345 uc->blocks = uc->max_items;
1346 uc->blocksize = UMAX16;
1347 uc->keysize = 2048;
1348 uc->hashsize = UMAX16;
1349 uc->hash = uwsgi_hash_algo_get("djb33x");
1350
1351 // customize
1352 if (c_blocksize) uc->blocksize = uwsgi_n64(c_blocksize);
1353 if (!uc->blocksize) { uwsgi_log("invalid cache blocksize for \"%s\"\n", uc->name); exit(1); }
1354 // set the true max size of an item
1355 uc->max_item_size = uc->blocksize;
1356
1357 if (c_blocks) uc->blocks = uwsgi_n64(c_blocks);
1358 if (!uc->blocks) { uwsgi_log("invalid cache blocks for \"%s\"\n", uc->name); exit(1); }
1359 if (c_hash) uc->hash = uwsgi_hash_algo_get(c_hash);
1360 if (!uc->hash) { uwsgi_log("invalid cache hash for \"%s\"\n", uc->name); exit(1); }
1361 if (c_hashsize) uc->hashsize = uwsgi_n64(c_hashsize);
1362 if (!uc->hashsize) { uwsgi_log("invalid cache hashsize for \"%s\"\n", uc->name); exit(1); }
1363 if (c_keysize) uc->keysize = uwsgi_n64(c_keysize);
1364 if (!uc->keysize || uc->keysize >= UMAX16) { uwsgi_log("invalid cache keysize for \"%s\"\n", uc->name); exit(1); }
1365 if (c_bitmap) {
1366 uc->use_blocks_bitmap = 1;
1367 uc->max_item_size = uc->blocksize * uc->blocks;
1368 }
1369 if (c_use_last_modified) uc->use_last_modified = 1;
1370 if (c_ignore_full) uc->ignore_full = 1;
1371
1372 if (c_store_delete) uc->store_delete = 1;
1373
1374 if (c_math_initial) uc->math_initial = strtol(c_math_initial, NULL, 10);
1375
1376 if (c_lazy_expire) uc->lazy_expire = 1;
1377 if (c_sweep_on_full) {
1378 uc->sweep_on_full = uwsgi_n64(c_sweep_on_full);
1379 }
1380 if (c_clear_on_full) uc->clear_on_full = 1;
1381 if (c_no_expire) uc->no_expire = 1;
1382
1383 uc->store_sync = uwsgi.cache_store_sync;
1384 if (c_store_sync) { uc->store_sync = uwsgi_n64(c_store_sync); }
1385
1386 if (uc->blocks < uc->max_items) {
1387 uwsgi_log("invalid number of cache blocks for \"%s\", must be higher than max_items (%llu)\n", uc->name, uc->max_items);
1388 exit(1);
1389 }
1390
1391 uc->store = c_store;
1392
1393 if (c_nodes) {
1394 char *p, *ctx = NULL;
1395 uwsgi_foreach_token(c_nodes, ";", p, ctx) {
1396 uwsgi_string_new_list(&uc->nodes, p);
1397 }
1398 }
1399
1400 if (c_sync) {
1401 char *p, *ctx = NULL;
1402 uwsgi_foreach_token(c_sync, ";", p, ctx) {
1403 uwsgi_string_new_list(&uc->sync_nodes, p);
1404 }
1405 }
1406
1407 if (c_udp_servers) {
1408 char *p, *ctx = NULL;
1409 uwsgi_foreach_token(c_udp_servers, ";", p, ctx) {
1410 uwsgi_string_new_list(&uc->udp_servers, p);
1411 }
1412 }
1413
1414 if (c_purge_lru)
1415 uc->purge_lru = 1;
1416 }
1417
1418 uwsgi_cache_init(uc);
1419 return uc;
1420 }
1421
uwsgi_cache_by_name(char * name)1422 struct uwsgi_cache *uwsgi_cache_by_name(char *name) {
1423 struct uwsgi_cache *uc = uwsgi.caches;
1424 if (!name || *name == 0) {
1425 return uwsgi.caches;
1426 }
1427 while(uc) {
1428 if (uc->name && !strcmp(uc->name, name)) {
1429 return uc;
1430 }
1431 uc = uc->next;
1432 }
1433 return NULL;
1434 }
1435
uwsgi_cache_by_namelen(char * name,uint16_t len)1436 struct uwsgi_cache *uwsgi_cache_by_namelen(char *name, uint16_t len) {
1437 struct uwsgi_cache *uc = uwsgi.caches;
1438 if (!name || *name == 0) {
1439 return uwsgi.caches;
1440 }
1441 while(uc) {
1442 if (uc->name && !uwsgi_strncmp(uc->name, uc->name_len, name, len)) {
1443 return uc;
1444 }
1445 uc = uc->next;
1446 }
1447 return NULL;
1448 }
1449
uwsgi_cache_create_all()1450 void uwsgi_cache_create_all() {
1451
1452 if (uwsgi.cache_setup) return;
1453
1454 // register embedded hash algorithms
1455 uwsgi_hash_algo_register_all();
1456
1457 // setup default cache
1458 if (uwsgi.cache_max_items > 0) {
1459 uwsgi_cache_create(NULL);
1460 }
1461
1462 // setup new generation caches
1463 struct uwsgi_string_list *usl = uwsgi.cache2;
1464 while(usl) {
1465 uwsgi_cache_create(usl->value);
1466 usl = usl->next;
1467 }
1468
1469 uwsgi.cache_setup = 1;
1470 }
1471
1472 /*
1473 * uWSGI cache magic functions. They can be used by plugin to easily access local and remote caches
1474 *
1475 * they generate (when needed) a new memory buffer. Locking is automatically managed
1476 *
1477 * You have to free the returned memory !!!
1478 *
1479 */
1480
uwsgi_cache_magic_context_hook(char * key,uint16_t key_len,char * value,uint16_t vallen,void * data)1481 void uwsgi_cache_magic_context_hook(char *key, uint16_t key_len, char *value, uint16_t vallen, void *data) {
1482 struct uwsgi_cache_magic_context *ucmc = (struct uwsgi_cache_magic_context *) data;
1483
1484 if (!uwsgi_strncmp(key, key_len, "cmd", 3)) {
1485 ucmc->cmd = value;
1486 ucmc->cmd_len = vallen;
1487 return;
1488 }
1489
1490 if (!uwsgi_strncmp(key, key_len, "key", 3)) {
1491 ucmc->key = value;
1492 ucmc->key_len = vallen;
1493 return;
1494 }
1495
1496 if (!uwsgi_strncmp(key, key_len, "expires", 7)) {
1497 ucmc->expires = uwsgi_str_num(value, vallen);
1498 return;
1499 }
1500
1501 if (!uwsgi_strncmp(key, key_len, "size", 4)) {
1502 ucmc->size = uwsgi_str_num(value, vallen);
1503 return;
1504 }
1505
1506 if (!uwsgi_strncmp(key, key_len, "cache", 5)) {
1507 ucmc->cache = value;
1508 ucmc->cache_len = vallen;
1509 return;
1510 }
1511
1512 if (!uwsgi_strncmp(key, key_len, "status", 6)) {
1513 ucmc->status = value;
1514 ucmc->status_len = vallen;
1515 return;
1516 }
1517 }
1518
uwsgi_cache_prepare_magic_get(char * cache_name,uint16_t cache_name_len,char * key,uint16_t key_len)1519 static struct uwsgi_buffer *uwsgi_cache_prepare_magic_get(char *cache_name, uint16_t cache_name_len, char *key, uint16_t key_len) {
1520 struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
1521 ub->pos = 4;
1522
1523 if (uwsgi_buffer_append_keyval(ub, "cmd", 3, "get", 3)) goto error;
1524 if (uwsgi_buffer_append_keyval(ub, "key", 3, key, key_len)) goto error;
1525 if (cache_name) {
1526 if (uwsgi_buffer_append_keyval(ub, "cache", 5, cache_name, cache_name_len)) goto error;
1527 }
1528
1529 return ub;
1530 error:
1531 uwsgi_buffer_destroy(ub);
1532 return NULL;
1533 }
1534
uwsgi_cache_prepare_magic_exists(char * cache_name,uint16_t cache_name_len,char * key,uint16_t key_len)1535 struct uwsgi_buffer *uwsgi_cache_prepare_magic_exists(char *cache_name, uint16_t cache_name_len, char *key, uint16_t key_len) {
1536 struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
1537 ub->pos = 4;
1538
1539 if (uwsgi_buffer_append_keyval(ub, "cmd", 3, "exists", 6)) goto error;
1540 if (uwsgi_buffer_append_keyval(ub, "key", 3, key, key_len)) goto error;
1541 if (cache_name) {
1542 if (uwsgi_buffer_append_keyval(ub, "cache", 5, cache_name, cache_name_len)) goto error;
1543 }
1544
1545 return ub;
1546 error:
1547 uwsgi_buffer_destroy(ub);
1548 return NULL;
1549 }
1550
uwsgi_cache_prepare_magic_del(char * cache_name,uint16_t cache_name_len,char * key,uint16_t key_len)1551 struct uwsgi_buffer *uwsgi_cache_prepare_magic_del(char *cache_name, uint16_t cache_name_len, char *key, uint16_t key_len) {
1552 struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
1553 ub->pos = 4;
1554
1555 if (uwsgi_buffer_append_keyval(ub, "cmd", 3, "del", 3)) goto error;
1556 if (uwsgi_buffer_append_keyval(ub, "key", 3, key, key_len)) goto error;
1557 if (cache_name) {
1558 if (uwsgi_buffer_append_keyval(ub, "cache", 5, cache_name, cache_name_len)) goto error;
1559 }
1560
1561 return ub;
1562 error:
1563 uwsgi_buffer_destroy(ub);
1564 return NULL;
1565 }
1566
uwsgi_cache_prepare_magic_clear(char * cache_name,uint16_t cache_name_len)1567 struct uwsgi_buffer *uwsgi_cache_prepare_magic_clear(char *cache_name, uint16_t cache_name_len) {
1568 struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
1569 ub->pos = 4;
1570
1571 if (uwsgi_buffer_append_keyval(ub, "cmd", 3, "clear", 5)) goto error;
1572 if (cache_name) {
1573 if (uwsgi_buffer_append_keyval(ub, "cache", 5, cache_name, cache_name_len)) goto error;
1574 }
1575
1576 return ub;
1577 error:
1578 uwsgi_buffer_destroy(ub);
1579 return NULL;
1580 }
1581
1582
uwsgi_cache_prepare_magic_set(char * cache_name,uint16_t cache_name_len,char * key,uint16_t key_len,uint64_t len,uint64_t expires)1583 struct uwsgi_buffer *uwsgi_cache_prepare_magic_set(char *cache_name, uint16_t cache_name_len, char *key, uint16_t key_len, uint64_t len, uint64_t expires) {
1584 struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
1585 ub->pos = 4;
1586
1587 if (uwsgi_buffer_append_keyval(ub, "cmd", 3, "set", 3)) goto error;
1588 if (uwsgi_buffer_append_keyval(ub, "key", 3, key, key_len)) goto error;
1589 if (uwsgi_buffer_append_keynum(ub, "size", 4, len)) goto error;
1590 if (expires > 0) {
1591 if (uwsgi_buffer_append_keynum(ub, "expires", 7, expires)) goto error;
1592 }
1593 if (uwsgi_buffer_append_keynum(ub, "size", 4, len)) goto error;
1594 if (cache_name) {
1595 if (uwsgi_buffer_append_keyval(ub, "cache", 5, cache_name, cache_name_len)) goto error;
1596 }
1597
1598 return ub;
1599 error:
1600 uwsgi_buffer_destroy(ub);
1601 return NULL;
1602 }
1603
uwsgi_cache_prepare_magic_update(char * cache_name,uint16_t cache_name_len,char * key,uint16_t key_len,uint64_t len,uint64_t expires)1604 struct uwsgi_buffer *uwsgi_cache_prepare_magic_update(char *cache_name, uint16_t cache_name_len, char *key, uint16_t key_len, uint64_t len, uint64_t expires) {
1605 struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size);
1606 ub->pos = 4;
1607
1608 if (uwsgi_buffer_append_keyval(ub, "cmd", 3, "update", 6)) goto error;
1609 if (uwsgi_buffer_append_keyval(ub, "key", 3, key, key_len)) goto error;
1610 if (uwsgi_buffer_append_keynum(ub, "size", 4, len)) goto error;
1611 if (expires > 0) {
1612 if (uwsgi_buffer_append_keynum(ub, "expires", 7, expires)) goto error;
1613 }
1614 if (uwsgi_buffer_append_keynum(ub, "size", 4, len)) goto error;
1615 if (cache_name) {
1616 if (uwsgi_buffer_append_keyval(ub, "cache", 5, cache_name, cache_name_len)) goto error;
1617 }
1618
1619 return ub;
1620 error:
1621 uwsgi_buffer_destroy(ub);
1622 return NULL;
1623 }
1624
cache_magic_send_and_manage(int fd,struct uwsgi_buffer * ub,char * stream,uint64_t stream_len,int timeout,struct uwsgi_cache_magic_context * ucmc)1625 static int cache_magic_send_and_manage(int fd, struct uwsgi_buffer *ub, char *stream, uint64_t stream_len, int timeout, struct uwsgi_cache_magic_context *ucmc) {
1626 if (uwsgi_buffer_set_uh(ub, 111, 17)) return -1;
1627
1628 if (stream) {
1629 if (uwsgi_buffer_append(ub, stream, stream_len)) return -1;
1630 }
1631
1632 if (uwsgi_write_true_nb(fd, ub->buf, ub->pos, timeout)) return -1;
1633
1634 // ok now wait for the response, using the same buffer of the request
1635 // NOTE: after using a uwsgi_buffer in that way we basically destroy (even if we can safely free it)
1636 size_t rlen = ub->pos;
1637 if (uwsgi_read_with_realloc(fd, &ub->buf, &rlen, timeout, NULL, NULL)) return -1;
1638 // try to fix the buffer to maintain size info
1639 ub->pos = rlen;
1640
1641 // now we have a uwsgi dictionary with all of the options needed, let's parse it
1642 memset(ucmc, 0, sizeof(struct uwsgi_cache_magic_context));
1643 if (uwsgi_hooked_parse(ub->buf, rlen, uwsgi_cache_magic_context_hook, ucmc)) return -1;
1644 return 0;
1645 }
1646
uwsgi_cache_magic_get(char * key,uint16_t keylen,uint64_t * vallen,uint64_t * expires,char * cache)1647 char *uwsgi_cache_magic_get(char *key, uint16_t keylen, uint64_t *vallen, uint64_t *expires, char *cache) {
1648 struct uwsgi_cache_magic_context ucmc;
1649 struct uwsgi_cache *uc = NULL;
1650 char *cache_server = NULL;
1651 char *cache_name = NULL;
1652 uint16_t cache_name_len = 0;
1653 if (cache) {
1654 char *at = strchr(cache, '@');
1655 if (!at) {
1656 uc = uwsgi_cache_by_name(cache);
1657 }
1658 else {
1659 cache_server = at + 1;
1660 cache_name = cache;
1661 cache_name_len = at - cache;
1662 }
1663 }
1664 // use default (local) cache
1665 else {
1666 uc = uwsgi.caches;
1667 }
1668
1669 // we have a local cache !!!
1670 if (uc) {
1671 if (uc->purge_lru)
1672 uwsgi_wlock(uc->lock);
1673 else
1674 uwsgi_rlock(uc->lock);
1675 char *value = uwsgi_cache_get3(uc, key, keylen, vallen, expires);
1676 if (!value) {
1677 uwsgi_rwunlock(uc->lock);
1678 return NULL;
1679 }
1680 char *buf = uwsgi_malloc(*vallen);
1681 memcpy(buf, value, *vallen);
1682 uwsgi_rwunlock(uc->lock);
1683 return buf;
1684 }
1685
1686 // we have a remote one
1687 if (cache_server) {
1688 int fd = uwsgi_connect(cache_server, 0, 1);
1689 if (fd < 0) return NULL;
1690
1691 int ret = uwsgi.wait_write_hook(fd, uwsgi.socket_timeout);
1692 if (ret <= 0) {
1693 close(fd);
1694 return NULL;
1695 }
1696
1697 struct uwsgi_buffer *ub = uwsgi_cache_prepare_magic_get(cache_name, cache_name_len, key, keylen);
1698 if (!ub) {
1699 close(fd);
1700 return NULL;
1701 }
1702
1703 if (cache_magic_send_and_manage(fd, ub, NULL, 0, uwsgi.socket_timeout, &ucmc)) {
1704 close(fd);
1705 uwsgi_buffer_destroy(ub);
1706 return NULL;
1707 }
1708
1709 if (uwsgi_strncmp(ucmc.status, ucmc.status_len, "ok", 2)) {
1710 close(fd);
1711 uwsgi_buffer_destroy(ub);
1712 return NULL;
1713 }
1714
1715 if (ucmc.size == 0) {
1716 close(fd);
1717 uwsgi_buffer_destroy(ub);
1718 return NULL;
1719 }
1720
1721 // ok we now need to fix our buffer (if needed)
1722 if (ucmc.size > ub->pos) {
1723 char *tmp_buf = realloc(ub->buf, ucmc.size);
1724 if (!tmp_buf) {
1725 uwsgi_error("uwsgi_cache_magic_get()/realloc()");
1726 close(fd);
1727 uwsgi_buffer_destroy(ub);
1728 return NULL;
1729 }
1730 ub->buf = tmp_buf;
1731 }
1732
1733 // read the raw value from the socket
1734 if (uwsgi_read_whole_true_nb(fd, ub->buf, ucmc.size, uwsgi.socket_timeout)) {
1735 close(fd);
1736 uwsgi_buffer_destroy(ub);
1737 return NULL;
1738 }
1739
1740 // now the magic, we dereference the internal buffer and return it to the caller
1741 close(fd);
1742 char *value = ub->buf;
1743 ub->buf = NULL;
1744 uwsgi_buffer_destroy(ub);
1745 *vallen = ucmc.size;
1746 if (expires) {
1747 *expires = ucmc.expires;
1748 }
1749 return value;
1750
1751 }
1752
1753 return NULL;
1754 }
1755
uwsgi_cache_magic_exists(char * key,uint16_t keylen,char * cache)1756 int uwsgi_cache_magic_exists(char *key, uint16_t keylen, char *cache) {
1757 struct uwsgi_cache_magic_context ucmc;
1758 struct uwsgi_cache *uc = NULL;
1759 char *cache_server = NULL;
1760 char *cache_name = NULL;
1761 uint16_t cache_name_len = 0;
1762 if (cache) {
1763 char *at = strchr(cache, '@');
1764 if (!at) {
1765 uc = uwsgi_cache_by_name(cache);
1766 }
1767 else {
1768 cache_server = at + 1;
1769 cache_name = cache;
1770 cache_name_len = at - cache;
1771 }
1772 }
1773 // use default (local) cache
1774 else {
1775 uc = uwsgi.caches;
1776 }
1777
1778 // we have a local cache !!!
1779 if (uc) {
1780 uwsgi_rlock(uc->lock);
1781 if (!uwsgi_cache_exists2(uc, key, keylen)) {
1782 uwsgi_rwunlock(uc->lock);
1783 return 0;
1784 }
1785 uwsgi_rwunlock(uc->lock);
1786 return 1;
1787 }
1788
1789 // we have a remote one
1790 if (cache_server) {
1791 int fd = uwsgi_connect(cache_server, 0, 1);
1792 if (fd < 0) return 0;
1793
1794 int ret = uwsgi.wait_write_hook(fd, uwsgi.socket_timeout);
1795 if (ret <= 0) {
1796 close(fd);
1797 return 0;
1798 }
1799
1800 struct uwsgi_buffer *ub = uwsgi_cache_prepare_magic_exists(cache_name, cache_name_len, key, keylen);
1801 if (!ub) {
1802 close(fd);
1803 return 0;
1804 }
1805
1806 if (cache_magic_send_and_manage(fd, ub, NULL, 0, uwsgi.socket_timeout, &ucmc)) {
1807 close(fd);
1808 uwsgi_buffer_destroy(ub);
1809 return 0;
1810 }
1811
1812 if (uwsgi_strncmp(ucmc.status, ucmc.status_len, "ok", 2)) {
1813 close(fd);
1814 uwsgi_buffer_destroy(ub);
1815 return 0;
1816 }
1817
1818 close(fd);
1819 uwsgi_buffer_destroy(ub);
1820 return 1;
1821 }
1822
1823 return 0;
1824 }
1825
1826
uwsgi_cache_magic_set(char * key,uint16_t keylen,char * value,uint64_t vallen,uint64_t expires,uint64_t flags,char * cache)1827 int uwsgi_cache_magic_set(char *key, uint16_t keylen, char *value, uint64_t vallen, uint64_t expires, uint64_t flags, char *cache) {
1828 struct uwsgi_cache_magic_context ucmc;
1829 struct uwsgi_cache *uc = NULL;
1830 char *cache_server = NULL;
1831 char *cache_name = NULL;
1832 uint16_t cache_name_len = 0;
1833
1834 if (cache) {
1835 char *at = strchr(cache, '@');
1836 if (!at) {
1837 uc = uwsgi_cache_by_name(cache);
1838 }
1839 else {
1840 cache_server = at + 1;
1841 cache_name = cache;
1842 cache_name_len = at - cache;
1843 }
1844 }
1845 // use default (local) cache
1846 else {
1847 uc = uwsgi.caches;
1848 }
1849
1850 // we have a local cache !!!
1851 if (uc) {
1852 uwsgi_wlock(uc->lock);
1853 int ret = uwsgi_cache_set2(uc, key, keylen, value, vallen, expires, flags);
1854 uwsgi_rwunlock(uc->lock);
1855 return ret;
1856 }
1857
1858 // we have a remote one
1859 if (cache_server) {
1860 int fd = uwsgi_connect(cache_server, 0, 1);
1861 if (fd < 0) return -1;
1862
1863 int ret = uwsgi.wait_write_hook(fd, uwsgi.socket_timeout);
1864 if (ret <= 0) {
1865 close(fd);
1866 return -1;
1867 }
1868
1869 struct uwsgi_buffer *ub = NULL;
1870 if (flags & UWSGI_CACHE_FLAG_UPDATE) {
1871 ub = uwsgi_cache_prepare_magic_update(cache_name, cache_name_len, key, keylen, vallen, expires);
1872 }
1873 else {
1874 ub = uwsgi_cache_prepare_magic_set(cache_name, cache_name_len, key, keylen, vallen, expires);
1875 }
1876 if (!ub) {
1877 close(fd);
1878 return -1;
1879 }
1880
1881 if (cache_magic_send_and_manage(fd, ub, value, vallen, uwsgi.socket_timeout, &ucmc)) {
1882 close(fd);
1883 uwsgi_buffer_destroy(ub);
1884 return -1;
1885 }
1886
1887 if (uwsgi_strncmp(ucmc.status, ucmc.status_len, "ok", 2)) {
1888 close(fd);
1889 uwsgi_buffer_destroy(ub);
1890 return -1;
1891 }
1892
1893 close(fd);
1894 uwsgi_buffer_destroy(ub);
1895 return 0;
1896
1897 }
1898
1899 return -1;
1900
1901 }
1902
uwsgi_cache_magic_del(char * key,uint16_t keylen,char * cache)1903 int uwsgi_cache_magic_del(char *key, uint16_t keylen, char *cache) {
1904
1905 struct uwsgi_cache_magic_context ucmc;
1906 struct uwsgi_cache *uc = NULL;
1907 char *cache_server = NULL;
1908 char *cache_name = NULL;
1909 uint16_t cache_name_len = 0;
1910 if (cache) {
1911 char *at = strchr(cache, '@');
1912 if (!at) {
1913 uc = uwsgi_cache_by_name(cache);
1914 }
1915 else {
1916 cache_server = at + 1;
1917 cache_name = cache;
1918 cache_name_len = at - cache;
1919 }
1920 }
1921 // use default (local) cache
1922 else {
1923 uc = uwsgi.caches;
1924 }
1925
1926 // we have a local cache !!!
1927 if (uc) {
1928 uwsgi_wlock(uc->lock);
1929 if (uwsgi_cache_del2(uc, key, keylen, 0, 0)) {
1930 uwsgi_rwunlock(uc->lock);
1931 return -1;
1932 }
1933 uwsgi_rwunlock(uc->lock);
1934 return 0;
1935 }
1936
1937 // we have a remote one
1938 if (cache_server) {
1939 int fd = uwsgi_connect(cache_server, 0, 1);
1940 if (fd < 0) return -1;
1941
1942 int ret = uwsgi.wait_write_hook(fd, uwsgi.socket_timeout);
1943 if (ret <= 0) {
1944 close(fd);
1945 return -1;
1946 }
1947
1948 struct uwsgi_buffer *ub = uwsgi_cache_prepare_magic_del(cache_name, cache_name_len, key, keylen);
1949 if (!ub) {
1950 close(fd);
1951 return -1;
1952 }
1953
1954 if (cache_magic_send_and_manage(fd, ub, NULL, 0, uwsgi.socket_timeout, &ucmc)) {
1955 close(fd);
1956 uwsgi_buffer_destroy(ub);
1957 return -1;
1958 }
1959
1960 if (uwsgi_strncmp(ucmc.status, ucmc.status_len, "ok", 2)) {
1961 close(fd);
1962 uwsgi_buffer_destroy(ub);
1963 return -1;
1964 }
1965
1966 close(fd);
1967 uwsgi_buffer_destroy(ub);
1968 return 0;
1969 }
1970
1971 return -1 ;
1972
1973 }
1974
uwsgi_cache_magic_clear(char * cache)1975 int uwsgi_cache_magic_clear(char *cache) {
1976
1977 struct uwsgi_cache_magic_context ucmc;
1978 struct uwsgi_cache *uc = NULL;
1979 char *cache_server = NULL;
1980 char *cache_name = NULL;
1981 uint16_t cache_name_len = 0;
1982 if (cache) {
1983 char *at = strchr(cache, '@');
1984 if (!at) {
1985 uc = uwsgi_cache_by_name(cache);
1986 }
1987 else {
1988 cache_server = at + 1;
1989 cache_name = cache;
1990 cache_name_len = at - cache;
1991 }
1992 }
1993 // use default (local) cache
1994 else {
1995 uc = uwsgi.caches;
1996 }
1997
1998 // we have a local cache !!!
1999 if (uc) {
2000 uint64_t i;
2001 uwsgi_wlock(uc->lock);
2002 for (i = 1; i < uc->max_items; i++) {
2003 if (uwsgi_cache_del2(uc, NULL, 0, i, 0)) {
2004 uwsgi_rwunlock(uc->lock);
2005 return -1;
2006 }
2007 }
2008 uwsgi_rwunlock(uc->lock);
2009 return 0;
2010 }
2011
2012 // we have a remote one
2013 if (cache_server) {
2014 int fd = uwsgi_connect(cache_server, 0, 1);
2015 if (fd < 0) return -1;
2016
2017 int ret = uwsgi.wait_write_hook(fd, uwsgi.socket_timeout);
2018 if (ret <= 0) {
2019 close(fd);
2020 return -1;
2021 }
2022
2023 struct uwsgi_buffer *ub = uwsgi_cache_prepare_magic_clear(cache_name, cache_name_len);
2024 if (!ub) {
2025 close(fd);
2026 return -1;
2027 }
2028
2029 if (cache_magic_send_and_manage(fd, ub, NULL, 0, uwsgi.socket_timeout, &ucmc)) {
2030 close(fd);
2031 uwsgi_buffer_destroy(ub);
2032 return -1;
2033 }
2034
2035 if (uwsgi_strncmp(ucmc.status, ucmc.status_len, "ok", 2)) {
2036 close(fd);
2037 uwsgi_buffer_destroy(ub);
2038 return -1;
2039 }
2040
2041 close(fd);
2042 uwsgi_buffer_destroy(ub);
2043 return 0;
2044 }
2045
2046 return -1 ;
2047
2048 }
2049
2050
uwsgi_cache_sync_from_nodes(struct uwsgi_cache * uc)2051 void uwsgi_cache_sync_from_nodes(struct uwsgi_cache *uc) {
2052 struct uwsgi_string_list *usl = uc->sync_nodes;
2053 while(usl) {
2054 uwsgi_log("[cache-sync] getting cache dump from %s ...\n", usl->value);
2055 int fd = uwsgi_connect(usl->value, 0, 0);
2056 if (fd < 0) {
2057 uwsgi_log("[cache-sync] unable to connect to the cache server\n");
2058 goto next;
2059 }
2060
2061 struct uwsgi_buffer *ub = uwsgi_buffer_new(uwsgi.page_size + uc->filesize);
2062 ub->pos = 4;
2063 if (uc->name && uwsgi_buffer_append(ub, uc->name, uc->name_len)) {
2064 uwsgi_buffer_destroy(ub);
2065 close(fd);
2066 goto next;
2067 }
2068
2069 if (uwsgi_buffer_set_uh(ub, 111, 6)) {
2070 uwsgi_buffer_destroy(ub);
2071 close(fd);
2072 goto next;
2073 }
2074
2075 if (uwsgi_write_nb(fd, ub->buf, ub->pos, uwsgi.socket_timeout)) {
2076 uwsgi_buffer_destroy(ub);
2077 uwsgi_log("[cache-sync] unable to write to the cache server\n");
2078 close(fd);
2079 goto next;
2080 }
2081
2082 size_t rlen = ub->pos;
2083 if (uwsgi_read_with_realloc(fd, &ub->buf, &rlen, uwsgi.socket_timeout, NULL, NULL)) {
2084 uwsgi_buffer_destroy(ub);
2085 uwsgi_log("[cache-sync] unable to read from the cache server\n");
2086 close(fd);
2087 goto next;
2088 }
2089
2090 uwsgi_hooked_parse(ub->buf, rlen, cache_sync_hook, uc);
2091
2092 if (uwsgi_read_nb(fd, (char *) uc->items, uc->filesize, uwsgi.socket_timeout)) {
2093 uwsgi_buffer_destroy(ub);
2094 close(fd);
2095 uwsgi_log("[cache-sync] unable to read from the cache server\n");
2096 goto next;
2097 }
2098
2099 // reset the hashtable
2100 memset(uc->hashtable, 0, sizeof(uint64_t) * UMAX16);
2101 // re-fill the hashtable
2102 uwsgi_cache_fix(uc);
2103
2104 uwsgi_buffer_destroy(ub);
2105 close(fd);
2106 break;
2107 next:
2108 if (!usl->next) {
2109 exit(1);
2110 }
2111 uwsgi_log("[cache-sync] trying with the next sync node...\n");
2112 usl = usl->next;
2113 }
2114 }
2115
2116
uwsgi_cache_setup_nodes(struct uwsgi_cache * uc)2117 void uwsgi_cache_setup_nodes(struct uwsgi_cache *uc) {
2118 struct uwsgi_string_list *usl = uc->nodes;
2119 while(usl) {
2120 char *port = strchr(usl->value, ':');
2121 if (!port) {
2122 uwsgi_log("[cache-udp-node] invalid udp address: %s\n", usl->value);
2123 exit(1);
2124 }
2125 // no need to zero the memory, socket_to_in_addr will do that
2126 struct sockaddr_in *sin = uwsgi_malloc(sizeof(struct sockaddr_in));
2127 usl->custom = socket_to_in_addr(usl->value, port, 0, sin);
2128 usl->custom_ptr = sin;
2129 uwsgi_log("added udp node %s for cache \"%s\"\n", usl->value, uc->name);
2130 usl = usl->next;
2131 }
2132 }
2133
uwsgi_cache_keys(struct uwsgi_cache * uc,uint64_t * pos,struct uwsgi_cache_item ** uci)2134 struct uwsgi_cache_item *uwsgi_cache_keys(struct uwsgi_cache *uc, uint64_t *pos, struct uwsgi_cache_item **uci) {
2135
2136 // security check
2137 if (*pos >= uc->hashsize) return NULL;
2138 // iterate hashtable
2139 uint64_t orig_pos = *pos;
2140 for(;*pos<uc->hashsize;(*pos)++) {
2141 // get the cache slot
2142 uint64_t slot = uc->hashtable[*pos];
2143 if (*pos == orig_pos && *uci) {
2144 slot = (*uci)->next;
2145 }
2146 if (slot == 0) continue;
2147
2148 *uci = cache_item(slot);
2149 return *uci;
2150 }
2151
2152 (*pos)++;
2153 return NULL;
2154 }
2155
uwsgi_cache_rlock(struct uwsgi_cache * uc)2156 void uwsgi_cache_rlock(struct uwsgi_cache *uc) {
2157 uwsgi_rlock(uc->lock);
2158 }
2159
uwsgi_cache_rwunlock(struct uwsgi_cache * uc)2160 void uwsgi_cache_rwunlock(struct uwsgi_cache *uc) {
2161 uwsgi_rwunlock(uc->lock);
2162 }
2163
uwsgi_cache_item_key(struct uwsgi_cache_item * uci)2164 char *uwsgi_cache_item_key(struct uwsgi_cache_item *uci) {
2165 return uci->key;
2166 }
2167