1 /* -*-pgsql-c-*- */
2 /*
3 * pgpool: a language independent connection pool server for PostgreSQL
4 * written by Tatsuo Ishii
5 *
6 * Copyright (c) 2003-2020 PgPool Global Development Group
7 *
8 * Permission to use, copy, modify, and distribute this software and
9 * its documentation for any purpose and without fee is hereby
10 * granted, provided that the above copyright notice appear in all
11 * copies and that both that copyright notice and this permission
12 * notice appear in supporting documentation, and that the name of the
13 * author not be used in advertising or publicity pertaining to
14 * distribution of the software without specific, written prior
15 * permission. The author makes no representations about the
16 * suitability of this software for any purpose. It is provided "as
17 * is" without express or implied warranty.
18 *
19 * pool_memqcache.c: query cache on shmem or memcached
20 *
21 */
22 #define DATABASE_TO_OID_QUERY "SELECT oid FROM pg_database WHERE datname = '%s'"
23
24 #include "pool.h"
25
26 #include <stdio.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <sys/file.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <fcntl.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <stdlib.h>
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <dirent.h>
39
40 #ifdef USE_MEMCACHED
41 #include <libmemcached/memcached.h>
42 #endif
43
44 #include "auth/md5.h"
45 #include "pool_config.h"
46 #include "protocol/pool_proto_modules.h"
47 #include "parser/parsenodes.h"
48 #include "context/pool_session_context.h"
49 #include "query_cache/pool_memqcache.h"
50 #include "utils/pool_relcache.h"
51 #include "utils/pool_select_walker.h"
52 #include "utils/pool_stream.h"
53 #include "utils/pool_stream.h"
54 #include "utils/elog.h"
55 #include "utils/palloc.h"
56 #include "utils/memutils.h"
57
58 #ifdef USE_MEMCACHED
59 memcached_st *memc;
60 #endif
61
62 static char *encode_key(const char *s, char *buf, POOL_CONNECTION_POOL * backend);
63 #ifdef DEBUG
64 static void dump_cache_data(const char *data, size_t len);
65 #endif
66 static int pool_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen, int num_oids, int *oids);
67 static int send_cached_messages(POOL_CONNECTION * frontend, const char *qcache, int qcachelen);
68 static void send_message(POOL_CONNECTION * conn, char kind, int len, const char *data);
69 #ifdef USE_MEMCACHED
70 static int delete_cache_on_memcached(const char *key);
71 #endif
72 static int pool_get_dml_table_oid(int **oid);
73 static int pool_get_dropdb_table_oids(int **oids, int dboid);
74 static void pool_discard_dml_table_oid(void);
75 static void pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlink, int dboid);
76 static int pool_get_database_oid(void);
77 static void pool_add_table_oid_map(POOL_CACHEKEY * cachkey, int num_table_oids, int *table_oids);
78 static void pool_reset_memqcache_buffer(bool reset_dml_oids);
79 static POOL_CACHEID * pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash, char *data, int size, time_t expire);
80 static POOL_CACHEID * pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash);
81 static char *pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash, int *size, int *sts);
82 static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array, POOL_TEMP_QUERY_CACHE * cache);
83 static void pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, char kind, char *data, int data_len);
84 static void pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, int num_oids, int *oids);
85 static POOL_INTERNAL_BUFFER * pool_create_buffer(void);
86 static void pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer);
87 static void pool_add_buffer(POOL_INTERNAL_BUFFER * buffer, void *data, size_t len);
88 static void *pool_get_buffer(POOL_INTERNAL_BUFFER * buffer, size_t *len);
89 #ifdef NOT_USED
90 static char *pool_get_buffer_pointer(POOL_INTERNAL_BUFFER * buffer);
91 #endif
92 static char *pool_get_current_cache_buffer(size_t *len);
93 static size_t pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer);
94 static void pool_check_and_discard_cache_buffer(int num_oids, int *oids);
95
96 static void pool_set_memqcache_blocks(int num_blocks);
97 static int pool_get_memqcache_blocks(void);
98 static void *pool_memory_cache_address(void);
99 static void pool_reset_fsmm(size_t size);
100 static void *pool_fsmm_address(void);
101 static void pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space);
102 static POOL_CACHE_BLOCKID pool_get_block(size_t free_space);
103 static POOL_CACHE_ITEM_HEADER * pool_cache_item_header(POOL_CACHEID * cacheid);
104 static int pool_init_cache_block(POOL_CACHE_BLOCKID blockid);
105 #if NOT_USED
106 static void pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid);
107 #endif
108 static int pool_delete_item_shmem_cache(POOL_CACHEID * cacheid);
109 static char *block_address(int blockid);
110 static POOL_CACHE_ITEM_POINTER * item_pointer(char *block, int i);
111 static POOL_CACHE_ITEM_HEADER * item_header(char *block, int i);
112 static POOL_CACHE_BLOCKID pool_reuse_block(void);
113 #ifdef SHMEMCACHE_DEBUG
114 static void dump_shmem_cache(POOL_CACHE_BLOCKID blockid);
115 #endif
116
117 static int pool_hash_reset(int nelements);
118 static int pool_hash_insert(POOL_QUERY_HASH * key, POOL_CACHEID * cacheid, bool update);
119 static uint32 create_hash_key(POOL_QUERY_HASH * key);
120 static volatile POOL_HASH_ELEMENT *get_new_hash_element(void);
121 static void put_back_hash_element(volatile POOL_HASH_ELEMENT * element);
122 static bool is_free_hash_element(void);
123 static void inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen);
124
125 /*
126 * if true, shared memory is locked in this process now.
127 */
128 static int is_shmem_locked;
129
130 /*
131 * Connect to Memcached
132 */
133 int
memcached_connect(void)134 memcached_connect(void)
135 {
136 char *memqcache_memcached_host;
137 int memqcache_memcached_port;
138 #ifdef USE_MEMCACHED
139 memcached_server_st *servers;
140 memcached_return rc;
141
142 /* Already connected? */
143 if (memc)
144 {
145 return 0;
146 }
147 #endif
148
149 memqcache_memcached_host = pool_config->memqcache_memcached_host;
150 memqcache_memcached_port = pool_config->memqcache_memcached_port;
151
152 ereport(DEBUG1,
153 (errmsg("connecting to memcached on Host:\"%s:%d\"", memqcache_memcached_host, memqcache_memcached_port)));
154
155 #ifdef USE_MEMCACHED
156 memc = memcached_create(NULL);
157 servers = memcached_server_list_append(NULL,
158 memqcache_memcached_host,
159 memqcache_memcached_port,
160 &rc);
161
162 rc = memcached_server_push(memc, servers);
163 if (rc != MEMCACHED_SUCCESS)
164 {
165 ereport(WARNING,
166 (errmsg("failed to connect to memcached, server push error:\"%s\"\n", memcached_strerror(memc, rc))));
167 memc = (memcached_st *) - 1;
168 return -1;
169 }
170 memcached_server_list_free(servers);
171 #else
172 ereport(WARNING,
173 (errmsg("failed to connect to memcached, memcached support is not enabled")));
174 return -1;
175 #endif
176 return 0;
177 }
178
179 /*
180 * Disconnect to Memcached
181 */
182 void
memcached_disconnect(void)183 memcached_disconnect(void)
184 {
185 #ifdef USE_MEMCACHED
186 if (!memc)
187 {
188 return;
189 }
190 memcached_free(memc);
191 #else
192 ereport(WARNING,
193 (errmsg("failed to disconnect from memcached, memcached support is not enabled")));
194 #endif
195 }
196
197 /*
198 * Register buffer data for query cache in memory cache
199 */
200 void
memqcache_register(char kind,POOL_CONNECTION * frontend,char * data,int data_len)201 memqcache_register(char kind,
202 POOL_CONNECTION * frontend,
203 char *data,
204 int data_len)
205 {
206 POOL_TEMP_QUERY_CACHE *cache;
207 POOL_SESSION_CONTEXT *session_context;
208 POOL_QUERY_CONTEXT *query_context;
209
210 cache = pool_get_current_cache();
211
212 if (cache == NULL)
213 {
214 session_context = pool_get_session_context(true);
215
216 if (session_context && pool_is_query_in_progress())
217 {
218 char *query = pool_get_query_string();
219
220 query_context = session_context->query_context;
221
222 if (query)
223 query_context->temp_cache = pool_create_temp_query_cache(query);
224 }
225 }
226
227 pool_add_temp_query_cache(cache, kind, data, data_len);
228 }
229
230 /*
231 * Commit SELECT results to cache storage.
232 */
233 static int
pool_commit_cache(POOL_CONNECTION_POOL * backend,char * query,char * data,size_t datalen,int num_oids,int * oids)234 pool_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen, int num_oids, int *oids)
235 {
236 #ifdef USE_MEMCACHED
237 memcached_return rc;
238 #endif
239 POOL_CACHEKEY cachekey;
240 char tmpkey[MAX_KEY];
241 time_t memqcache_expire;
242
243 /*
244 * get_buflen() will return -1 if query result exceeds memqcache_maxcache
245 */
246 if (datalen == -1)
247 {
248 return -1;
249 }
250
251 /* query disabled */
252 if (strlen(query) <= 0)
253 {
254 return -1;
255 }
256 ereport(DEBUG1,
257 (errmsg("commiting SELECT results to cache storage"),
258 errdetail("Query=\"%s\"", query)));
259 #ifdef DEBUG
260 dump_cache_data(data, datalen);
261 #endif
262
263
264 /* encode md5key for memcached */
265 encode_key(query, tmpkey, backend);
266 ereport(DEBUG2,
267 (errmsg("commiting SELECT results to cache storage"),
268 errdetail("search key : \"%s\"", tmpkey)));
269
270 memcpy(cachekey.hashkey, tmpkey, 32);
271
272 memqcache_expire = pool_config->memqcache_expire;
273 ereport(DEBUG1,
274 (errmsg("commiting SELECT results to cache storage"),
275 errdetail("memqcache_expire = %ld", memqcache_expire)));
276
277 if (pool_is_shmem_cache())
278 {
279 POOL_CACHEID *cacheid;
280 POOL_QUERY_HASH query_hash;
281
282 memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
283
284 cacheid = pool_hash_search(&query_hash);
285
286 if (cacheid != NULL)
287 {
288 ereport(DEBUG1,
289 (errmsg("commiting SELECT results to cache storage"),
290 errdetail("item already exists")));
291
292 return 0;
293 }
294 else
295 {
296 cacheid = pool_add_item_shmem_cache(&query_hash, data, datalen,memqcache_expire);
297 if (cacheid == NULL)
298 {
299 ereport(LOG,
300 (errmsg("failed to add item to shmem cache")));
301 return -1;
302 }
303 else
304 {
305 ereport(DEBUG2,
306 (errmsg("commiting SELECT results to cache storage"),
307 errdetail("blockid: %d itemid: %d",
308 cacheid->blockid, cacheid->itemid)));
309 }
310 cachekey.cacheid.blockid = cacheid->blockid;
311 cachekey.cacheid.itemid = cacheid->itemid;
312 }
313 }
314
315 #ifdef USE_MEMCACHED
316 else
317 {
318 rc = memcached_set(memc, tmpkey, 32,
319 data, datalen, (time_t) memqcache_expire, 0);
320 if (rc != MEMCACHED_SUCCESS)
321 {
322 ereport(WARNING,
323 (errmsg("cache commit failed with error:\"%s\"", memcached_strerror(memc, rc))));
324 return -1;
325 }
326 ereport(DEBUG1,
327 (errmsg("commiting SELECT results to cache storage"),
328 errdetail("set cache succeeded")));
329 }
330 #endif
331
332 /*
333 * Register cache id to oid map
334 */
335 pool_add_table_oid_map(&cachekey, num_oids, oids);
336
337 return 0;
338 }
339
340 /*
341 * Commit SELECT system catalog results to cache storage.
342 */
343 int
pool_catalog_commit_cache(POOL_CONNECTION_POOL * backend,char * query,char * data,size_t datalen)344 pool_catalog_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen)
345 {
346 #ifdef USE_MEMCACHED
347 memcached_return rc;
348 #endif
349 POOL_CACHEKEY cachekey;
350 char tmpkey[MAX_KEY];
351 time_t memqcache_expire;
352
353 /*
354 * get_buflen() will return -1 if query result exceeds memqcache_maxcache
355 */
356 if (datalen == -1)
357 {
358 return -1;
359 }
360
361 /* query disabled */
362 if (strlen(query) <= 0)
363 {
364 return -1;
365 }
366 ereport(DEBUG1,
367 (errmsg("commiting relation cache to cache storage"),
368 errdetail("Query=\"%s\"", query)));
369
370 #ifdef DEBUG
371 dump_cache_data(data, datalen);
372 #endif
373
374 /* encode md5key for memcached */
375 encode_key(query, tmpkey, backend);
376 ereport(DEBUG2,
377 (errmsg("commiting relation cache to cache storage"),
378 errdetail("search key : \"%s\"", tmpkey)));
379
380 memcpy(cachekey.hashkey, tmpkey, 32);
381
382 memqcache_expire = pool_config->relcache_expire;
383 ereport(DEBUG1,
384 (errmsg("commiting relation cache to cache storage"),
385 errdetail("memqcache_expire = %ld", memqcache_expire)));
386
387 if (pool_is_shmem_cache())
388 {
389 POOL_CACHEID *cacheid;
390 POOL_QUERY_HASH query_hash;
391
392 memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
393
394 cacheid = pool_hash_search(&query_hash);
395
396 if (cacheid != NULL)
397 {
398 ereport(DEBUG1,
399 (errmsg("commiting relation cache to cache storage"),
400 errdetail("item already exists")));
401
402 return 0;
403 }
404 else
405 {
406 cacheid = pool_add_item_shmem_cache(&query_hash, data, datalen, memqcache_expire);
407 if (cacheid == NULL)
408 {
409 ereport(LOG,
410 (errmsg("failed to add item to shmem cache")));
411 return -1;
412 }
413 else
414 {
415 ereport(DEBUG2,
416 (errmsg("commiting relation cache to cache storage"),
417 errdetail("blockid: %d itemid: %d",
418 cacheid->blockid, cacheid->itemid)));
419 }
420 cachekey.cacheid.blockid = cacheid->blockid;
421 cachekey.cacheid.itemid = cacheid->itemid;
422 }
423 }
424
425 #ifdef USE_MEMCACHED
426 else
427 {
428 rc = memcached_set(memc, tmpkey, 32,
429 data, datalen, (time_t) memqcache_expire, 0);
430 if (rc != MEMCACHED_SUCCESS)
431 {
432 ereport(WARNING,
433 (errmsg("cache commit failed with error:\"%s\"", memcached_strerror(memc, rc))));
434 return -1;
435 }
436 ereport(DEBUG1,
437 (errmsg("commiting relation cache to cache storage"),
438 errdetail("set cache succeeded")));
439 }
440 #endif
441 return 0;
442 }
443
444
445 /*
446 * Fetch from memory cache.
447 * Return:
448 * 0: fetch success,
449 * 1: not found
450 */
451 int
pool_fetch_cache(POOL_CONNECTION_POOL * backend,const char * query,char ** buf,size_t * len)452 pool_fetch_cache(POOL_CONNECTION_POOL * backend, const char *query, char **buf, size_t *len)
453 {
454 char *ptr;
455 char tmpkey[MAX_KEY];
456 int sts;
457 char *p;
458
459 if (strlen(query) <= 0)
460 ereport(ERROR,
461 (errmsg("fetching from cache storage, no query")));
462
463 /* encode md5key for memcached */
464 encode_key(query, tmpkey, backend);
465 ereport(DEBUG1,
466 (errmsg("fetching from cache storage"),
467 errdetail("search key \"%s\"", tmpkey)));
468
469
470 if (pool_is_shmem_cache())
471 {
472 POOL_QUERY_HASH query_hash;
473 int mylen;
474
475 memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
476
477 ptr = pool_get_item_shmem_cache(&query_hash, &mylen, &sts);
478 if (ptr == NULL)
479 {
480 ereport(DEBUG1,
481 (errmsg("fetching from cache storage"),
482 errdetail("cache not found on shared memory")));
483
484 return 1;
485 }
486 *len = mylen;
487 }
488 #ifdef USE_MEMCACHED
489 else
490 {
491 memcached_return rc;
492 unsigned int flags;
493
494 ptr = memcached_get(memc, tmpkey, strlen(tmpkey), len, &flags, &rc);
495
496 if (rc != MEMCACHED_SUCCESS)
497 {
498 if (rc != MEMCACHED_NOTFOUND)
499 {
500 ereport(LOG,
501 (errmsg("fetching from cache storage, memcached_get failed with error: \"%s\"", memcached_strerror(memc, rc))));
502
503 /*
504 * Turn off memory cache support to prevent future errors.
505 */
506 pool_config->memory_cache_enabled = 0;
507 /* Behave as if cache not found */
508 return 1;
509 }
510 else
511 {
512 /* Not found */
513 ereport(DEBUG1,
514 (errmsg("fetching from cache storage"),
515 errdetail("cache item not found for key: \"%s\" and query:\"%s\"", tmpkey, query)));
516 return 1;
517 }
518 }
519 }
520 #else
521 else
522 {
523 ereport(ERROR,
524 (errmsg("memcached support is not enabled")));
525 }
526 #endif
527
528 p = palloc(*len);
529
530 memcpy(p, ptr, *len);
531
532 if (!pool_is_shmem_cache())
533 {
534 free(ptr);
535 }
536
537 ereport(DEBUG1,
538 (errmsg("fetching from cache storage"),
539 errdetail("query=\"%s\" len:%zd", query, *len)));
540 #ifdef DEBUG
541 dump_cache_data(p, *len);
542 #endif
543
544 *buf = p;
545
546 return 0;
547 }
548
549 /*
550 * encode key.
551 * create cache key as md5(username + query string + database name)
552 */
553 static char *
encode_key(const char * s,char * buf,POOL_CONNECTION_POOL * backend)554 encode_key(const char *s, char *buf, POOL_CONNECTION_POOL * backend)
555 {
556 char *strkey;
557 int u_length;
558 int d_length;
559 int q_length;
560 int length;
561
562 u_length = strlen(backend->info->user);
563 ereport(DEBUG1,
564 (errmsg("memcache encode key"),
565 errdetail("username: \"%s\" database_name: \"%s\"", backend->info->user, backend->info->database)));
566
567 d_length = strlen(backend->info->database);
568
569 q_length = strlen(s);
570 ereport(DEBUG1,
571 (errmsg("memcache encode key"),
572 errdetail("query: \"%s\"", s)));
573
574 length = u_length + d_length + q_length + 1;
575
576 strkey = (char *) palloc(sizeof(char) * length);
577
578 snprintf(strkey, length, "%s%s%s", backend->info->user, s, backend->info->database);
579
580 pool_md5_hash(strkey, strlen(strkey), buf);
581 ereport(DEBUG1,
582 (errmsg("memcache encode key"),
583 errdetail("`%s' -> `%s'", strkey, buf)));
584 pfree(strkey);
585 return buf;
586 }
587
588 #ifdef DEBUG
/*
 * dump cache data
 *
 * Writes a hex dump of "len" bytes of cached data to stderr.  The data is
 * read as a sequence of messages: a one byte kind, a 4 byte length in
 * network byte order (which counts itself), then the message body.
 *
 * Dumping stops with a note as soon as a header or body would extend past
 * "len", so truncated or corrupted data is never read out of bounds.
 */
static void
dump_cache_data(const char *data, size_t len)
{
	fprintf(stderr, "shmem: len = %zu\n", len);

	while (len > 0)
	{
		int			plen;
		size_t		body_len;
		size_t		i;

		fprintf(stderr, "shmem: kind:%c\n", *data++);
		len--;

		/* the length word must be completely inside the buffer */
		if (len < sizeof(plen))
		{
			fprintf(stderr, "shmem: truncated message header\n");
			return;
		}

		memcpy(&plen, data, sizeof(plen));
		len -= sizeof(plen);
		data += sizeof(plen);
		plen = ntohl(plen);
		fprintf(stderr, "shmem: len:%d\n", plen);

		/* the length counts itself, so anything below 4 is invalid */
		if (plen < (int) sizeof(plen))
		{
			fprintf(stderr, "shmem: invalid message length\n");
			return;
		}

		body_len = (size_t) plen - sizeof(plen);
		if (body_len > len)
		{
			fprintf(stderr, "shmem: truncated message body\n");
			return;
		}

		fprintf(stderr, "shmem: ");
		for (i = 0; i < body_len; i++)
		{
			fprintf(stderr, "%02x ", (unsigned char) data[i]);
		}
		fprintf(stderr, "\n");

		data += body_len;
		len -= body_len;
	}
}
621 #endif
622
623 /*
624 * send cached messages
625 */
626 static int
send_cached_messages(POOL_CONNECTION * frontend,const char * qcache,int qcachelen)627 send_cached_messages(POOL_CONNECTION * frontend, const char *qcache, int qcachelen)
628 {
629 int msg = 0;
630 int i = 0;
631 int is_prepared_stmt = 0;
632 int len;
633 const char *p;
634
635 while (i < qcachelen)
636 {
637 char tmpkind;
638 int tmplen;
639
640 tmpkind = qcache[i];
641 i++;
642
643 memcpy(&tmplen, qcache + i, sizeof(tmplen));
644 i += sizeof(tmplen);
645 len = ntohl(tmplen);
646 p = qcache + i;
647 i += len - sizeof(tmplen);
648
649 /* No need to cache PARSE and BIND responses */
650 if (tmpkind == '1' || tmpkind == '2')
651 {
652 is_prepared_stmt = 1;
653 continue;
654 }
655
656 /*
657 * In the prepared statement execution, there is no need to send 'T'
658 * response to the frontend.
659 */
660 if (is_prepared_stmt && tmpkind == 'T')
661 {
662 continue;
663 }
664
665 /* send message to frontend */
666 ereport(DEBUG1,
667 (errmsg("memcache: sending cached messages: '%c' len: %d", tmpkind, len)));
668 send_message(frontend, tmpkind, len, p);
669
670 msg++;
671 }
672
673 return msg;
674 }
675
676 /*
677 * send message to frontend
678 */
679 static void
send_message(POOL_CONNECTION * conn,char kind,int len,const char * data)680 send_message(POOL_CONNECTION * conn, char kind, int len, const char *data)
681 {
682 ereport(DEBUG2,
683 (errmsg("memcache: sending messages: kind '%c', len=%d, data=%p", kind, len, data)));
684
685 pool_write(conn, &kind, 1);
686
687 len = htonl(len);
688 pool_write(conn, &len, sizeof(len));
689
690 len = ntohl(len);
691 pool_write(conn, (void *) data, len - sizeof(len));
692 }
693
694 #ifdef USE_MEMCACHED
695 /*
696 * delete query cache on memcached
697 */
698 static int
delete_cache_on_memcached(const char * key)699 delete_cache_on_memcached(const char *key)
700 {
701
702 memcached_return rc;
703
704 ereport(DEBUG2,
705 (errmsg("memcache: deleteing cache on memcached with key: \"%s\"", key)));
706
707
708 /* delete cache data on memcached. key is md5 hash query */
709 rc = memcached_delete(memc, key, 32, (time_t) 0);
710
711 /* delete cache data on memcached is failed */
712 if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_BUFFERED)
713 {
714 ereport(LOG,
715 (errmsg("failed to delete cache on memcached, error:\"%s\"", memcached_strerror(memc, rc))));
716 return 0;
717 }
718 return 1;
719
720 }
721 #endif
722
723 /*
724 * Fetch SELECT data from cache if possible.
725 */
726 POOL_STATUS
pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,char * contents,bool * foundp)727 pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,
728 POOL_CONNECTION_POOL * backend,
729 char *contents, bool *foundp)
730 {
731 char *qcache;
732 size_t qcachelen;
733 int sts;
734 pool_sigset_t oldmask;
735
736 ereport(DEBUG1,
737 (errmsg("pool_fetch_from_memory_cache called")));
738
739 *foundp = false;
740
741 POOL_SETMASK2(&BlockSig, &oldmask);
742 pool_shmem_lock();
743
744 PG_TRY();
745 {
746 sts = pool_fetch_cache(backend, contents, &qcache, &qcachelen);
747 }
748 PG_CATCH();
749 {
750 pool_shmem_unlock();
751 POOL_SETMASK(&oldmask);
752 PG_RE_THROW();
753 }
754 PG_END_TRY();
755
756 pool_shmem_unlock();
757 POOL_SETMASK(&oldmask);
758
759 if (sts != 0)
760 /* Cache not found */
761 return POOL_CONTINUE;
762
763 /*
764 * Cache found. If we are doing extended query and in streaming
765 * replication mode, we need to retrieve any responses from backend and
766 * forward them to frontend.
767 */
768 if (pool_is_doing_extended_query_message() && SL_MODE)
769 {
770 POOL_SESSION_CONTEXT *session_context;
771 POOL_CONNECTION *target_backend;
772
773 ereport(DEBUG1,
774 (errmsg("memcache: injecting cache data")));
775
776 session_context = pool_get_session_context(true);
777 target_backend = CONNECTION(backend, session_context->load_balance_node_id);
778 inject_cached_message(target_backend, qcache, qcachelen);
779 }
780 else
781 {
782 /*
783 * Send each messages to frontend
784 */
785 send_cached_messages(frontend, qcache, qcachelen);
786 }
787
788 pfree(qcache);
789
790 /*
791 * Send a "READY FOR QUERY" if not in extended query.
792 */
793 if (!pool_is_doing_extended_query_message() && MAJOR(backend) == PROTO_MAJOR_V3)
794 {
795 signed char state;
796
797 /*
798 * We keep previous transaction state.
799 */
800 state = MASTER(backend)->tstate;
801 send_message(frontend, 'Z', 5, (char *) &state);
802 }
803
804 if (!pool_is_doing_extended_query_message() || !SL_MODE)
805 {
806 if (pool_flush(frontend))
807 {
808 return POOL_END;
809 }
810 }
811
812 *foundp = true;
813
814 if (pool_config->log_per_node_statement)
815 ereport(LOG,
816 (errmsg("fetch from memory cache"),
817 errdetail("query result fetched from cache. statement: %s", contents)));
818
819 ereport(DEBUG1,
820 (errmsg("fetch from memory cache"),
821 errdetail("query result found in the query cache, %s", contents)));
822
823 return POOL_CONTINUE;
824 }
825
826 /*
827 * Simple and rough (thus unreliable) check if the query is likely
828 * SELECT. Just check if the query starts with SELECT or WITH. This
829 * can be used before parse tree is available.
830 */
831 bool
pool_is_likely_select(char * query)832 pool_is_likely_select(char *query)
833 {
834 bool do_continue = false;
835
836 if (query == NULL)
837 return false;
838
839 if (pool_config->ignore_leading_white_space)
840 {
841 /* Ignore leading white spaces */
842 while (*query && isspace(*query))
843 query++;
844 }
845 if (!*query)
846 {
847 return false;
848 }
849
850 /*
851 * Get rid of head comment. It is sure that the query is in correct
852 * format, because the parser has rejected bad queries such as the one
853 * with not-ended comment.
854 */
855 while (*query)
856 {
857 /* Ignore spaces and return marks */
858 do_continue = false;
859 while (*query && isspace(*query))
860 {
861 query++;
862 do_continue = true;
863 }
864 if (do_continue)
865 {
866 continue;
867 }
868
869 while (*query && !strncmp(query, "\n", 2))
870 {
871 query++;
872 do_continue = true;
873 }
874 if (do_continue)
875 {
876 query += 2;
877 continue;
878 }
879
880 /* Ignore comments like C */
881 if (!strncmp(query, "/*", 2))
882 {
883 while (*query && strncmp(query, "*/", 2))
884 query++;
885
886 query += 2;
887 continue;
888 }
889
890 /* Ignore SQL comments */
891 if (!strncmp(query, "--", 2))
892 {
893 while (*query && strncmp(query, "\n", 2))
894 query++;
895
896 query += 2;
897 continue;
898 }
899
900 if (!strncasecmp(query, "SELECT", 6) || !strncasecmp(query, "WITH", 4))
901 {
902 return true;
903 }
904
905 query++;
906 }
907
908 return false;
909 }
910
911 /*
912 * Return true if SELECT can be cached. "node" is the parse tree for
913 * the query and "query" is the query string.
914 * The query must be SELECT or WITH.
915 */
916 bool
pool_is_allow_to_cache(Node * node,char * query)917 pool_is_allow_to_cache(Node *node, char *query)
918 {
919 int i = 0;
920 int num_oids = -1;
921 SelectContext ctx;
922
923 /*
924 * If NO QUERY CACHE comment exists, do not cache.
925 */
926 if (!strncasecmp(query, NO_QUERY_CACHE, NO_QUERY_CACHE_COMMENT_SZ))
927 return false;
928
929 /*
930 * Check black table list first.
931 */
932 if (pool_config->num_black_memqcache_table_list > 0)
933 {
934 /*
935 * Extract oids in from clause of SELECT, and check if SELECT to them
936 * could be cached.
937 */
938 num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
939 if (num_oids > 0)
940 {
941 for (i = 0; i < num_oids; i++)
942 {
943 ereport(DEBUG1,
944 (errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, ctx.table_names[i])));
945 if (pool_is_table_in_black_list(ctx.table_names[i]) == true)
946 {
947 ereport(DEBUG1,
948 (errmsg("memcache: node is not allowed to cache")));
949 return false;
950 }
951 }
952 }
953 }
954
955 /* SELECT INTO or SELECT FOR SHARE or UPDATE cannot be cached */
956 if (pool_has_insertinto_or_locking_clause(node))
957 return false;
958
959 /*
960 * If SELECT uses non immutable functions, it's not allowed to cache.
961 */
962 if (pool_has_non_immutable_function_call(node))
963 return false;
964
965 /*
966 * If SELECT uses temporary tables it's not allowed to cache.
967 */
968 if (pool_config->check_temp_table && pool_has_temp_table(node))
969 return false;
970
971 /*
972 * If SELECT uses system catalogs, it's not allowed to cache.
973 */
974 if (pool_has_system_catalog(node))
975 return false;
976
977 /*
978 * TABLESAMPLE is not allowed to cache.
979 */
980 if (IsA(node, SelectStmt) &&((SelectStmt *) node)->fromClause)
981 {
982 List *tbl_list = ((SelectStmt *) node)->fromClause;
983 ListCell *tbl;
984
985 foreach(tbl, tbl_list)
986 {
987 if (IsA(lfirst(tbl), RangeTableSample))
988 return false;
989 }
990 }
991
992
993 /*
994 * If the table is in the while list, allow to cache even if it is VIEW or
995 * unlogged table.
996 */
997 if (pool_config->num_white_memqcache_table_list > 0)
998 {
999 if (num_oids < 0)
1000 num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
1001
1002 if (num_oids > 0)
1003 {
1004 for (i = 0; i < num_oids; i++)
1005 {
1006 char *table = ctx.table_names[i];
1007
1008 ereport(DEBUG1,
1009 (errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, table)));
1010 if (is_view(table) || is_unlogged_table(table))
1011 {
1012 if (pool_is_table_in_white_list(table) == false)
1013 {
1014 ereport(DEBUG1,
1015 (errmsg("memcache: node is not allowed to cache")));
1016 return false;
1017 }
1018 }
1019 }
1020 }
1021 }
1022 else
1023 {
1024 /*
1025 * If SELECT uses views, it's not allowed to cache.
1026 */
1027 if (pool_has_view(node))
1028 return false;
1029
1030 /*
1031 * If SELECT uses unlogged tables, it's not allowed to cache.
1032 */
1033 if (pool_has_unlogged_table(node))
1034 return false;
1035 }
1036
1037 /*
1038 * If Data-modifying statements in WITH clause, it's not allowed to cache.
1039 */
1040 if(IsA(node, SelectStmt) && ((SelectStmt *) node)->withClause)
1041 {
1042 ListCell *lc;
1043 WithClause *withClause = ((SelectStmt *) node)->withClause;
1044
1045 foreach(lc, withClause->ctes)
1046 {
1047 CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
1048 if(IsA(cte->ctequery, InsertStmt) ||
1049 IsA(cte->ctequery, DeleteStmt) ||
1050 IsA(cte->ctequery, UpdateStmt))
1051 {
1052 return false;
1053 }
1054 }
1055 }
1056
1057 return true;
1058 }
1059
1060
1061 /*
1062 * Return true If the SELECTed table is in back list.
1063 */
1064 bool
pool_is_table_in_black_list(const char * table_name)1065 pool_is_table_in_black_list(const char *table_name)
1066 {
1067
1068 if (pool_config->num_black_memqcache_table_list > 0 &&
1069 pattern_compare((char *) table_name, BLACKLIST, "black_memqcache_table_list") == 1)
1070 {
1071 return true;
1072 }
1073
1074 return false;
1075 }
1076
1077 /*
1078 * Return true If the SELECTed table is in white list.
1079 */
1080 bool
pool_is_table_in_white_list(const char * table_name)1081 pool_is_table_in_white_list(const char *table_name)
1082 {
1083 if (pool_config->num_white_memqcache_table_list > 0 &&
1084 pattern_compare((char *) table_name, WHITELIST, "white_memqcache_table_list") == 1)
1085 {
1086 return true;
1087 }
1088
1089 return false;
1090 }
1091
1092 /*
1093 * Extract table oid from INSERT/UPDATE/DELETE/TRUNCATE/
1094 * DROP TABLE/ALTER TABLE/COPY FROM statement.
1095 * For SELECT, if Data-modifying statements in its WITH clause,
1096 * extract table oid from Data-modifying statements too.
1097 * Returns number of oids.
1098 * In case of error, returns 0 (InvalidOid).
1099 * oids buffer (oidsp) will be discarded by subsequent call.
1100 */
1101 int
pool_extract_table_oids(Node * node,int ** oidsp)1102 pool_extract_table_oids(Node *node, int **oidsp)
1103 {
1104 #define POOL_MAX_DML_OIDS 128
1105 char *table;
1106 static int oids[POOL_MAX_DML_OIDS];
1107 int num_oids;
1108 int oid;
1109
1110 if (node == NULL)
1111 {
1112 ereport(LOG,
1113 (errmsg("memcache: error while extracting table oids. statement is NULL")));
1114 return 0;
1115 }
1116
1117 num_oids = 0;
1118 *oidsp = oids;
1119
1120 if (IsA(node, InsertStmt))
1121 {
1122 InsertStmt *stmt = (InsertStmt *) node;
1123
1124 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1125 table = make_table_name_from_rangevar(stmt->relation);
1126 }
1127 else if (IsA(node, UpdateStmt))
1128 {
1129 UpdateStmt *stmt = (UpdateStmt *) node;
1130
1131 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1132 table = make_table_name_from_rangevar(stmt->relation);
1133 }
1134 else if (IsA(node, DeleteStmt))
1135 {
1136 DeleteStmt *stmt = (DeleteStmt *) node;
1137
1138 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1139 table = make_table_name_from_rangevar(stmt->relation);
1140 }
1141 else if(IsA(node, SelectStmt))
1142 {
1143 SelectStmt *stmt = (SelectStmt *) node;
1144 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1145 table = NULL;
1146 }
1147 #ifdef NOT_USED
1148
1149 /*
1150 * We do not handle CREATE TABLE here. It is possible that
1151 * pool_extract_table_oids() is called before CREATE TABLE gets executed.
1152 */
1153 else if (IsA(node, CreateStmt))
1154 {
1155 CreateStmt *stmt = (CreateStmt *) node;
1156
1157 table = make_table_name_from_rangevar(stmt->relation);
1158 }
1159 #endif
1160
1161 else if (IsA(node, AlterTableStmt))
1162 {
1163 AlterTableStmt *stmt = (AlterTableStmt *) node;
1164
1165 table = make_table_name_from_rangevar(stmt->relation);
1166 }
1167
1168 else if (IsA(node, CopyStmt))
1169 {
1170 CopyStmt *stmt = (CopyStmt *) node;
1171
1172 if (stmt->is_from) /* COPY FROM? */
1173 {
1174 table = make_table_name_from_rangevar(stmt->relation);
1175 }
1176 else
1177 {
1178 return 0;
1179 }
1180 }
1181
1182 else if (IsA(node, DropStmt))
1183 {
1184 ListCell *cell;
1185
1186 DropStmt *stmt = (DropStmt *) node;
1187
1188 if (stmt->removeType != OBJECT_TABLE)
1189 {
1190 return 0;
1191 }
1192
1193 /*
1194 * Here, stmt->objects is list of target relation info. The first
1195 * cell of target relation info is a list (possibly) consists of
1196 * database, schema and relation. We need to call
1197 * makeRangeVarFromNameList() before passing to
1198 * make_table_name_from_rangevar. Otherwise we get weird excessively
1199 * decorated relation name (''table_name'').
1200 */
1201 foreach(cell, stmt->objects)
1202 {
1203 if (num_oids >= POOL_MAX_DML_OIDS)
1204 {
1205 ereport(LOG,
1206 (errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1207 return 0;
1208 }
1209
1210 table = make_table_name_from_rangevar(makeRangeVarFromNameList(lfirst(cell)));
1211 oid = pool_table_name_to_oid(table);
1212 if (oid > 0)
1213 {
1214 oids[num_oids++] = pool_table_name_to_oid(table);
1215 ereport(DEBUG1,
1216 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids - 1])));
1217 }
1218 }
1219 return num_oids;
1220 }
1221 else if (IsA(node, TruncateStmt))
1222 {
1223 ListCell *cell;
1224
1225 TruncateStmt *stmt = (TruncateStmt *) node;
1226
1227 foreach(cell, stmt->relations)
1228 {
1229 if (num_oids >= POOL_MAX_DML_OIDS)
1230 {
1231 ereport(LOG,
1232 (errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1233 return 0;
1234 }
1235
1236 table = make_table_name_from_rangevar(lfirst(cell));
1237 oid = pool_table_name_to_oid(table);
1238 if (oid > 0)
1239 {
1240 oids[num_oids++] = pool_table_name_to_oid(table);
1241 ereport(DEBUG1,
1242 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids - 1])));
1243 }
1244 }
1245 return num_oids;
1246 }
1247 else if (IsA(node, ExplainStmt))
1248 {
1249 ListCell *cell;
1250 DefElem *def;
1251 ExplainStmt *stmt = (ExplainStmt *) node;
1252
1253 foreach(cell, stmt->options)
1254 {
1255 def = lfirst(cell);
1256 if (strncmp("analyze", def->defname, 7) == 0)
1257 {
1258 return pool_extract_table_oids(stmt->query, oidsp);
1259 }
1260 }
1261
1262 table = NULL;
1263 }
1264 else
1265 {
1266 ereport(DEBUG1,
1267 (errmsg("memcache: extracting table oids: statment is different from INSERT/UPDATE/DELETE/TRUNCATE/DROP TABLE/ALTER TABLE")));
1268 return 0;
1269 }
1270
1271 oid = pool_table_name_to_oid(table);
1272 if (oid > 0)
1273 {
1274 if (num_oids >= POOL_MAX_DML_OIDS)
1275 {
1276 ereport(LOG,
1277 (errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1278 return 0;
1279 }
1280
1281 oids[num_oids++] = pool_table_name_to_oid(table);
1282 ereport(DEBUG1,
1283 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oid)));
1284 }
1285 return num_oids;
1286 }
1287
1288 /*
1289 * Extract table oid from INSERT/UPDATE/DELETE
1290 * FROM statement in WITH clause.
1291 * Returns number of oids.
1292 * oids buffer (oidsp) will be discarded by subsequent call.
1293 */
1294 int
pool_extract_withclause_oids(Node * node,int * oidsp)1295 pool_extract_withclause_oids(Node *node, int *oidsp)
1296 {
1297 int num_oids = 0;
1298 int oid;
1299 char *table;
1300 ListCell *lc;
1301 WithClause *with;
1302
1303 if(oidsp == NULL)
1304 {
1305 return 0;
1306 }
1307
1308 if(!node || !IsA(node, WithClause))
1309 {
1310 return 0;
1311 }
1312
1313 with = (WithClause *) node;
1314 foreach(lc, with->ctes)
1315 {
1316 CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
1317 if(IsA(cte->ctequery, InsertStmt))
1318 {
1319 InsertStmt *stmt = (InsertStmt *) cte->ctequery;
1320 table = make_table_name_from_rangevar(stmt->relation);
1321 }
1322 else if(IsA(cte->ctequery, DeleteStmt))
1323 {
1324 DeleteStmt *stmt = (DeleteStmt *) cte->ctequery;
1325 table = make_table_name_from_rangevar(stmt->relation);
1326 }
1327 else if(IsA(cte->ctequery, UpdateStmt))
1328 {
1329 UpdateStmt *stmt = (UpdateStmt *) cte->ctequery;
1330 table = make_table_name_from_rangevar(stmt->relation);
1331 }
1332 else
1333 {
1334 /* only check INSERT/DELETE/UPDATE in WITH clause */
1335 table = NULL;
1336 }
1337
1338 oid = pool_table_name_to_oid(table);
1339 if (oid > 0)
1340 {
1341 if (num_oids >= POOL_MAX_DML_OIDS)
1342 {
1343 break;
1344 }
1345
1346 oidsp[num_oids++] = pool_table_name_to_oid(table);
1347 ereport(DEBUG1,
1348 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oidsp[num_oids - 1])));
1349 }
1350 }
1351
1352 return num_oids;
1353 }
1354
1355 #define POOL_OIDBUF_SIZE 1024
1356 static int *oidbuf;
1357 static int oidbufp;
1358 static int oidbuf_size;
1359
1360 /*
1361 * Add table oid to internal buffer
1362 */
1363 void
pool_add_dml_table_oid(int oid)1364 pool_add_dml_table_oid(int oid)
1365 {
1366 int i;
1367 int *tmp;
1368
1369 if (oid == 0)
1370 return;
1371
1372 if (oidbufp >= oidbuf_size)
1373 {
1374 MemoryContext oldcxt;
1375
1376 oidbuf_size += POOL_OIDBUF_SIZE;
1377
1378 /*
1379 * This need to live throughout the life of child so home it in
1380 * TopMemoryContext
1381 */
1382 oldcxt = MemoryContextSwitchTo(TopMemoryContext);
1383 tmp = repalloc(oidbuf, sizeof(int) * oidbuf_size);
1384 MemoryContextSwitchTo(oldcxt);
1385 if (tmp == NULL)
1386 return;
1387
1388 oidbuf = tmp;
1389 }
1390
1391 for (i = 0; i < oidbufp; i++)
1392 {
1393 if (oidbuf[i] == oid)
1394 /* Already same oid exists */
1395 return;
1396 }
1397 oidbuf[oidbufp++] = oid;
1398 }
1399
1400
1401 /*
1402 * Get table oid buffer
1403 */
1404 static int
pool_get_dml_table_oid(int ** oid)1405 pool_get_dml_table_oid(int **oid)
1406 {
1407 *oid = oidbuf;
1408 return oidbufp;
1409 }
1410
1411 static int
pool_get_dropdb_table_oids(int ** oids,int dboid)1412 pool_get_dropdb_table_oids(int **oids, int dboid)
1413 {
1414 int *rtn = 0;
1415 int oids_size = 0;
1416 int *tmp;
1417
1418 int num_oids = 0;
1419 DIR *dir;
1420 struct dirent *dp;
1421 char path[1024];
1422
1423 snprintf(path, sizeof(path), "%s/%d", pool_config->memqcache_oiddir, dboid);
1424 if ((dir = opendir(path)) == NULL)
1425 {
1426 ereport(DEBUG1,
1427 (errmsg("memcache: getting drop table oids"),
1428 errdetail("Failed to open dir: %s", path)));
1429 return 0;
1430 }
1431
1432 while ((dp = readdir(dir)) != NULL)
1433 {
1434 if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0)
1435 continue;
1436
1437 if (num_oids >= oids_size)
1438 {
1439 oids_size += POOL_OIDBUF_SIZE;
1440 tmp = repalloc(rtn, sizeof(int) * oids_size);
1441 if (tmp == NULL)
1442 {
1443 closedir(dir);
1444 return 0;
1445 }
1446 rtn = tmp;
1447 }
1448
1449 rtn[num_oids] = atol(dp->d_name);
1450 num_oids++;
1451 }
1452
1453 closedir(dir);
1454 *oids = rtn;
1455
1456 return num_oids;
1457 }
1458
1459 /* Discard oid internal buffer */
1460 static void
pool_discard_dml_table_oid(void)1461 pool_discard_dml_table_oid(void)
1462 {
1463 oidbufp = 0;
1464 }
1465
1466 /*
1467 * Management modules for oid map. When caching SELECT results, we
1468 * record table oids to file, which has following structure.
1469 *
1470 * memqcache_oiddir -+- database_oid -+-table_oid_file1
1471 * |
1472 * +-table_oid_file2
1473 * |
1474 * +-table_oid_file3...
1475 *
1476 * table_oid_file's name is table oid, which was used by the SELECT
1477 * statement. The file has 1 or more cacheid(s). When SELECT result is
1478 * cached, the file is created and cache id is appended. Later SELECT
1479 * using same table oid will add to the same file. If the SELECT uses
1480 * multiple tables, multiple table_oid_file will be created. When
1481 * INSERT/UPDATE/DELETE is executed, corresponding caches must be
1482 * deleted (cache invalidation) (when DROP TABLE, ALTER TABLE is
1483 * executed, the caches must be deleted as well). When database is
1484 * dropped, all caches belonging to the database must be deleted.
1485 */
1486
1487 /*
1488 * Get oid of current database
1489 */
1490 static int
pool_get_database_oid(void)1491 pool_get_database_oid(void)
1492 {
1493 /*
1494 * Query to convert table name to oid
1495 */
1496 int oid = 0;
1497 static POOL_RELCACHE * relcache;
1498 POOL_CONNECTION_POOL *backend;
1499
1500 backend = pool_get_session_context(false)->backend;
1501
1502 /*
1503 * If relcache does not exist, create it.
1504 */
1505 if (!relcache)
1506 {
1507 relcache = pool_create_relcache(pool_config->relcache_size, DATABASE_TO_OID_QUERY,
1508 int_register_func, int_unregister_func,
1509 false);
1510 if (relcache == NULL)
1511 {
1512 ereport(LOG,
1513 (errmsg("memcache: error creating relcache while getting database OID")));
1514 return oid;
1515 }
1516 }
1517
1518 /*
1519 * Search relcache.
1520 */
1521 oid = (int) (intptr_t) pool_search_relcache(relcache, backend,
1522 MASTER_CONNECTION(backend)->sp->database);
1523 return oid;
1524 }
1525
1526 /*
1527 * Get oid of current database for discarding cache files
1528 * after executing DROP DATABASE
1529 */
1530 int
pool_get_database_oid_from_dbname(char * dbname)1531 pool_get_database_oid_from_dbname(char *dbname)
1532 {
1533 int dboid = 0;
1534 POOL_SELECT_RESULT *res;
1535 char query[1024];
1536
1537 POOL_CONNECTION_POOL *backend;
1538
1539 backend = pool_get_session_context(false)->backend;
1540
1541 snprintf(query, sizeof(query), DATABASE_TO_OID_QUERY, dbname);
1542 do_query(MASTER(backend), query, &res, MAJOR(backend));
1543
1544 if (res->numrows != 1)
1545 {
1546 ereport(DEBUG1,
1547 (errmsg("memcache: getting oid of current database"),
1548 errdetail("received %d rows", res->numrows)));
1549 free_select_result(res);
1550 return 0;
1551 }
1552
1553 dboid = atol(res->data[0]);
1554 free_select_result(res);
1555
1556 return dboid;
1557 }
1558
1559 /*
1560 * Add cache id (shmem case) or hash key (memcached case) to table oid
1561 * map file. Caller must hold shmem lock before calling this function
1562 * to avoid file extension conflict among different pgpool child
1563 * process.
1564 * As of pgpool-II 3.2, pool_handle_query_cache is responsible for that.
1565 * (pool_handle_query_cache -> pool_commit_cache -> pool_add_table_oid_map)
1566 */
1567 static void
pool_add_table_oid_map(POOL_CACHEKEY * cachekey,int num_table_oids,int * table_oids)1568 pool_add_table_oid_map(POOL_CACHEKEY * cachekey, int num_table_oids, int *table_oids)
1569 {
1570 char *dir;
1571 int dboid;
1572 char path[1024];
1573 int i;
1574 int len;
1575
1576 /*
1577 * Create memqcache_oiddir
1578 */
1579 dir = pool_config->memqcache_oiddir;
1580
1581 if (mkdir(dir, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1582 {
1583 if (errno != EEXIST)
1584 {
1585 ereport(WARNING,
1586 (errmsg("memcache: adding table oid maps, failed to create directory:\"%s\"", dir),
1587 errdetail("%m")));
1588 return;
1589 }
1590 }
1591
1592 /*
1593 * Create memqcache_oiddir/database_oid
1594 */
1595 dboid = pool_get_database_oid();
1596 ereport(DEBUG1,
1597 (errmsg("memcache: adding table oid maps"),
1598 errdetail("dboid %d", dboid)));
1599
1600 if (dboid <= 0)
1601 {
1602 ereport(WARNING,
1603 (errmsg("memcache: adding table oid maps, failed to get database OID")));
1604 return;
1605 }
1606
1607 snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1608 if (mkdir(path, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1609 {
1610 if (errno != EEXIST)
1611 {
1612 ereport(WARNING,
1613 (errmsg("memcache: adding table oid maps, failed to create directory:\"%s\"", path),
1614 errdetail("%m")));
1615 return;
1616 }
1617 }
1618
1619 if (pool_is_shmem_cache())
1620 {
1621 len = sizeof(cachekey->cacheid);
1622 }
1623 else
1624 {
1625 len = sizeof(cachekey->hashkey);
1626 }
1627
1628 for (i = 0; i < num_table_oids; i++)
1629 {
1630 int fd;
1631 int oid = table_oids[i];
1632 int sts;
1633 struct flock fl;
1634
1635 /*
1636 * Create or open each memqcache_oiddir/database_oid/table_oid
1637 */
1638 snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1639 if ((fd = open(path, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR)) == -1)
1640 {
1641 ereport(WARNING,
1642 (errmsg("memcache: adding table oid maps, failed to open file:\"%s\"", path),
1643 errdetail("%m")));
1644 return;
1645 }
1646
1647 fl.l_type = F_WRLCK;
1648 fl.l_whence = SEEK_SET;
1649 fl.l_start = 0; /* Offset from l_whence */
1650 fl.l_len = 0; /* length, 0 = to EOF */
1651
1652 sts = fcntl(fd, F_SETLKW, &fl);
1653 if (sts == -1)
1654 {
1655 ereport(WARNING,
1656 (errmsg("memcache: adding table oid maps, failed to lock file:\"%s\"", path),
1657 errdetail("%m")));
1658
1659 close(fd);
1660 return;
1661 }
1662
1663 /*
1664 * Below was ifdef-out because of a performance reason. Looking for
1665 * duplicate cache entries in a file needed unacceptably high cost. So
1666 * we gave up this and decided not to care about duplicate entries in
1667 * the file.
1668 */
1669 #ifdef NOT_USED
1670 for (;;)
1671 {
1672 sts = read(fd, (char *) &buf, len);
1673 if (sts == -1)
1674 {
1675 ereport(WARNING,
1676 (errmsg("memcache: adding table oid maps, failed to read file:\"%s\"", path),
1677 errdetail("%m")));
1678 close(fd);
1679 return;
1680 }
1681 else if (sts == len)
1682 {
1683 if (memcmp(cachekey, &buf, len) == 0)
1684 {
1685 /* Same key found. Skip this */
1686 close(fd);
1687 return;
1688 }
1689 continue;
1690 }
1691
1692 /*
1693 * Must be EOF
1694 */
1695 if (sts != 0)
1696 {
1697 ereport(WARNING,
1698 (errmsg("memcache: adding table oid maps, invalid data length:%d in file:\"%s\". error:\"%s\"", sts, path)));
1699 close(fd);
1700 return;
1701 }
1702 break;
1703 }
1704 #endif
1705
1706 if (lseek(fd, 0, SEEK_END) == -1)
1707 {
1708 ereport(WARNING,
1709 (errmsg("memcache: adding table oid maps, failed seek on file:\"%s\"", path),
1710 errdetail("%m")));
1711 close(fd);
1712 return;
1713 }
1714
1715 /*
1716 * Write cache_id or cache key at the end of file
1717 */
1718 sts = write(fd, (char *) cachekey, len);
1719 if (sts == -1 || sts != len)
1720 {
1721 ereport(WARNING,
1722 (errmsg("memcache: adding table oid maps, failed to write file:\"%s\"", path),
1723 errdetail("%m")));
1724 close(fd);
1725 return;
1726 }
1727 close(fd);
1728 }
1729 }
1730
1731 /*
1732 * Discard all oid maps at pgpool-II startup.
1733 * This is necessary for shmem case.
1734 */
1735 void
pool_discard_oid_maps(void)1736 pool_discard_oid_maps(void)
1737 {
1738 char command[1024];
1739
1740 snprintf(command, sizeof(command), "/bin/rm -fr %s/[0-9]*",
1741 pool_config->memqcache_oiddir);
1742 if (system(command) == -1)
1743 ereport(WARNING,
1744 (errmsg("unable to execute command \"%s\"", command),
1745 errdetail("system() command failed with error \"%m\"")));
1746
1747
1748 }
1749
1750 void
pool_discard_oid_maps_by_db(int dboid)1751 pool_discard_oid_maps_by_db(int dboid)
1752 {
1753 char command[1024];
1754
1755 if (pool_is_shmem_cache())
1756 {
1757 snprintf(command, sizeof(command), "/bin/rm -fr %s/%d/",
1758 pool_config->memqcache_oiddir, dboid);
1759
1760 ereport(DEBUG1,
1761 (errmsg("memcache: discarding oid maps by db"),
1762 errdetail("command: '%s\'", command)));
1763
1764 if (system(command) == -1)
1765 ereport(WARNING,
1766 (errmsg("unable to execute command \"%s\"", command),
1767 errdetail("system() command failed with error \"%m\"")));
1768 }
1769 }
1770
1771 /*
1772 * Read cache id (shmem case) or hash key (memcached case) from table
1773 * oid map file according to table_oids and discard cache entries. If
1774 * unlink is true, the file will be unlinked after successful cache
1775 * removal.
1776 */
1777 static void
pool_invalidate_query_cache(int num_table_oids,int * table_oid,bool unlinkp,int dboid)1778 pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlinkp, int dboid)
1779 {
1780 char *dir;
1781 char path[1024];
1782 int i;
1783 int len;
1784 POOL_CACHEKEY buf;
1785
1786 /*
1787 * Create memqcache_oiddir
1788 */
1789 dir = pool_config->memqcache_oiddir;
1790 if (mkdir(dir, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1791 {
1792 if (errno != EEXIST)
1793 {
1794 ereport(WARNING,
1795 (errmsg("memcache: invalidating query cache, failed to create directory:\"%s\"", dir),
1796 errdetail("%m")));
1797 return;
1798 }
1799 }
1800
1801 /*
1802 * Create memqcache_oiddir/database_oid
1803 */
1804 if (dboid == 0)
1805 {
1806 dboid = pool_get_database_oid();
1807 ereport(DEBUG1,
1808 (errmsg("memcache invalidating query cache"),
1809 errdetail("dboid %d", dboid)));
1810
1811 if (dboid <= 0)
1812 {
1813 ereport(WARNING,
1814 (errmsg("memcache: invalidating query cache, could not get database OID")));
1815 return;
1816 }
1817 }
1818
1819 snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1820 if (mkdir(path, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1821 {
1822 if (errno != EEXIST)
1823 {
1824 ereport(WARNING,
1825 (errmsg("memcache: invalidating query cache, failed to create directory:\"%s\"", path),
1826 errdetail("%m")));
1827 return;
1828 }
1829 }
1830
1831 if (pool_is_shmem_cache())
1832 {
1833 len = sizeof(buf.cacheid);
1834 }
1835 else
1836 {
1837 len = sizeof(buf.hashkey);
1838 }
1839
1840 for (i = 0; i < num_table_oids; i++)
1841 {
1842 int fd;
1843 int oid = table_oid[i];
1844 int sts;
1845 struct flock fl;
1846
1847 /*
1848 * Open each memqcache_oiddir/database_oid/table_oid
1849 */
1850 snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1851 if ((fd = open(path, O_RDONLY)) == -1)
1852 {
1853 /*
1854 * This may be normal. It is possible that no SELECT has been
1855 * issued since the table has been created or since pgpool-II
1856 * started up.
1857 */
1858 ereport(DEBUG1,
1859 (errmsg("memcache invalidating query cache"),
1860 errdetail("failed to open \"%s\". reason:\"%m\"", path)));
1861 continue;
1862 }
1863
1864 fl.l_type = F_RDLCK;
1865 fl.l_whence = SEEK_SET;
1866 fl.l_start = 0; /* Offset from l_whence */
1867 fl.l_len = 0; /* length, 0 = to EOF */
1868
1869 sts = fcntl(fd, F_SETLKW, &fl);
1870 if (sts == -1)
1871 {
1872 ereport(WARNING,
1873 (errmsg("memcache: invalidating query cache, failed to lock file:\"%s\"", path),
1874 errdetail("%m")));
1875 close(fd);
1876 return;
1877 }
1878 for (;;)
1879 {
1880 sts = read(fd, (char *) &buf, len);
1881 if (sts == -1)
1882 {
1883 ereport(WARNING,
1884 (errmsg("memcache: invalidating query cache, failed to read file:\"%s\"", path),
1885 errdetail("%m")));
1886
1887 close(fd);
1888 return;
1889 }
1890 else if (sts == len)
1891 {
1892 if (pool_is_shmem_cache())
1893 {
1894 ereport(DEBUG1,
1895 (errmsg("memcache invalidating query cache"),
1896 errdetail("deleting cacheid:%d itemid:%d",
1897 buf.cacheid.blockid, buf.cacheid.itemid)));
1898 pool_delete_item_shmem_cache(&buf.cacheid);
1899 }
1900 #ifdef USE_MEMCACHED
1901 else
1902 {
1903 char delbuf[33];
1904
1905 memcpy(delbuf, buf.hashkey, 32);
1906 delbuf[32] = 0;
1907 ereport(DEBUG1,
1908 (errmsg("memcache invalidating query cache"),
1909 errdetail("deleting %s", delbuf)));
1910
1911 delete_cache_on_memcached(delbuf);
1912 }
1913 #endif
1914 continue;
1915 }
1916
1917 /*
1918 * Must be EOF
1919 */
1920 if (sts != 0)
1921 {
1922 ereport(WARNING,
1923 (errmsg("memcache: invalidating query cache, invalid data length:%d in file:\"%s\"", sts, path)));
1924 close(fd);
1925 return;
1926 }
1927 break;
1928 }
1929
1930 if (unlinkp)
1931 {
1932 unlink(path);
1933 }
1934 close(fd);
1935 }
1936 #ifdef SHMEMCACHE_DEBUG
1937 dump_shmem_cache(0);
1938 #endif
1939 }
1940
1941 /*
1942 * Reset SELECT data buffers. If reset_dml_oids is true, call
1943 * pool_discard_dml_table_oid() to reset table oids used in DML statements.
1944 */
1945 static void
pool_reset_memqcache_buffer(bool reset_dml_oids)1946 pool_reset_memqcache_buffer(bool reset_dml_oids)
1947 {
1948 POOL_SESSION_CONTEXT *session_context;
1949
1950 session_context = pool_get_session_context(true);
1951
1952 if (session_context && session_context->query_cache_array)
1953 {
1954 POOL_TEMP_QUERY_CACHE *cache;
1955
1956 ereport(DEBUG1,
1957 (errmsg("memcache reset buffer"),
1958 errdetail("discard: %p", session_context->query_cache_array)));
1959
1960 pool_discard_query_cache_array(session_context->query_cache_array);
1961 session_context->query_cache_array = pool_create_query_cache_array();
1962
1963 ereport(DEBUG1,
1964 (errmsg("memcache reset buffer"),
1965 errdetail("create: %p", session_context->query_cache_array)));
1966
1967 /*
1968 * if the query context is still under use, we cannot discard
1969 * temporary cache.
1970 */
1971 if ((SL_MODE && pool_is_doing_extended_query_message()) ||
1972 can_query_context_destroy(session_context->query_context))
1973 {
1974 ereport(DEBUG1,
1975 (errmsg("memcache reset buffer"),
1976 errdetail("discard temp buffer of %p (%s)",
1977 session_context->query_context, session_context->query_context->original_query)));
1978
1979 cache = pool_get_current_cache();
1980 pool_discard_temp_query_cache(cache);
1981
1982 /*
1983 * Reset temp_cache pointer in the current query context so that
1984 * we don't double free memory.
1985 */
1986 session_context->query_context->temp_cache = NULL;
1987 }
1988 }
1989
1990 if (reset_dml_oids)
1991 pool_discard_dml_table_oid();
1992
1993 pool_tmp_stats_reset_num_selects();
1994 }
1995
1996 /*
1997 * Return true if memory cache method is "shmem". The purpose of this
1998 * function is to cache the result of stcmp and to save a few cycle.
1999 */
2000 bool
pool_is_shmem_cache(void)2001 pool_is_shmem_cache(void)
2002 {
2003 return pool_config->memqcache_method == SHMEM_CACHE;
2004 }
2005
/*
 * Number of memory cache blocks, as remembered by
 * pool_set_memqcache_blocks().
 */
static int	memqcache_num_blocks;

/*
 * Remember memory cache number of blocks.
 */
static void
pool_set_memqcache_blocks(int num_blocks)
{
	memqcache_num_blocks = num_blocks;
}
2015
2016 /*
2017 * Return memory cache number of blocks.
2018 */
2019 static int
pool_get_memqcache_blocks(void)2020 pool_get_memqcache_blocks(void)
2021 {
2022 return memqcache_num_blocks;
2023 }
2024
2025 /*
2026 * Query cache on shared memory management modules.
2027 */
2028
2029 /*
2030 * Calculate necessary shared memory size.
2031 */
2032 size_t
pool_shared_memory_cache_size(void)2033 pool_shared_memory_cache_size(void)
2034 {
2035 int64 num_blocks;
2036 size_t size;
2037
2038 if (pool_config->memqcache_maxcache > pool_config->memqcache_cache_block_size)
2039 ereport(FATAL,
2040 (errmsg("invalid memory cache configuration"),
2041 errdetail("memqcache_cache_block_size %d should be greater or equal to memqcache_maxcache %d",
2042 pool_config->memqcache_cache_block_size,
2043 pool_config->memqcache_maxcache)));
2044
2045
2046 num_blocks = pool_config->memqcache_total_size /
2047 pool_config->memqcache_cache_block_size;
2048 if (num_blocks == 0)
2049 ereport(FATAL,
2050 (errmsg("invalid memory cache configuration"),
2051 errdetail("memqcache_total_size %ld should be greater or equal to memqcache_cache_block_size %d",
2052 pool_config->memqcache_total_size,
2053 pool_config->memqcache_cache_block_size)));
2054
2055 ereport(LOG,
2056 (errmsg("memory cache initialized"),
2057 errdetail("memcache blocks :%ld", num_blocks)));
2058 /* Remember # of blocks */
2059 pool_set_memqcache_blocks(num_blocks);
2060 size = pool_config->memqcache_cache_block_size * num_blocks;
2061 return size;
2062 }
2063
2064 /*
2065 * Acquire and initialize shared memory cache. This should be called
2066 * only once from pgpool main process at the process staring up time.
2067 */
2068 static void *shmem;
2069 int
pool_init_memory_cache(size_t size)2070 pool_init_memory_cache(size_t size)
2071 {
2072 ereport(DEBUG1,
2073 (errmsg("memory cache request size : %zd", size)));
2074
2075 shmem = pool_shared_memory_create(size);
2076 return 0;
2077 }
2078
2079 /*
2080 * Clear all the shared memory cache and reset FSMM and hash table.
2081 */
2082 void
pool_clear_memory_cache(void)2083 pool_clear_memory_cache(void)
2084 {
2085 size_t size;
2086 pool_sigset_t oldmask;
2087
2088 POOL_SETMASK2(&BlockSig, &oldmask);
2089 pool_shmem_lock();
2090
2091 PG_TRY();
2092 {
2093 size = pool_shared_memory_cache_size();
2094 memset(shmem, 0, size);
2095
2096 size = pool_shared_memory_fsmm_size();
2097 pool_reset_fsmm(size);
2098
2099 pool_discard_oid_maps();
2100
2101 pool_hash_reset(pool_config->memqcache_max_num_cache);
2102 }
2103 PG_CATCH();
2104 {
2105 pool_shmem_unlock();
2106 POOL_SETMASK(&oldmask);
2107 PG_RE_THROW();
2108 }
2109 PG_END_TRY();
2110
2111 pool_shmem_unlock();
2112 POOL_SETMASK(&oldmask);
2113 }
2114
2115 /*
2116 * Return shared memory cache address
2117 */
2118 static void *
pool_memory_cache_address(void)2119 pool_memory_cache_address(void)
2120 {
2121 return shmem;
2122 }
2123
2124 /*
2125 * Initialize new block
2126 */
2127
2128 /*
2129 * Free space management map
2130 *
 * The free space management map (FSMM) is an array of bytes, one byte
 * per cache block, indexed by block id. For example, with a 1GB cache
 * and an 8KB block size there are 131,072 blocks, so the FSMM takes
 * 128KB. Each FSMM entry holds a value from 0 to 255 that describes
 * the total free space in the corresponding block.
 * For example, if the value is 2, the free space is between 64 bytes
 * and 95 bytes.
2138 *
2139 * value free space(in bytes)
2140 * 0 0-31
2141 * 1 32-63
2142 * 2 64-95
2143 * 3 96-127
2144 * :
2145 * 255 8160-8192
2146 */
2147
/*
 * Calculate necessary shared memory size for FSMM: one char per memory
 * cache block. Should be called after pool_shared_memory_cache_size,
 * which is what sets the number of blocks used here.
 */
size_t
pool_shared_memory_fsmm_size(void)
{
	return pool_get_memqcache_blocks() * sizeof(char);
}
2160
2161 /*
2162 * Acquire and initialize shared memory cache for FSMM. This should be
2163 * called after pool_shared_memory_cache_size only once from pgpool
2164 * main process at the process staring up time.
2165 */
2166 static void *fsmm;
2167 int
pool_init_fsmm(size_t size)2168 pool_init_fsmm(size_t size)
2169 {
2170 int maxblock = pool_get_memqcache_blocks();
2171 int encode_value;
2172
2173 fsmm = pool_shared_memory_create(size);
2174 encode_value = POOL_MAX_FREE_SPACE / POOL_FSMM_RATIO;
2175 memset(fsmm, encode_value, maxblock);
2176 return 0;
2177 }
2178
2179 /*
2180 * Return shared memory fsmm address
2181 */
2182 static void *
pool_fsmm_address(void)2183 pool_fsmm_address(void)
2184 {
2185 return fsmm;
2186 }
2187
2188 /*
2189 * Clock algorithm shared query cache management modules.
2190 */
2191
/*
 * Clock hand pointing to next victim block
 */
static int *pool_fsmm_clock_hand;

/*
 * Allocate the clock hand with pool_shared_memory_create() and make it
 * point to block 0.
 */
void
pool_allocate_fsmm_clock_hand(void)
{
	int		   *hand = pool_shared_memory_create(sizeof(*pool_fsmm_clock_hand));

	*hand = 0;
	pool_fsmm_clock_hand = hand;
}
2206
2207 /*
2208 * Reset FSMM.
2209 */
2210 static void
pool_reset_fsmm(size_t size)2211 pool_reset_fsmm(size_t size)
2212 {
2213 int encode_value;
2214
2215 encode_value = POOL_MAX_FREE_SPACE / POOL_FSMM_RATIO;
2216 memset(fsmm, encode_value, size);
2217
2218 *pool_fsmm_clock_hand = 0;
2219 }
2220
2221 /*
2222 * Find victim block using clock algorithm and make it free.
2223 * Returns new free block id.
2224 */
pool_reuse_block(void)2225 static POOL_CACHE_BLOCKID pool_reuse_block(void)
2226 {
2227 int maxblock = pool_get_memqcache_blocks();
2228 char *block = block_address(*pool_fsmm_clock_hand);
2229 POOL_CACHE_BLOCK_HEADER *bh = (POOL_CACHE_BLOCK_HEADER *) block;
2230 POOL_CACHE_BLOCKID reused_block;
2231 POOL_CACHE_ITEM_POINTER *cip;
2232 char *p;
2233 int i;
2234
2235 bh->flags = 0;
2236 reused_block = *pool_fsmm_clock_hand;
2237 p = block_address(reused_block);
2238
2239 for (i = 0; i < bh->num_items; i++)
2240 {
2241 cip = item_pointer(p, i);
2242
2243 if (!(POOL_ITEM_DELETED & cip->flags))
2244 {
2245 pool_hash_delete(&cip->query_hash);
2246 ereport(DEBUG1,
2247 (errmsg("pool_reuse_block: blockid: %d item: %d", reused_block, i)));
2248 }
2249 }
2250
2251 pool_init_cache_block(reused_block);
2252 pool_update_fsmm(reused_block, POOL_MAX_FREE_SPACE);
2253
2254 (*pool_fsmm_clock_hand)++;
2255 if (*pool_fsmm_clock_hand >= maxblock)
2256 *pool_fsmm_clock_hand = 0;
2257
2258 ereport(LOG,
2259 (errmsg("pool_reuse_block: blockid: %d", reused_block)));
2260
2261 return reused_block;
2262 }
2263
2264 /*
2265 * Get block id which has enough space
2266 */
pool_get_block(size_t free_space)2267 static POOL_CACHE_BLOCKID pool_get_block(size_t free_space)
2268 {
2269 int encode_value;
2270 unsigned char *p = pool_fsmm_address();
2271 int i;
2272 int maxblock = pool_get_memqcache_blocks();
2273 POOL_CACHE_BLOCK_HEADER *bh;
2274
2275 if (p == NULL)
2276 {
2277 ereport(WARNING,
2278 (errmsg("memcache: getting block: FSMM is not initialized")));
2279 return -1;
2280 }
2281
2282 if (free_space > POOL_MAX_FREE_SPACE)
2283 {
2284 ereport(WARNING,
2285 (errmsg("memcache: getting block: invalid free space:%zd", free_space),
2286 errdetail("requested free space: %zd is more than maximum allowed space:%lu", free_space, POOL_MAX_FREE_SPACE)));
2287 return -1;
2288 }
2289
2290 encode_value = free_space / POOL_FSMM_RATIO;
2291
2292 for (i = 0; i < maxblock; i++)
2293 {
2294 if (p[i] >= encode_value)
2295 {
2296 /*
2297 * This block *may" have enough space. We need to make sure it
2298 * actually has enough space.
2299 */
2300 bh = (POOL_CACHE_BLOCK_HEADER *) block_address(i);
2301 if (bh->free_bytes >= free_space)
2302 {
2303 return (POOL_CACHE_BLOCKID) i;
2304 }
2305 }
2306 }
2307
2308 /*
2309 * No enough space found. Reuse victim block
2310 */
2311 return pool_reuse_block();
2312 }
2313
2314 /*
2315 * Update free space info for specified block
2316 */
2317 static void
pool_update_fsmm(POOL_CACHE_BLOCKID blockid,size_t free_space)2318 pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space)
2319 {
2320 int encode_value;
2321 char *p = pool_fsmm_address();
2322
2323 if (p == NULL)
2324 {
2325 ereport(WARNING,
2326 (errmsg("memcache: updating free space in block: FSMM is not initialized")));
2327 return;
2328 }
2329
2330 if (blockid >= pool_get_memqcache_blocks())
2331 {
2332 ereport(WARNING,
2333 (errmsg("memcache: updating free space in block: block id:%d in invalid", blockid)));
2334 return;
2335 }
2336
2337 if (free_space > POOL_MAX_FREE_SPACE)
2338 {
2339 ereport(WARNING,
2340 (errmsg("memcache: updating free space in block: invalid free space:%zd", free_space),
2341 errdetail("requested free space: %zd is more than maximum allowed space:%lu", free_space, POOL_MAX_FREE_SPACE)));
2342
2343 return;
2344 }
2345
2346 encode_value = free_space / POOL_FSMM_RATIO;
2347
2348 p[blockid] = encode_value;
2349
2350 return;
2351 }
2352
/*
 * Add item data to shared memory cache.
 *
 * query_hash: key under which the new item is registered in the hash table.
 * data, size: item body and its length in bytes. size must be positive.
 * expire:     copied as is into the "expire" field of the item header.
 *
 * On successful registration, returns cache id. The returned pointer refers
 * to a function-local static object, so the cache id is overwritten by the
 * subsequent call to this function.
 * On error returns NULL.
 */
static POOL_CACHEID * pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash, char *data, int size, time_t expire)
{
	static POOL_CACHEID cacheid;
	POOL_CACHE_BLOCKID blockid;
	POOL_CACHE_BLOCK_HEADER *bh;
	POOL_CACHE_ITEM_POINTER *cip;

	POOL_CACHE_ITEM ci;
	POOL_CACHE_ITEM_POINTER cip_body;
	char	   *item;

	int			request_size;
	char	   *p;
	int			i;
	char	   *src;
	char	   *dst;
	int			num_deleted;
	char	   *dcip;
	char	   *dci;
	bool		need_pack;
	char	   *work_buffer;
	int			index;

	/* Reject invalid arguments before touching shared memory */
	if (query_hash == NULL)
	{
		ereport(LOG,
				(errmsg("error while adding item to shmem cache, query hash is NULL")));
		return NULL;
	}

	if (data == NULL)
	{
		ereport(LOG,
				(errmsg("error while adding item to shmem cache, data is NULL")));
		return NULL;
	}

	if (size <= 0)
	{
		ereport(LOG,
				(errmsg("error while adding item to shmem cache, invalid request size: %d", size)));
		return NULL;
	}

	/*
	 * Add overhead: each item needs an item pointer and an item header in
	 * addition to the body itself.
	 */
	request_size = size + sizeof(POOL_CACHE_ITEM_POINTER) + sizeof(POOL_CACHE_ITEM_HEADER);

	/* Get cache block which has enough space */
	blockid = pool_get_block(request_size);

	/* -1 is treated as "no block available"; give up without logging */
	if (blockid == -1)
	{
		return NULL;
	}

	/*
	 * Initialize the block if necessary. If no live items are remained, we
	 * also initialize the block. If there's contiguous deleted items, we turn
	 * them into free space as well.
	 */
	pool_init_cache_block(blockid);

	/*
	 * Make sure that we have at least one free hash element.
	 */
	while (!is_free_hash_element())
	{
		/*
		 * If not, reuse next victim block. Note that blockid is replaced
		 * here, so the rest of the function works on the victim block rather
		 * than on the block chosen by pool_get_block() above.
		 */
		blockid = pool_reuse_block();
		pool_init_cache_block(blockid);
	}

	/* Get block address on shmem */
	p = block_address(blockid);
	bh = (POOL_CACHE_BLOCK_HEADER *) p;

	/*
	 * Create contiguous free space. We assume that item bodies are ordered
	 * from bottom to top of the block, and corresponding item pointers are
	 * ordered from the youngest to the oldest in the beginning of the block.
	 */

	/*
	 * Optimization. If there's no deleted items, we don't need to pack it to
	 * create contiguous free space.
	 */
	need_pack = false;
	for (i = 0; i < bh->num_items; i++)
	{
		cip = item_pointer(p, i);

		if (POOL_ITEM_DELETED & cip->flags) /* Deleted item? */
		{
			need_pack = true;
			ereport(DEBUG1,
					(errmsg("memcache adding item"),
					 errdetail("start creating contiguous space")));
			break;
		}
	}

	/*
	 * We disable packing for now. Revisit and remove following code fragment
	 * later. Because need_pack is forced to false here, the packing branch
	 * below is never executed at present.
	 */
	need_pack = false;

	if (need_pack)
	{
		/*
		 * Pack and create contiguous free space. The packed image is built
		 * in a zero-filled private work buffer of one block size: item
		 * pointers are laid out from the front, item bodies from the end.
		 */
		dcip = calloc(1, pool_config->memqcache_cache_block_size);
		if (!dcip)
		{
			ereport(WARNING,
					(errmsg("memcache: adding item to cache: calloc failed")));
			return NULL;
		}

		work_buffer = dcip;
		/* dci tracks the lowest address used by item bodies copied so far */
		dci = dcip + pool_config->memqcache_cache_block_size;
		num_deleted = 0;
		index = 0;				/* next slot for a surviving item pointer */

		for (i = 0; i < bh->num_items; i++)
		{
			int			total_length;
			POOL_CACHEID cid;

			cip = item_pointer(p, i);

			if (POOL_ITEM_DELETED & cip->flags) /* Deleted item? */
			{
				num_deleted++;
				continue;
			}

			/* Copy item body */
			src = p + cip->offset;
			total_length = item_header(p, i)->total_length;
			dst = dci - total_length;
			/* The item pointer in the shared block is updated in place */
			cip->offset = dst - work_buffer;
			memcpy(dst, src, total_length);

			dci -= total_length;

			/* Copy item pointer */
			src = (char *) cip;
			dst = (char *) item_pointer(dcip, index);
			memcpy(dst, src, sizeof(POOL_CACHE_ITEM_POINTER));

			/* Update hash index so that the key maps to the new item id */
			cid.blockid = blockid;
			cid.itemid = index;
			pool_hash_insert(&cip->query_hash, &cid, true);
			ereport(DEBUG1,
					(errmsg("memcache adding item"),
					 errdetail("item cid updated. old:%d %d new:%d %d",
							   blockid, i, blockid, index)));
			index++;
		}

		/* All items deleted? */
		if (num_deleted > 0 && num_deleted == bh->num_items)
		{
			ereport(DEBUG1,
					(errmsg("memcache adding item"),
					 errdetail("all items deleted, total deleted:%d", num_deleted)));
			/* Clearing the flags lets pool_init_cache_block() reset the block */
			bh->flags = 0;
			pool_init_cache_block(blockid);
			pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
		}
		else
		{
			/* Update number of items */
			bh->num_items -= num_deleted;

			/* Copy back the packed block except block header */
			memcpy(p + sizeof(POOL_CACHE_BLOCK_HEADER),
				   work_buffer + sizeof(POOL_CACHE_BLOCK_HEADER),
				   pool_config->memqcache_cache_block_size - sizeof(POOL_CACHE_BLOCK_HEADER));
		}
		free(work_buffer);
	}

	/*
	 * Make sure that we have enough free space
	 */
	if (bh->free_bytes < request_size)
	{
		/* This should not happen */
		ereport(WARNING,
				(errmsg("memcache: adding item to cache: not enough space"),
				 errdetail("free space: %d required: %d block id:%d",
						   bh->free_bytes, request_size, blockid)));
		return NULL;
	}

	/*
	 * At this point, new item can be located at block_header->num_items
	 */

	/* Fill in cache item header */
	ci.header.timestamp = time(NULL);
	ci.header.expire = expire;
	ci.header.total_length = sizeof(POOL_CACHE_ITEM_HEADER) + size;

	/* Calculate item body address */
	if (bh->num_items == 0)
	{
		/*
		 * This is the #0 item. So address is block_bottom - data_length
		 */
		item = p + pool_config->memqcache_cache_block_size - ci.header.total_length;

		/* Mark this block used */
		bh->flags = POOL_BLOCK_USED;
	}
	else
	{
		/*
		 * Otherwise the new item is placed immediately below the item that
		 * was added last (the one at index num_items - 1).
		 */
		cip = item_pointer(p, bh->num_items - 1);
		item = p + cip->offset - ci.header.total_length;
	}

	/* Copy item header (only the header part of ci is written) */
	memcpy(item, &ci, sizeof(POOL_CACHE_ITEM_HEADER));
	bh->free_bytes -= sizeof(POOL_CACHE_ITEM_HEADER);

	/* Copy item body */
	memcpy(item + sizeof(POOL_CACHE_ITEM_HEADER), data, size);
	bh->free_bytes -= size;

	/* Copy cache item pointer */
	memcpy(&cip_body.query_hash, query_hash, sizeof(POOL_QUERY_HASH));
	memset(&cip_body.next, 0, sizeof(POOL_CACHEID));
	cip_body.offset = item - p; /* offset of the item from the block start */
	cip_body.flags = POOL_ITEM_USED;
	memcpy(item_pointer(p, bh->num_items), &cip_body, sizeof(POOL_CACHE_ITEM_POINTER));
	bh->free_bytes -= sizeof(POOL_CACHE_ITEM_POINTER);

	/* Update FSMM */
	pool_update_fsmm(blockid, bh->free_bytes);

	/* The new item id is the item count before it is incremented below */
	cacheid.blockid = blockid;
	cacheid.itemid = bh->num_items;
	ereport(DEBUG1,
			(errmsg("memcache adding item"),
			 errdetail("new item inserted. blockid: %d itemid:%d",
					   cacheid.blockid, cacheid.itemid)));

	/* Add up number of items */
	bh->num_items++;

	/* Update hash table */
	if (pool_hash_insert(query_hash, &cacheid, false) < 0)
	{
		ereport(LOG,
				(errmsg("error while adding item to shmem cache, hash insert failed")));

		/*
		 * Since we have failed to insert hash index entry, we need to undo
		 * the addition of cache entry.
		 */
		pool_delete_item_shmem_cache(&cacheid);
		return NULL;
	}
	ereport(DEBUG1,
			(errmsg("memcache adding item"),
			 errdetail("block: %d item: %d", cacheid.blockid, cacheid.itemid)));

#ifdef SHMEMCACHE_DEBUG
	dump_shmem_cache(blockid);
#endif
	return &cacheid;
}
2635
2636 /*
2637 * Returns item data address on shared memory cache specified by query hash.
2638 * Also data length is set to *size.
2639 * On error or data not found case returns NULL.
2640 * Detail is set to *sts. (0: success, 1: not found, -1: error)
2641 */
2642 static char *
pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash,int * size,int * sts)2643 pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash, int *size, int *sts)
2644 {
2645 POOL_CACHEID *cacheid;
2646 POOL_CACHE_ITEM_HEADER *cih;
2647
2648 if (sts == NULL)
2649 {
2650 ereport(LOG,
2651 (errmsg("error while getting item from shmem cache, sts is NULL")));
2652 return NULL;
2653 }
2654
2655 *sts = -1;
2656
2657 if (query_hash == NULL)
2658 {
2659 ereport(LOG,
2660 (errmsg("error while getting item from shmem cache, query hash is NULL")));
2661 return NULL;
2662 }
2663
2664 if (size == NULL)
2665 {
2666 ereport(LOG,
2667 (errmsg("error while getting item from shmem cache, size is NULL")));
2668 return NULL;
2669 }
2670
2671 /*
2672 * Find cache header by using hash table
2673 */
2674 cacheid = pool_find_item_on_shmem_cache(query_hash);
2675 if (cacheid == NULL)
2676 {
2677 /* Not found */
2678 *sts = 1;
2679 return NULL;
2680 }
2681
2682 cih = pool_cache_item_header(cacheid);
2683
2684 *size = cih->total_length - sizeof(POOL_CACHE_ITEM_HEADER);
2685 return (char *) cih + sizeof(POOL_CACHE_ITEM_HEADER);
2686 }
2687
2688 /*
2689 * Find data on shared memory cache specified query hash.
2690 * On success returns cache id.
2691 * The cache id is overwritten by the subsequent call to this function.
2692 */
pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash)2693 static POOL_CACHEID * pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash)
2694 {
2695 static POOL_CACHEID cacheid;
2696 POOL_CACHEID *c;
2697 POOL_CACHE_ITEM_HEADER *cih;
2698 time_t now;
2699
2700 c = pool_hash_search(query_hash);
2701 if (!c)
2702 {
2703 return NULL;
2704 }
2705
2706 cih = item_header(block_address(c->blockid), c->itemid);
2707
2708 /* Check cache expiration */
2709 if (cih->expire > 0)
2710 {
2711 now = time(NULL);
2712 if (now > (cih->timestamp + cih->expire))
2713 {
2714 ereport(DEBUG1,
2715 (errmsg("memcache finding item"),
2716 errdetail("cache expired: now: %ld timestamp: %ld",
2717 now, cih->timestamp + cih->expire)));
2718 pool_delete_item_shmem_cache(c);
2719 return NULL;
2720 }
2721 }
2722
2723 cacheid.blockid = c->blockid;
2724 cacheid.itemid = c->itemid;
2725 return &cacheid;
2726 }
2727
/*
 * Delete item data specified cache id from shmem.
 *
 * The item pointer is flagged as deleted and the hash index entry for the
 * item's key is removed. Space is given back to the block's free space only
 * when the deleted item is the last one in the block; when it is the only
 * item slot of the block, the whole block is reinitialized.
 *
 * On successful deletion, returns 0.
 * Otherwise returns -1 (invalid block id or item id, block not in use, item
 * not in use or already deleted).
 * FSMM is also updated.
 */
static int
pool_delete_item_shmem_cache(POOL_CACHEID * cacheid)
{
	POOL_CACHE_BLOCK_HEADER *bh;
	POOL_CACHE_ITEM_POINTER *cip;
	POOL_CACHE_ITEM_HEADER *cih;
	POOL_QUERY_HASH key;
	int			size;

	ereport(DEBUG1,
			(errmsg("memcache deleting item data"),
			 errdetail("cacheid:%d itemid:%d", cacheid->blockid, cacheid->itemid)));

	/* Block id must be within the range of existing blocks */
	if (cacheid->blockid >= pool_get_memqcache_blocks())
	{
		ereport(LOG,
				(errmsg("error while deleting item from shmem cache, invalid block: %d",
						cacheid->blockid)));
		return -1;
	}

	/* The block must currently be in use */
	bh = (POOL_CACHE_BLOCK_HEADER *) block_address(cacheid->blockid);
	if (!(bh->flags & POOL_BLOCK_USED))
	{
		ereport(LOG,
				(errmsg("error while deleting item from shmem cache, block: %d is not used",
						cacheid->blockid)));
		return -1;
	}

	if (cacheid->itemid >= bh->num_items)
	{
		/*
		 * This could happen if the block is reused, since the contents of
		 * the oid map file are not updated when the block is reused.
		 */
		ereport(DEBUG1,
				(errmsg("memcache error deleting item data"),
				 errdetail("invalid item id %d in block:%d",
						   cacheid->itemid, cacheid->blockid)));

		return -1;
	}

	/* The item must be in use and must not have been deleted already */
	cip = item_pointer(block_address(cacheid->blockid), cacheid->itemid);
	if (!(cip->flags & POOL_ITEM_USED))
	{
		ereport(LOG,
				(errmsg("error while deleting item from shmem cache, item: %d was not used",
						cacheid->itemid)));
		return -1;
	}

	if (cip->flags & POOL_ITEM_DELETED)
	{
		ereport(LOG,
				(errmsg("error while deleting item from shmem cache, item: %d was already deleted",
						cacheid->itemid)));
		return -1;
	}

	/*
	 * Save cache key. This must be done before the block is possibly
	 * reinitialized below, because that wipes the item pointer.
	 */
	memcpy(&key, &cip->query_hash, sizeof(POOL_QUERY_HASH));

	/* Space occupied by the item: header + body + its item pointer */
	cih = pool_cache_item_header(cacheid);
	size = cih->total_length + sizeof(POOL_CACHE_ITEM_POINTER);

	/* Delete item pointer */
	cip->flags |= POOL_ITEM_DELETED;

	/*
	 * We do NOT count down bh->num_items here. The deleted space will be
	 * recycled by pool_add_item_shmem_cache(). However, if this is the last
	 * item, we can recycle whole block.
	 *
	 * 2012/4/1: Now we do not pack data in pool_add_item_shmem_cache() for
	 * performance reason. Also we count down num_items if it is the last one.
	 */
	if ((bh->num_items - 1) == 0)
	{
		ereport(DEBUG1,
				(errmsg("memcache deleting item data"),
				 errdetail("no item remains. initialize block")));
		/*
		 * Clearing the flags makes pool_init_cache_block() zero-fill the
		 * block and reset its free space.
		 */
		bh->flags = 0;
		pool_init_cache_block(cacheid->blockid);
	}

	/* Remove hash index */
	pool_hash_delete(&key);

	/*
	 * If the deleted item is last one in the block, we add it to the free
	 * space. When the block was reinitialized above, num_items has been
	 * reset to 0, so for the expected item id of 0 this branch is not taken
	 * in that case.
	 */
	if (cacheid->itemid == (bh->num_items - 1))
	{
		bh->free_bytes += size;
		ereport(DEBUG1,
				(errmsg("memcache deleting item data"),
				 errdetail("deleted %d bytes, freebytes is = %d",
						   size, bh->free_bytes)));

		bh->num_items--;
	}

	/* Update FSMM */
	pool_update_fsmm(cacheid->blockid, bh->free_bytes);

	return 0;
}
2844
2845 /*
2846 * Returns item header specified by cache id.
2847 */
pool_cache_item_header(POOL_CACHEID * cacheid)2848 static POOL_CACHE_ITEM_HEADER * pool_cache_item_header(POOL_CACHEID * cacheid)
2849 {
2850 POOL_CACHE_BLOCK_HEADER *bh;
2851
2852 if (cacheid->blockid >= pool_get_memqcache_blocks())
2853 {
2854 ereport(WARNING,
2855 (errmsg("error while getting cache item header, invalid block id: %d", cacheid->blockid)));
2856 return NULL;
2857 }
2858
2859 bh = (POOL_CACHE_BLOCK_HEADER *) block_address(cacheid->blockid);
2860 if (cacheid->itemid >= bh->num_items)
2861 {
2862 ereport(WARNING,
2863 (errmsg("error while getting cache item header, invalid item id: %d", cacheid->itemid)));
2864 return NULL;
2865 }
2866
2867 return item_header((char *) bh, cacheid->itemid);
2868 }
2869
2870 /*
2871 * Initialize specified block.
2872 */
2873 static int
pool_init_cache_block(POOL_CACHE_BLOCKID blockid)2874 pool_init_cache_block(POOL_CACHE_BLOCKID blockid)
2875 {
2876 char *p;
2877 POOL_CACHE_BLOCK_HEADER *bh;
2878
2879 if (blockid >= pool_get_memqcache_blocks())
2880 {
2881 ereport(WARNING,
2882 (errmsg("error while initializing cache block, invalid block id: %d", blockid)));
2883 return -1;
2884 }
2885
2886 p = block_address(blockid);
2887 bh = (POOL_CACHE_BLOCK_HEADER *) p;
2888
2889 /* Is this block used? */
2890 if (!(bh->flags & POOL_BLOCK_USED))
2891 {
2892 /* Initialize empty block */
2893 memset(p, 0, pool_config->memqcache_cache_block_size);
2894 bh->free_bytes = pool_config->memqcache_cache_block_size -
2895 sizeof(POOL_CACHE_BLOCK_HEADER);
2896 }
2897 return 0;
2898 }
2899
#if NOT_USED
/*
 * Delete all items in the block.
 * Every item not yet flagged as deleted is removed through
 * pool_delete_item_shmem_cache(); afterwards the block is reinitialized and
 * its FSMM entry is set to the maximum free space.
 */
static void
pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid)
{
	char	   *block = block_address(blockid);
	POOL_CACHE_BLOCK_HEADER *header = (POOL_CACHE_BLOCK_HEADER *) block;
	POOL_CACHEID target;
	int			idx;

	target.blockid = blockid;

	/* num_items is re-read on every pass since deletion may change it */
	for (idx = 0; idx < header->num_items; idx++)
	{
		if (item_pointer(block, idx)->flags & POOL_ITEM_DELETED)
			continue;			/* already deleted */

		target.itemid = idx;
		pool_delete_item_shmem_cache(&target);
	}

	header->flags = 0;
	pool_init_cache_block(blockid);
	pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
}
#endif
2934
2935 /*
2936 * Acquire lock: XXX giant lock
2937 */
2938 void
pool_shmem_lock(void)2939 pool_shmem_lock(void)
2940 {
2941 if (pool_is_shmem_cache() && !is_shmem_locked)
2942 {
2943 pool_semaphore_lock(SHM_CACHE_SEM);
2944 is_shmem_locked = true;
2945 }
2946 }
2947
2948 /*
2949 * Release lock
2950 */
2951 void
pool_shmem_unlock(void)2952 pool_shmem_unlock(void)
2953 {
2954 if (pool_is_shmem_cache() && is_shmem_locked)
2955 {
2956 pool_semaphore_unlock(SHM_CACHE_SEM);
2957 is_shmem_locked = false;
2958 }
2959 }
2960
2961 /*
2962 * check lock
2963 */
2964 bool
pool_is_shmem_lock(void)2965 pool_is_shmem_lock(void)
2966 {
2967 return is_shmem_locked;
2968 }
2969
2970 /*
2971 * Returns cache block address specified by block id
2972 */
2973 static char *
block_address(int blockid)2974 block_address(int blockid)
2975 {
2976 char *p;
2977
2978 p = pool_memory_cache_address() +
2979 blockid * pool_config->memqcache_cache_block_size;
2980 return p;
2981 }
2982
2983 /*
2984 * Returns i th item pointer in block address block
2985 */
item_pointer(char * block,int i)2986 static POOL_CACHE_ITEM_POINTER * item_pointer(char *block, int i)
2987 {
2988 return (POOL_CACHE_ITEM_POINTER *) (block + sizeof(POOL_CACHE_BLOCK_HEADER) +
2989 sizeof(POOL_CACHE_ITEM_POINTER) * i);
2990 }
2991
2992 /*
2993 * Returns i th item header in block address block
2994 */
item_header(char * block,int i)2995 static POOL_CACHE_ITEM_HEADER * item_header(char *block, int i)
2996 {
2997 POOL_CACHE_ITEM_POINTER *cip;
2998
2999 cip = item_pointer(block, i);
3000 return (POOL_CACHE_ITEM_HEADER *) (block + cip->offset);
3001 }
3002
#ifdef SHMEMCACHE_DEBUG
/*
 * Dump shmem cache block to stderr: the block header first, then the item
 * pointer and the item header of every item slot in the block.
 *
 * sizeof results are printed with %zu and the timestamp is cast to long so
 * that the format specifiers match the argument types on every platform.
 */
static void
dump_shmem_cache(POOL_CACHE_BLOCKID blockid)
{
	POOL_CACHE_BLOCK_HEADER *bh;
	POOL_CACHE_ITEM_POINTER *cip;
	POOL_CACHE_ITEM_HEADER *cih;
	int			i;

	bh = (POOL_CACHE_BLOCK_HEADER *) block_address(blockid);
	fprintf(stderr, "shmem: block header(%zu bytes): flags:%x num_items:%d free_bytes:%d\n",
			sizeof(*bh), bh->flags, bh->num_items, bh->free_bytes);
	for (i = 0; i < bh->num_items; i++)
	{
		cip = item_pointer((char *) bh, i);
		fprintf(stderr, "shmem: block: %d %d th item pointer(%zu bytes): offset:%d flags:%x\n",
				blockid, i, sizeof(*cip), cip->offset, cip->flags);
		cih = item_header((char *) bh, i);
		fprintf(stderr, "shmem: block: %d %d th item header(%zu bytes): timestamp:%ld length:%d\n",
				blockid, i, sizeof(*cih), (long) cih->timestamp, cih->total_length);
	}
}
#endif
3029
3030 /*
3031 * SELECT query result array modules
3032 */
3033 POOL_QUERY_CACHE_ARRAY *
pool_create_query_cache_array(void)3034 pool_create_query_cache_array(void)
3035 {
3036 #define POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM 128
3037 #define POOL_QUERY_CACHE_ARRAY_HEADER_SIZE (sizeof(int)+sizeof(int))
3038
3039 size_t size;
3040 POOL_QUERY_CACHE_ARRAY *p;
3041 POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
3042 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
3043
3044 size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM *
3045 sizeof(POOL_TEMP_QUERY_CACHE *);
3046 p = palloc(size);
3047 p->num_caches = 0;
3048 p->array_size = POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
3049 MemoryContextSwitchTo(old_context);
3050 return p;
3051 }
3052
3053 /*
3054 * Discard query cache array
3055 */
3056 void
pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array)3057 pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array)
3058 {
3059 int i;
3060
3061 if (!cache_array)
3062 return;
3063
3064 ereport(DEBUG1,
3065 (errmsg("memcache discarding query cache array"),
3066 errdetail("num_caches: %d", cache_array->num_caches)));
3067
3068 for (i = 0; i < cache_array->num_caches; i++)
3069 {
3070 ereport(DEBUG2,
3071 (errmsg("memcache discarding query cache array"),
3072 errdetail("cache no: %d cache: %p", i, cache_array->caches[i])));
3073 pool_discard_temp_query_cache(cache_array->caches[i]);
3074 }
3075 pfree(cache_array);
3076 }
3077
3078 /*
3079 * Add query cache array
3080 */
pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array,POOL_TEMP_QUERY_CACHE * cache)3081 static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array, POOL_TEMP_QUERY_CACHE * cache)
3082 {
3083 size_t size;
3084 POOL_QUERY_CACHE_ARRAY *cp = cache_array;
3085
3086 if (!cache_array)
3087 return cp;
3088
3089 ereport(DEBUG2,
3090 (errmsg("memcache adding query cache array"),
3091 errdetail("num_caches: %d cache: %p", cache_array->num_caches, cache)));
3092 if (cache_array->num_caches >= cache_array->array_size)
3093 {
3094 cache_array->array_size += POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
3095 size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + cache_array->array_size *
3096 sizeof(POOL_TEMP_QUERY_CACHE *);
3097 cache_array = repalloc(cache_array, size);
3098 }
3099 cache_array->caches[cache_array->num_caches++] = cache;
3100 return cache_array;
3101 }
3102
3103 /*
3104 * SELECT query result temporary cache modules
3105 */
3106
3107 /*
3108 * Create SELECT result temporary cache
3109 */
3110 POOL_TEMP_QUERY_CACHE *
pool_create_temp_query_cache(char * query)3111 pool_create_temp_query_cache(char *query)
3112 {
3113 POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
3114 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
3115 POOL_TEMP_QUERY_CACHE *p;
3116
3117 p = palloc(sizeof(*p));
3118 p->query = pstrdup(query);
3119
3120 p->buffer = pool_create_buffer();
3121 p->oids = pool_create_buffer();
3122 p->num_oids = 0;
3123 p->is_exceeded = false;
3124 p->is_discarded = false;
3125
3126 MemoryContextSwitchTo(old_context);
3127
3128 ereport(DEBUG1,
3129 (errmsg("pool_create_temp_query_cache: cache created: %p", p)));
3130
3131 return p;
3132 }
3133
3134 /*
3135 * Discard temp query cache
3136 */
3137 void
pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache)3138 pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache)
3139 {
3140 if (!temp_cache)
3141 return;
3142
3143 if (temp_cache->query)
3144 pfree(temp_cache->query);
3145 if (temp_cache->buffer)
3146 pool_discard_buffer(temp_cache->buffer);
3147 if (temp_cache->oids)
3148 pool_discard_buffer(temp_cache->oids);
3149
3150 ereport(DEBUG1,
3151 (errmsg("pool_discard_temp_query_cache: cache discarded: %p", temp_cache)));
3152
3153 pfree(temp_cache);
3154 }
3155
3156 /*
3157 * Add data to temp query cache.
3158 * Data must be FE/BE protocol packet.
3159 */
3160 static void
pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,char kind,char * data,int data_len)3161 pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, char kind, char *data, int data_len)
3162 {
3163 POOL_INTERNAL_BUFFER *buffer;
3164 size_t buflen;
3165 int send_len;
3166
3167 if (temp_cache == NULL)
3168 {
3169 /*
3170 * This could happen if cache exceeded in previous query execution in
3171 * the same unnamed portal.
3172 */
3173 ereport(DEBUG1,
3174 (errmsg("memcache adding temporary query cache"),
3175 errdetail("POOL_TEMP_QUERY_CACHE is NULL")));
3176 return;
3177 }
3178
3179 if (temp_cache->is_exceeded)
3180 {
3181 ereport(DEBUG1,
3182 (errmsg("memcache adding temporary query cache"),
3183 errdetail("memqcache_maxcache exceeds")));
3184 return;
3185 }
3186
3187 /*
3188 * We only store T(Table Description), D(Data row), C(Command Complete),
3189 * 1(ParseComplete), 2(BindComplete)
3190 */
3191 if (kind != 'T' && kind != 'D' && kind != 'C' && kind != '1' && kind != '2')
3192 {
3193 return;
3194 }
3195
3196 /* Check data limit */
3197 buffer = temp_cache->buffer;
3198 buflen = pool_get_buffer_length(buffer);
3199
3200 if ((buflen + data_len + sizeof(int) + 1) > pool_config->memqcache_maxcache)
3201 {
3202 ereport(DEBUG1,
3203 (errmsg("memcache adding temporary query cache"),
3204 errdetail("data size exceeds memqcache_maxcache. current:%zd requested:%zd memq_maxcache:%d",
3205 buflen, data_len + sizeof(int) + 1, pool_config->memqcache_maxcache)));
3206 temp_cache->is_exceeded = true;
3207 return;
3208 }
3209
3210 pool_add_buffer(buffer, &kind, 1);
3211 send_len = htonl(data_len + sizeof(int));
3212 pool_add_buffer(buffer, (char *) &send_len, sizeof(int));
3213 pool_add_buffer(buffer, data, data_len);
3214
3215 return;
3216 }
3217
3218 /*
3219 * Add table oids used by SELECT to temp query cache.
3220 */
3221 static void
pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,int num_oids,int * oids)3222 pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, int num_oids, int *oids)
3223 {
3224 POOL_INTERNAL_BUFFER *buffer;
3225
3226 if (!temp_cache || num_oids <= 0)
3227 return;
3228
3229 buffer = temp_cache->oids;
3230 pool_add_buffer(buffer, oids, num_oids * sizeof(int));
3231 temp_cache->num_oids = num_oids;
3232 }
3233
3234 /*
3235 * Internal buffer management modules.
3236 * Usage:
3237 * 1) Create buffer using pool_create_buffer().
3238 * 2) Add data to buffer using pool_add_buffer().
3239 * 3) Extract (copied) data from buffer using pool_get_buffer().
3240 * 4) Optionally you can:
3241 * Obtain buffer length by using pool_get_buffer_length().
3242 * Obtain buffer pointer by using pool_get_buffer_pointer().
3243 * 5) Discard buffer using pool_discard_buffer().
3244 */
3245
3246 /*
3247 * Create and return internal buffer
3248 */
pool_create_buffer(void)3249 static POOL_INTERNAL_BUFFER * pool_create_buffer(void)
3250 {
3251 POOL_INTERNAL_BUFFER *p;
3252
3253 p = palloc0(sizeof(*p));
3254 return p;
3255 }
3256
3257 /*
3258 * Discard internal buffer
3259 */
3260 static void
pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer)3261 pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer)
3262 {
3263 if (buffer)
3264 {
3265 if (buffer->buf)
3266 pfree(buffer->buf);
3267 pfree(buffer);
3268 }
3269 }
3270
3271 /*
3272 * Add data to internal buffer
3273 */
3274 static void
pool_add_buffer(POOL_INTERNAL_BUFFER * buffer,void * data,size_t len)3275 pool_add_buffer(POOL_INTERNAL_BUFFER * buffer, void *data, size_t len)
3276 {
3277 #define POOL_ALLOCATE_UNIT 8192
3278
3279 /* Sanity check */
3280 if (!buffer || !data || len == 0)
3281 return;
3282 POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
3283 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
3284
3285 /* Check if we need to increase the buffer size */
3286 if ((buffer->buflen + len) > buffer->bufsize)
3287 {
3288 size_t allocate_size = ((buffer->buflen + len) / POOL_ALLOCATE_UNIT + 1) * POOL_ALLOCATE_UNIT;
3289
3290 ereport(DEBUG2,
3291 (errmsg("memcache adding data to internal buffer"),
3292 errdetail("realloc old size:%zd new size:%zd",
3293 buffer->bufsize, allocate_size)));
3294 buffer->bufsize = allocate_size;
3295 buffer->buf = (char *) repalloc(buffer->buf, buffer->bufsize);
3296 }
3297 /* Add data to buffer */
3298 memcpy(buffer->buf + buffer->buflen, data, len);
3299 buffer->buflen += len;
3300 ereport(DEBUG2,
3301 (errmsg("memcache adding data to internal buffer"),
3302 errdetail("len:%zd, total:%zd bufsize:%zd",
3303 len, buffer->buflen, buffer->bufsize)));
3304 MemoryContextSwitchTo(old_context);
3305 return;
3306 }
3307
3308 /*
3309 * Get data from internal buffer.
3310 * Data is stored in newly malloc memory.
3311 * Data length is returned to len.
3312 */
3313 static void *
pool_get_buffer(POOL_INTERNAL_BUFFER * buffer,size_t * len)3314 pool_get_buffer(POOL_INTERNAL_BUFFER * buffer, size_t *len)
3315 {
3316 void *p;
3317
3318 if (buffer->bufsize == 0 || buffer->buflen == 0 ||
3319 buffer->buf == NULL)
3320 {
3321 *len = 0;
3322 return NULL;
3323 }
3324
3325 p = palloc(buffer->buflen);
3326 memcpy(p, buffer->buf, buffer->buflen);
3327 *len = buffer->buflen;
3328 return p;
3329 }
3330
3331 /*
3332 * Get internal buffer length.
3333 */
3334 static size_t
pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer)3335 pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer)
3336 {
3337 if (buffer == NULL)
3338 return 0;
3339
3340 return buffer->buflen;
3341 }
3342
#ifdef NOT_USED
/*
 * Get internal buffer pointer, i.e. the buffer's own data area (not a copy).
 * Returns NULL for a NULL buffer.
 */
static char *
pool_get_buffer_pointer(POOL_INTERNAL_BUFFER * buffer)
{
	return (buffer != NULL) ? buffer->buf : NULL;
}
#endif
3355 /*
3356 * Get query cache buffer struct of current query context
3357 */
3358 POOL_TEMP_QUERY_CACHE *
pool_get_current_cache(void)3359 pool_get_current_cache(void)
3360 {
3361 POOL_SESSION_CONTEXT *session_context;
3362 POOL_QUERY_CONTEXT *query_context;
3363 POOL_TEMP_QUERY_CACHE *p = NULL;
3364
3365 session_context = pool_get_session_context(true);
3366 if (session_context)
3367 {
3368 query_context = session_context->query_context;
3369 if (query_context)
3370 {
3371 p = query_context->temp_cache;
3372 }
3373 }
3374 return p;
3375 }
3376
3377 /*
3378 * Get query cache buffer of current query context
3379 */
3380 static char *
pool_get_current_cache_buffer(size_t * len)3381 pool_get_current_cache_buffer(size_t *len)
3382 {
3383 char *p = NULL;
3384
3385 *len = 0;
3386 POOL_TEMP_QUERY_CACHE *cache;
3387
3388 cache = pool_get_current_cache();
3389 if (cache)
3390 {
3391 p = pool_get_buffer(cache->buffer, len);
3392 }
3393 return p;
3394 }
3395
3396 /*
3397 * Mark this temporary query cache buffer discarded if the SELECT
3398 * uses the table oid specified by oids.
3399 */
3400 static void
pool_check_and_discard_cache_buffer(int num_oids,int * oids)3401 pool_check_and_discard_cache_buffer(int num_oids, int *oids)
3402 {
3403 POOL_SESSION_CONTEXT *session_context;
3404 POOL_TEMP_QUERY_CACHE *cache;
3405 int num_caches;
3406 size_t len;
3407 int *soids;
3408 int i,
3409 j,
3410 k;
3411
3412 session_context = pool_get_session_context(true);
3413
3414 if (!session_context || !session_context->query_cache_array)
3415 return;
3416
3417 num_caches = session_context->query_cache_array->num_caches;
3418
3419 for (i = 0; i < num_caches; i++)
3420 {
3421 cache = session_context->query_cache_array->caches[i];
3422 if (!cache || cache->is_discarded)
3423 continue;
3424
3425 soids = (int *) pool_get_buffer(cache->oids, &len);
3426 if (!soids || !len)
3427 continue;
3428
3429 for (j = 0; j < cache->num_oids; j++)
3430 {
3431 if (cache->is_discarded)
3432 break;
3433
3434 for (k = 0; k < num_oids; k++)
3435 {
3436 if (soids[j] == oids[k])
3437 {
3438 ereport(DEBUG1,
3439 (errmsg("discard cache for \"%s\"", cache->query)));
3440 cache->is_discarded = true;
3441 break;
3442 }
3443 }
3444 }
3445 pfree(soids);
3446 }
3447 }
3448
3449 /*
 * Handle query cache at Ready for Query or Command Complete. In streaming
 * replication mode with extended query protocol, the query cache is handled
 * at Command Complete. In all other cases it is handled at Ready for Query.
3453 */
3454 void
pool_handle_query_cache(POOL_CONNECTION_POOL * backend,char * query,Node * node,char state)3455 pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state)
3456 {
3457 POOL_SESSION_CONTEXT *session_context;
3458 pool_sigset_t oldmask;
3459 char *cache_buffer;
3460 size_t len;
3461 int num_oids;
3462 int *oids;
3463 int i;
3464
3465 session_context = pool_get_session_context(true);
3466
3467 /* Ok to cache SELECT result? */
3468 if (pool_is_cache_safe())
3469 {
3470 SelectContext ctx;
3471 MemoryContext old_context;
3472
3473 old_context = MemoryContextSwitchTo(session_context->memory_context);
3474 num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
3475 MemoryContextSwitchTo(old_context);
3476 oids = ctx.table_oids;
3477 ereport(DEBUG2,
3478 (errmsg("query cache handler for ReadyForQuery"),
3479 errdetail("num_oids: %d oid: %d", num_oids, *oids)));
3480
3481 if (state == 'I') /* Not inside a transaction? */
3482 {
3483 /*
3484 * Make sure that temporary cache is not exceeded.
3485 */
3486 if (!pool_is_cache_exceeded())
3487 {
3488 POOL_TEMP_QUERY_CACHE *cache;
3489
3490 /*
3491 * If we are not inside a transaction, we can immediately
3492 * register to cache storage.
3493 */
3494 /* Register to memcached or shmem */
3495 POOL_SETMASK2(&BlockSig, &oldmask);
3496 pool_shmem_lock();
3497
3498 cache_buffer = pool_get_current_cache_buffer(&len);
3499 if (cache_buffer)
3500 {
3501 if (session_context->query_context->skip_cache_commit == false)
3502 {
3503 if (pool_commit_cache(backend, query, cache_buffer, len, num_oids, oids) != 0)
3504 {
3505 ereport(WARNING,
3506 (errmsg("ReadyForQuery: pool_commit_cache failed")));
3507 }
3508 }
3509
3510 /*
3511 * Reset temporary query cache buffer. This is necessary
3512 * if extended query protocol is used and a bind/execute
3513 * message arrives which uses a statement created by prior
3514 * parse message. In this case since the temp_cache is not
3515 * initialized by a parse message, messages are added to
3516 * pre existing temp cache buffer. The problem was found
3517 * in bug#152.
3518 * http://www.pgpool.net/mantisbt/view.php?id=152
3519 */
3520 cache = pool_get_current_cache();
3521 ereport(DEBUG1,
3522 (errmsg("pool_handle_query_cache: temp_cache: %p", cache)));
3523 pool_discard_temp_query_cache(cache);
3524
3525 if (SL_MODE && pool_is_doing_extended_query_message())
3526 session_context->query_context->temp_cache = NULL;
3527 else
3528 session_context->query_context->temp_cache = pool_create_temp_query_cache(query);
3529 pfree(cache_buffer);
3530 }
3531 pool_shmem_unlock();
3532 POOL_SETMASK(&oldmask);
3533 }
3534
3535 /* Count up SELECT stats */
3536 pool_stats_count_up_num_selects(1);
3537
3538 /* Reset temp buffer */
3539 pool_reset_memqcache_buffer(true);
3540 }
3541 else
3542 {
3543 POOL_TEMP_QUERY_CACHE *cache = pool_get_current_cache();
3544
3545 /* In transaction. Keep to temp query cache array */
3546 pool_add_oids_temp_query_cache(cache, num_oids, oids);
3547
3548 /*
3549 * If temp cache has been overflowed, just trash the half baked
3550 * temp cache.
3551 */
3552 if (pool_is_cache_exceeded())
3553 {
3554 POOL_TEMP_QUERY_CACHE *cache;
3555
3556 cache = pool_get_current_cache();
3557 pool_discard_temp_query_cache(cache);
3558
3559 /*
3560 * Reset temp_cache pointer in the current query context so
3561 * that we don't double free memory.
3562 */
3563 session_context->query_context->temp_cache = NULL;
3564
3565 }
3566
3567 /*
3568 * Otherwise add to the temp cache array.
3569 */
3570 else
3571 {
3572 session_context->query_cache_array =
3573 pool_add_query_cache_array(session_context->query_cache_array, cache);
3574
3575 /*
3576 * Reset temp_cache pointer in the current query context so
3577 * that we don't add the same temp cache to the cache array.
3578 * This is necessary such that case when next query is just a
3579 * "bind message", without "parse message". In the case the
3580 * query context is reused and same cache pointer will be
3581 * added to the query_cache_array which we do not want.
3582 */
3583 session_context->query_context->temp_cache = NULL;
3584 }
3585
3586 /* Count up temporary SELECT stats */
3587 pool_tmp_stats_count_up_num_selects();
3588 }
3589 }
3590 else if (is_rollback_query(node)) /* Rollback? */
3591 {
3592 /* Discard buffered data */
3593 pool_reset_memqcache_buffer(true);
3594 }
3595 else if (is_commit_query(node)) /* Commit? */
3596 {
3597 int num_caches;
3598
3599 POOL_SETMASK2(&BlockSig, &oldmask);
3600 pool_shmem_lock();
3601
3602 /* Invalidate query cache */
3603 if (pool_config->memqcache_auto_cache_invalidation)
3604 {
3605 num_oids = pool_get_dml_table_oid(&oids);
3606 pool_invalidate_query_cache(num_oids, oids, true, 0);
3607 }
3608
3609 /*--------------------------------------------------------------------
3610 * If we have something in the query cache buffer, that means either:
3611 * - We only had SELECTs in the transaction
3612 * - We had only SELECTs after the last DML
3613 * Thus we can register SELECT results to cache storage.
3614 *--------------------------------------------------------------------
3615 */
3616 num_caches = session_context->query_cache_array->num_caches;
3617 for (i = 0; i < num_caches; i++)
3618 {
3619 POOL_TEMP_QUERY_CACHE *cache;
3620
3621 cache = session_context->query_cache_array->caches[i];
3622 if (!cache || cache->is_discarded)
3623 continue;
3624
3625 num_oids = cache->num_oids;
3626 oids = pool_get_buffer(cache->oids, &len);
3627 cache_buffer = pool_get_buffer(cache->buffer, &len);
3628
3629 if (pool_commit_cache(backend, cache->query, cache_buffer, len, num_oids, oids) != 0)
3630 {
3631 ereport(WARNING,
3632 (errmsg("ReadyForQuery: pool_commit_cache failed")));
3633 }
3634 if (oids)
3635 pfree(oids);
3636 if (cache_buffer)
3637 pfree(cache_buffer);
3638 }
3639 pool_shmem_unlock();
3640 POOL_SETMASK(&oldmask);
3641
3642 /* Count up number of SELECT stats */
3643 pool_stats_count_up_num_selects(pool_tmp_stats_get_num_selects());
3644
3645 pool_reset_memqcache_buffer(true);
3646 }
3647 else /* Non cache safe queries */
3648 {
3649 /* Non cachable SELECT */
3650 if (node && IsA(node, SelectStmt))
3651 {
3652 /* Extract table oids from buffer */
3653 num_oids = pool_get_dml_table_oid(&oids);
3654
3655 if (state == 'I')
3656 {
3657 /*
3658 * If Data-modifying statements in SELECT's WITH clause,
3659 * invalidate query cache.
3660 */
3661 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3662 {
3663 POOL_SETMASK2(&BlockSig, &oldmask);
3664 pool_shmem_lock();
3665 pool_invalidate_query_cache(num_oids, oids, true, 0);
3666 pool_shmem_unlock();
3667 POOL_SETMASK(&oldmask);
3668 }
3669
3670 /* Count up SELECT stats */
3671 pool_stats_count_up_num_selects(1);
3672 pool_reset_memqcache_buffer(true);
3673 }
3674 else
3675 {
3676 /*
3677 * If we are inside a transaction, we cannot invalidate
3678 * query cache yet. However we can clear cache buffer, if
3679 * DML/DDL modifies the TABLE which SELECT uses.
3680 */
3681 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3682 {
3683 pool_check_and_discard_cache_buffer(num_oids, oids);
3684 pool_reset_memqcache_buffer(false);
3685 }
3686
3687 /* Count up temporary SELECT stats */
3688 pool_tmp_stats_count_up_num_selects();
3689 }
3690 }
3691
3692 /*
3693 * If the query is DROP DATABASE, discard both of caches in
3694 * shmem/memcached and oidmap in memqcache_oiddir.
3695 */
3696 else if (is_drop_database(node) && session_context->query_context->dboid != 0)
3697 {
3698 int dboid = session_context->query_context->dboid;
3699
3700 num_oids = pool_get_dropdb_table_oids(&oids, dboid);
3701
3702 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3703 {
3704 pool_shmem_lock();
3705 pool_invalidate_query_cache(num_oids, oids, true, dboid);
3706 pool_discard_oid_maps_by_db(dboid);
3707 pool_shmem_unlock();
3708 pool_reset_memqcache_buffer(true);
3709
3710 pfree(oids);
3711 ereport(DEBUG2,
3712 (errmsg("query cache handler for ReadyForQuery"),
3713 errdetail("deleted all cache files for the DROPped DB")));
3714 }
3715 }
3716 else
3717 {
3718 /*
3719 * DML/DCL/DDL case
3720 */
3721
3722 /* Extract table oids from buffer */
3723 num_oids = pool_get_dml_table_oid(&oids);
3724 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3725 {
3726 /*
3727 * If we are not inside a transaction, we can immediately
3728 * invalidate query cache.
3729 */
3730 if (state == 'I')
3731 {
3732 POOL_SETMASK2(&BlockSig, &oldmask);
3733 pool_shmem_lock();
3734 pool_invalidate_query_cache(num_oids, oids, true, 0);
3735 pool_shmem_unlock();
3736 POOL_SETMASK(&oldmask);
3737 pool_reset_memqcache_buffer(true);
3738 }
3739 else
3740 {
3741 /*
3742 * If we are inside a transaction, we cannot invalidate
3743 * query cache yet. However we can clear cache buffer, if
3744 * DML/DDL modifies the TABLE which SELECT uses.
3745 */
3746 pool_check_and_discard_cache_buffer(num_oids, oids);
3747 pool_reset_memqcache_buffer(false);
3748 }
3749 }
3750 else if (num_oids == 0)
3751 {
3752 /*
3753 * It is also necessary to clear cache buffers in case of no
3754 * oid queries (like BEGIN, CHECKPOINT, VACUUM, etc) too.
3755 */
3756 pool_reset_memqcache_buffer(true);
3757 }
3758 }
3759 }
3760 }
3761
3762 /*
3763 * Create and initialize query cache stats
3764 */
3765 static POOL_QUERY_CACHE_STATS * stats;
3766 int
pool_init_memqcache_stats(void)3767 pool_init_memqcache_stats(void)
3768 {
3769 stats = pool_shared_memory_create(sizeof(POOL_QUERY_CACHE_STATS));
3770 pool_reset_memqcache_stats();
3771 return 0;
3772 }
3773
3774 /*
3775 * Returns copy of stats area. The copy is in static area and will be
3776 * overwritten by next call to this function.
3777 */
3778 POOL_QUERY_CACHE_STATS *
pool_get_memqcache_stats(void)3779 pool_get_memqcache_stats(void)
3780 {
3781 static POOL_QUERY_CACHE_STATS mystats;
3782 pool_sigset_t oldmask;
3783
3784 memset(&mystats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3785
3786 if (stats)
3787 {
3788 POOL_SETMASK2(&BlockSig, &oldmask);
3789 pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3790 memcpy(&mystats, stats, sizeof(POOL_QUERY_CACHE_STATS));
3791 pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3792 POOL_SETMASK(&oldmask);
3793 }
3794
3795 return &mystats;
3796 }
3797
3798 /*
3799 * Reset query cache stats. Caller must lock QUERY_CACHE_STATS_SEM if
3800 * necessary.
3801 */
3802 void
pool_reset_memqcache_stats(void)3803 pool_reset_memqcache_stats(void)
3804 {
3805 memset(stats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3806 stats->start_time = time(NULL);
3807 }
3808
3809 /*
3810 * Count up number of successful SELECTs and returns the number.
3811 * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3812 */
3813 long long int
pool_stats_count_up_num_selects(long long int num)3814 pool_stats_count_up_num_selects(long long int num)
3815 {
3816 pool_sigset_t oldmask;
3817
3818 POOL_SETMASK2(&BlockSig, &oldmask);
3819 pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3820 stats->num_selects += num;
3821 pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3822 POOL_SETMASK(&oldmask);
3823 return stats->num_selects;
3824 }
3825
3826 /*
3827 * Count up number of successful SELECTs in temporary area and returns
3828 * the number.
3829 */
3830 long long int
pool_tmp_stats_count_up_num_selects(void)3831 pool_tmp_stats_count_up_num_selects(void)
3832 {
3833 POOL_SESSION_CONTEXT *session_context;
3834
3835 session_context = pool_get_session_context(false);
3836 session_context->num_selects++;
3837 return session_context->num_selects;
3838 }
3839
3840 /*
3841 * Return number of successful SELECTs in temporary area.
3842 */
3843 long long int
pool_tmp_stats_get_num_selects(void)3844 pool_tmp_stats_get_num_selects(void)
3845 {
3846 POOL_SESSION_CONTEXT *session_context;
3847
3848 session_context = pool_get_session_context(false);
3849 return session_context->num_selects;
3850 }
3851
3852 /*
3853 * Reset number of successful SELECTs in temporary area.
3854 */
3855 void
pool_tmp_stats_reset_num_selects(void)3856 pool_tmp_stats_reset_num_selects(void)
3857 {
3858 POOL_SESSION_CONTEXT *session_context;
3859
3860 session_context = pool_get_session_context(false);
3861 session_context->num_selects = 0;
3862 }
3863
3864 /*
3865 * Count up number of SELECTs extracted from cache returns the number.
3866 * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3867 */
3868 long long int
pool_stats_count_up_num_cache_hits(void)3869 pool_stats_count_up_num_cache_hits(void)
3870 {
3871 pool_sigset_t oldmask;
3872
3873 POOL_SETMASK2(&BlockSig, &oldmask);
3874 pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3875 stats->num_cache_hits++;
3876 pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3877 POOL_SETMASK(&oldmask);
3878 return stats->num_cache_hits;
3879 }
3880
/*
 * On shared memory hash table implementation. We use a sub part of the md5
 * hash key as the hash function. Experiments have shown that hash_any() of
 * PostgreSQL is a little bit better than the method using part of the md5
 * hash value, but spending the extra cpu cycles to call hash_any() does not
 * seem to be worth the trouble.
 */
3888
/* Hash table header: number of hash keys, key mask and one bucket per key. */
static volatile POOL_HASH_HEADER *hash_header;
/* Array holding every hash element; buckets and the free list chain through it. */
static volatile POOL_HASH_ELEMENT *hash_elements;
/*
 * Head of the free element list. Free elements are taken from and returned
 * to hash_free->next; the head element itself is never handed out.
 */
static volatile POOL_HASH_ELEMENT *hash_free;
3892
3893 /*
3894 * Initialize hash table on shared memory "nelements" is max number of
3895 * hash keys. The actual number of hash key is rounded up to power of
3896 * 2.
3897 */
3898 #undef POOL_HASH_DEBUG
3899
3900 int
pool_hash_init(int nelements)3901 pool_hash_init(int nelements)
3902 {
3903 size_t size;
3904 int nelements2; /* number of rounded up hash keys */
3905 int shift;
3906 uint32 mask;
3907 POOL_HASH_HEADER hh;
3908 int i;
3909
3910 if (nelements <= 0)
3911 ereport(ERROR,
3912 (errmsg("initializing hash table on shared memory, invalid number of elements: %d", nelements)));
3913
3914 /* Round up to power of 2 */
3915 shift = 32;
3916 nelements2 = 1;
3917 do
3918 {
3919 nelements2 <<= 1;
3920 shift--;
3921 } while (nelements2 < nelements);
3922
3923 mask = ~0;
3924 mask >>= shift;
3925 size = (char *) &hh.elements - (char *) &hh + sizeof(POOL_HEADER_ELEMENT) * nelements2;
3926 hash_header = pool_shared_memory_create(size);
3927 hash_header->nhash = nelements2;
3928 hash_header->mask = mask;
3929
3930 #ifdef POOL_HASH_DEBUG
3931 ereport(LOG,
3932 (errmsg("initializing hash table on shared memory"),
3933 errdetail("size:%zd nelements2:%d", size, nelements2)));
3934
3935 #endif
3936
3937 size = sizeof(POOL_HASH_ELEMENT) * nelements2;
3938 hash_elements = pool_shared_memory_create(size);
3939
3940 #ifdef POOL_HASH_DEBUG
3941 ereport(LOG,
3942 (errmsg("initializing hash table on shared memory"),
3943 errdetail("size:%zd nelements2:%d", size, nelements2)));
3944 #endif
3945
3946 for (i = 0; i < nelements2 - 1; i++)
3947 {
3948 hash_elements[i].next = (POOL_HASH_ELEMENT *) & hash_elements[i + 1];
3949 }
3950 hash_elements[nelements2 - 1].next = NULL;
3951 hash_free = hash_elements;
3952
3953 return 0;
3954 }
3955
3956 /*
3957 * Reset hash table on shared memory "nelements" is max number of
3958 * hash keys. The actual number of hash key is rounded up to power of
3959 * 2.
3960 */
3961 static int
pool_hash_reset(int nelements)3962 pool_hash_reset(int nelements)
3963 {
3964 size_t size;
3965 int nelements2; /* number of rounded up hash keys */
3966 int shift;
3967 uint32 mask;
3968 POOL_HASH_HEADER hh;
3969 int i;
3970
3971 if (nelements <= 0)
3972 ereport(ERROR,
3973 (errmsg("clearing hash table on shared memory, invalid number of elements: %d", nelements)));
3974
3975 /* Round up to power of 2 */
3976 shift = 32;
3977 nelements2 = 1;
3978 do
3979 {
3980 nelements2 <<= 1;
3981 shift--;
3982 } while (nelements2 < nelements);
3983
3984 mask = ~0;
3985 mask >>= shift;
3986
3987 size = (char *) &hh.elements - (char *) &hh + sizeof(POOL_HEADER_ELEMENT) * nelements2;
3988 memset((void *) hash_header, 0, size);
3989
3990 hash_header->nhash = nelements2;
3991 hash_header->mask = mask;
3992
3993 size = sizeof(POOL_HASH_ELEMENT) * nelements2;
3994 memset((void *) hash_elements, 0, size);
3995
3996 for (i = 0; i < nelements2 - 1; i++)
3997 {
3998 hash_elements[i].next = (POOL_HASH_ELEMENT *) & hash_elements[i + 1];
3999 }
4000 hash_elements[nelements2 - 1].next = NULL;
4001 hash_free = hash_elements;
4002
4003 return 0;
4004 }
4005
4006 /*
4007 * Search cacheid by MD5 hash key string
4008 * If found, returns cache id, otherwise NULL.
4009 */
4010 POOL_CACHEID *
pool_hash_search(POOL_QUERY_HASH * key)4011 pool_hash_search(POOL_QUERY_HASH * key)
4012 {
4013 volatile POOL_HASH_ELEMENT *element;
4014
4015 uint32 hash_key = create_hash_key(key);
4016
4017 if (hash_key >= hash_header->nhash)
4018 {
4019 ereport(WARNING,
4020 (errmsg("memcache: searching cacheid from hash. invalid hash key"),
4021 errdetail("invalid hash key: %uld nhash: %ld",
4022 hash_key, hash_header->nhash)));
4023 return NULL;
4024 }
4025
4026 {
4027 char md5[POOL_MD5_HASHKEYLEN + 1];
4028
4029 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
4030 md5[POOL_MD5_HASHKEYLEN] = '\0';
4031 #ifdef POOL_HASH_DEBUG
4032 ereport(LOG,
4033 (errmsg("searching hash table"),
4034 errdetail("hash_key:%d md5:%s", hash_key, md5)));
4035 #endif
4036 }
4037
4038 element = hash_header->elements[hash_key].element;
4039 while (element)
4040 {
4041 {
4042 char md5[POOL_MD5_HASHKEYLEN + 1];
4043
4044 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
4045 md5[POOL_MD5_HASHKEYLEN] = '\0';
4046 #ifdef POOL_HASH_DEBUG
4047 ereport(LOG,
4048 (errmsg("searching hash table"),
4049 errdetail("element md5:%s", md5)));
4050 #endif
4051 }
4052
4053 if (memcmp((const void *) element->hashkey.query_hash,
4054 (const void *) key->query_hash, sizeof(key->query_hash)) == 0)
4055 {
4056 return (POOL_CACHEID *) & element->cacheid;
4057 }
4058 element = element->next;
4059 }
4060 return NULL;
4061 }
4062
4063 /*
4064 * Insert MD5 key and associated cache id into shmem hash table. If
4065 * "update" is true, replace cacheid associated with the MD5 key,
4066 * rather than throw an error.
4067 */
4068 static int
pool_hash_insert(POOL_QUERY_HASH * key,POOL_CACHEID * cacheid,bool update)4069 pool_hash_insert(POOL_QUERY_HASH * key, POOL_CACHEID * cacheid, bool update)
4070 {
4071 POOL_HASH_ELEMENT *element;
4072 POOL_HASH_ELEMENT *new_element;
4073
4074 uint32 hash_key = create_hash_key(key);
4075
4076 if (hash_key >= hash_header->nhash)
4077 {
4078 ereport(WARNING,
4079 (errmsg("memcache: adding cacheid to hash. invalid hash key"),
4080 errdetail("invalid hash key: %uld nhash: %ld",
4081 hash_key, hash_header->nhash)));
4082 return -1;
4083 }
4084
4085 {
4086 char md5[POOL_MD5_HASHKEYLEN + 1];
4087
4088 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
4089 md5[POOL_MD5_HASHKEYLEN] = '\0';
4090 #ifdef POOL_HASH_DEBUG
4091 ereport(LOG,
4092 (errmsg("searching hash table"),
4093 errdetail("hash_key:%d md5:%s block:%d item:%d", hash_key, md5, cacheid->blockid, cacheid->itemid)));
4094 #endif
4095 }
4096
4097 /*
4098 * Look for hash key.
4099 */
4100 element = hash_header->elements[hash_key].element;
4101
4102 while (element)
4103 {
4104 if (memcmp((const void *) element->hashkey.query_hash,
4105 (const void *) key->query_hash, sizeof(key->query_hash)) == 0)
4106 {
4107 /* Hash key found. If "update" is false, just throw an error. */
4108 char md5[POOL_MD5_HASHKEYLEN + 1];
4109
4110 if (!update)
4111 {
4112 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
4113 md5[POOL_MD5_HASHKEYLEN] = '\0';
4114 ereport(LOG,
4115 (errmsg("memcache: adding cacheid to hash. hash key:\"%s\" already exists", md5)));
4116 return -1;
4117 }
4118 else
4119 {
4120 /* Update cache id */
4121 memcpy((void *) &element->cacheid, cacheid, sizeof(POOL_CACHEID));
4122 return 0;
4123 }
4124 }
4125 element = element->next;
4126 }
4127
4128 /*
4129 * Ok, same key did not exist. Just insert new hash key.
4130 */
4131 new_element = (POOL_HASH_ELEMENT *) get_new_hash_element();
4132 if (!new_element)
4133 {
4134 ereport(LOG,
4135 (errmsg("memcache: adding cacheid to hash. failed to get new element")));
4136 return -1;
4137 }
4138
4139 element = hash_header->elements[hash_key].element;
4140
4141 hash_header->elements[hash_key].element = new_element;
4142 new_element->next = element;
4143
4144 memcpy((void *) new_element->hashkey.query_hash, key->query_hash, POOL_MD5_HASHKEYLEN);
4145 memcpy((void *) &new_element->cacheid, cacheid, sizeof(POOL_CACHEID));
4146
4147 return 0;
4148 }
4149
4150 /*
4151 * Delete MD5 key and associated cache id from shmem hash table.
4152 */
4153 int
pool_hash_delete(POOL_QUERY_HASH * key)4154 pool_hash_delete(POOL_QUERY_HASH * key)
4155 {
4156 POOL_HASH_ELEMENT *element;
4157 POOL_HASH_ELEMENT **delete_point;
4158 bool found;
4159
4160 uint32 hash_key = create_hash_key(key);
4161
4162 if (hash_key >= hash_header->nhash)
4163 {
4164 ereport(LOG,
4165 (errmsg("memcache: deleting key from hash. invalid key"),
4166 errdetail("invalid hash key: %uld nhash: %ld",
4167 hash_key, hash_header->nhash)));
4168 return -1;
4169 }
4170
4171 /*
4172 * Look for delete location
4173 */
4174 found = false;
4175 delete_point = (POOL_HASH_ELEMENT * *) & (hash_header->elements[hash_key].element);
4176 element = hash_header->elements[hash_key].element;
4177
4178 while (element)
4179 {
4180 if (memcmp(element->hashkey.query_hash, key->query_hash, sizeof(key->query_hash)) == 0)
4181 {
4182 found = true;
4183 break;
4184 }
4185 delete_point = &element->next;
4186 element = element->next;
4187 }
4188
4189 if (!found)
4190 {
4191 char md5[POOL_MD5_HASHKEYLEN + 1];
4192
4193 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
4194 md5[POOL_MD5_HASHKEYLEN] = '\0';
4195 ereport(LOG,
4196 (errmsg("memcache: deleting key from hash. key:\"%s\" not found", md5)));
4197 return -1;
4198 }
4199
4200 /*
4201 * Put back the element to free list
4202 */
4203 *delete_point = element->next;
4204 put_back_hash_element(element);
4205
4206 return 0;
4207 }
4208
4209 /*
4210 * Calculate 32bit binary hash key (i.e. location in hash header) from MD5
4211 * string. We use top most 8 characters of MD5 string for calculation.
4212 */
4213 static uint32
create_hash_key(POOL_QUERY_HASH * key)4214 create_hash_key(POOL_QUERY_HASH * key)
4215 {
4216 #define POOL_HASH_NCHARS 8
4217
4218 char md5[POOL_HASH_NCHARS + 1];
4219 uint32 mask;
4220
4221 memcpy(md5, key->query_hash, POOL_HASH_NCHARS);
4222 md5[POOL_HASH_NCHARS] = '\0';
4223 mask = strtoul(md5, NULL, 16);
4224 mask &= hash_header->mask;
4225 return mask;
4226 }
4227
4228 /*
4229 * Get new free hash element from free list.
4230 */
4231 static volatile POOL_HASH_ELEMENT *
get_new_hash_element(void)4232 get_new_hash_element(void)
4233 {
4234 volatile POOL_HASH_ELEMENT *elm;
4235
4236 if (!hash_free->next)
4237 {
4238 /* No free element */
4239 return NULL;
4240 }
4241
4242 #ifdef POOL_HASH_DEBUG
4243 ereport(LOG,
4244 (errmsg("getting new hash element"),
4245 errdetail("hash_free->next:%p hash_free->next->next:%p",
4246 hash_free->next, hash_free->next->next)));
4247 #endif
4248
4249 elm = hash_free->next;
4250 hash_free->next = elm->next;
4251
4252 return elm;
4253 }
4254
4255 /*
4256 * Put back hash element to free list.
4257 */
4258 static void
put_back_hash_element(volatile POOL_HASH_ELEMENT * element)4259 put_back_hash_element(volatile POOL_HASH_ELEMENT * element)
4260 {
4261 POOL_HASH_ELEMENT *elm;
4262
4263 #ifdef POOL_HASH_DEBUG
4264 ereport(LOG,
4265 (errmsg("getting new hash element"),
4266 errdetail("hash_free->next:%p hash_free->next->next:%p",
4267 hash_free->next, hash_free->next->next)));
4268 #endif
4269
4270 elm = hash_free->next;
4271 hash_free->next = (POOL_HASH_ELEMENT *) element;
4272 element->next = elm;
4273 }
4274
4275 /*
4276 * Return true if there's a free hash element.
4277 */
4278 static bool
is_free_hash_element(void)4279 is_free_hash_element(void)
4280 {
4281 return hash_free->next != NULL;
4282 }
4283
4284 /*
4285 * Returns shared memory cache stats.
4286 * Subsequent call to this function will break return value
4287 * because its in static memory.
4288 * Caller must hold shmem_lock before calling this function.
4289 * If in memory query cache is not enabled, all stats are 0.
4290 */
4291 POOL_SHMEM_STATS *
pool_get_shmem_storage_stats(void)4292 pool_get_shmem_storage_stats(void)
4293 {
4294 static POOL_SHMEM_STATS mystats;
4295 POOL_HASH_ELEMENT *element;
4296 int nblocks;
4297 int i;
4298
4299 memset(&mystats, 0, sizeof(POOL_SHMEM_STATS));
4300
4301 if (!pool_config->memory_cache_enabled)
4302 return &mystats;
4303
4304 /*
4305 * Copy cache hit data
4306 */
4307 mystats.cache_stats.num_selects = stats->num_selects;
4308 mystats.cache_stats.num_cache_hits = stats->num_cache_hits;
4309
4310 if (pool_config->memqcache_method != SHMEM_CACHE)
4311 return &mystats;
4312
4313 /* number of total hash entries */
4314 mystats.num_hash_entries = hash_header->nhash;
4315
4316 /* number of used hash entries */
4317 for (i = 0; i < hash_header->nhash; i++)
4318 {
4319 element = hash_header->elements[i].element;
4320 while (element)
4321 {
4322 mystats.used_hash_entries++;
4323 element = element->next;
4324 }
4325 }
4326
4327 nblocks = pool_get_memqcache_blocks();
4328
4329 for (i = 0; i < nblocks; i++)
4330 {
4331 POOL_CACHE_BLOCK_HEADER *bh;
4332 POOL_CACHE_ITEM_POINTER *cip;
4333 char *p = block_address(i);
4334
4335 bh = (POOL_CACHE_BLOCK_HEADER *) p;
4336 int j;
4337
4338 if (bh->flags & POOL_BLOCK_USED)
4339 {
4340 for (j = 0; j < bh->num_items; j++)
4341 {
4342 cip = item_pointer(p, j);
4343 if (POOL_ITEM_DELETED & cip->flags)
4344 {
4345 mystats.fragment_cache_entries_size += item_header(p, j)->total_length;
4346 }
4347 else
4348 {
4349 /* number of used cache entries */
4350 mystats.num_cache_entries++;
4351 /* total size of used cache entries */
4352 mystats.used_cache_entries_size += (item_header(p, j)->total_length + sizeof(POOL_CACHE_ITEM_POINTER));
4353 }
4354 }
4355 mystats.used_cache_entries_size += sizeof(POOL_CACHE_BLOCK_HEADER);
4356 /* total size of free(usable) cache entries */
4357 mystats.free_cache_entries_size += bh->free_bytes;
4358 }
4359 else
4360 {
4361 mystats.free_cache_entries_size += pool_config->memqcache_cache_block_size;
4362 }
4363 }
4364
4365 /*
4366 * Copy POOL_QUERY_CACHE_STATS
4367 */
4368 memcpy(&mystats.cache_stats, stats, sizeof(mystats.cache_stats));
4369
4370 return &mystats;
4371 }
4372
/*
 * Inject cached message to the target backend buffer to pretend as if backend
 * actually replies with Data row and Command Complete message.
 *
 * backend:   connection to push the messages into. If a pending message is
 *            found for the current query context, this is replaced by the
 *            connection of that message's target backend.
 * qcache:    cached messages. As parsed below, each entry is a one byte
 *            message kind, a 4 byte length in network byte order (the length
 *            counts itself) and the remaining payload bytes.
 * qcachelen: number of bytes in qcache.
 *
 * The function first sends a flush message to the backend and pushes
 * whatever the backend returns, then pushes the cached messages, and
 * finally calls pool_pop().
 */
static void
inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen)
{
	char		kind;
	int			len;
	char	   *buf;
	int			timeout;
	int			i = 0;			/* read offset into qcache */
	bool		is_prepared_stmt = false;	/* set once a '1' or '2' entry
											 * has been seen in qcache */
	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;
	POOL_PENDING_MESSAGE *msg;

	session_context = pool_get_session_context(false);
	query_context = session_context->query_context;
	msg = pool_pending_message_find_lastest_by_query_context(query_context);

	if (msg)
	{
		/*
		 * If pending message found, we should extract target backend from it
		 */
		int			backend_id;

		backend_id = pool_pending_message_get_target_backend_id(msg);
		backend = CONNECTION(session_context->backend, backend_id);
		/* presumably -1 means "wait without time limit" -- TODO confirm */
		timeout = -1;
	}
	else
		/* presumably 0 means "do not wait" -- TODO confirm */
		timeout = 0;

	/* Send flush message to backend to retrieve response of backend */
	pool_write(backend, "H", 1);
	/* The flush message has no payload: its length is that of the length field */
	len = htonl(sizeof(len));
	pool_write_and_flush(backend, &len, sizeof(len));

	/*
	 * Push any response from backend
	 */
	for (;;)
	{
		/* check if there's any pending data */
		if (!pool_ssl_pending(backend) && pool_read_buffer_is_empty(backend))
		{
			pool_set_timeout(timeout);
			if (pool_check_fd(backend) != 0)
			{
				/* Nothing more to read: this is the only exit of the loop */
				ereport(DEBUG1,
						(errmsg("inject_cached_message: select shows no pending data")));
				pool_set_timeout(-1);
				break;
			}
			pool_set_timeout(-1);
		}

		pool_read(backend, &kind, 1);
		ereport(DEBUG1,
				(errmsg("inject_cached_message: push message kind: '%c'", kind)));
		if (msg &&
			((kind == 'T' && msg->type == POOL_DESCRIBE) ||
			 (kind == '2' && msg->type == POOL_BIND)))
		{
			/* Pending message seen. Now it is likely to end of pending data */
			timeout = 0;
		}
		/* Push the message as read: kind, length, then payload if any */
		pool_push(backend, &kind, sizeof(kind));
		pool_read(backend, &len, sizeof(len));
		pool_push(backend, &len, sizeof(len));

		/*
		 * NOTE(review): the subtraction below is done in an unsigned type,
		 * so a length smaller than sizeof(len) would wrap to a large
		 * positive value instead of being treated as "no payload" -- confirm
		 * that such a length cannot arrive here.
		 */
		if ((ntohl(len) - sizeof(len)) > 0)
		{
			buf = pool_read2(backend, ntohl(len) - sizeof(len));
			pool_push(backend, buf, ntohl(len) - sizeof(len));
		}
	}

	/*
	 * Inject row data and command complete
	 */
	while (i < qcachelen)
	{
		char		tmpkind;
		int			tmplen;		/* length as stored, network byte order */
		char	   *p;			/* start of the payload of this entry */

		tmpkind = qcache[i];
		i++;

		/* memcpy is used as the length may not be aligned within qcache */
		memcpy(&tmplen, qcache + i, sizeof(tmplen));
		i += sizeof(tmplen);
		len = ntohl(tmplen);
		p = qcache + i;
		/* Advance to the next entry; len includes the length field itself */
		i += len - sizeof(tmplen);

		/* Cached PARSE ('1') and BIND ('2') responses are not injected */
		if (tmpkind == '1' || tmpkind == '2')
		{
			is_prepared_stmt = true;
			continue;
		}

		/*
		 * In the prepared statement execution, there is no need to send 'T'
		 * response to the frontend.
		 */
		if (is_prepared_stmt && tmpkind == 'T')
		{
			continue;
		}

		/* push message */
		ereport(DEBUG1,
				(errmsg("inject_cached_message: push cached messages: '%c' len: %d", tmpkind, len)));
		pool_push(backend, &tmpkind, 1);
		/* The length is pushed exactly as stored, i.e. in network byte order */
		pool_push(backend, &tmplen, sizeof(tmplen));
		if (len > 0)
			pool_push(backend, p, len - sizeof(tmplen));
	}

	/*
	 * Pop data.
	 */
	pool_pop(backend, &len);
}
4500