1 /* -*-pgsql-c-*- */
2 /*
3 * pgpool: a language independent connection pool server for PostgreSQL
4 * written by Tatsuo Ishii
5 *
6 * Copyright (c) 2003-2020 PgPool Global Development Group
7 *
8 * Permission to use, copy, modify, and distribute this software and
9 * its documentation for any purpose and without fee is hereby
10 * granted, provided that the above copyright notice appear in all
11 * copies and that both that copyright notice and this permission
12 * notice appear in supporting documentation, and that the name of the
13 * author not be used in advertising or publicity pertaining to
14 * distribution of the software without specific, written prior
15 * permission. The author makes no representations about the
16 * suitability of this software for any purpose. It is provided "as
17 * is" without express or implied warranty.
18 *
19 * pool_memqcache.c: query cache on shmem or memcached
20 *
21 */
/*
 * SQL template for looking up the oid of a database by name; the '%s'
 * placeholder is presumably filled in with the database name.
 * NOTE(review): the name lands inside a single-quoted literal, so callers
 * must make sure quotes in it are escaped -- confirm at the call site.
 */
#define DATABASE_TO_OID_QUERY "SELECT oid FROM pg_database WHERE datname = '%s'"
23
24 #include "pool.h"
25
26 #include <stdio.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <sys/file.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <fcntl.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <stdlib.h>
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <dirent.h>
39
40 #ifdef USE_MEMCACHED
41 #include <libmemcached/memcached.h>
42 #endif
43
44 #include "auth/md5.h"
45 #include "pool_config.h"
46 #include "protocol/pool_proto_modules.h"
47 #include "parser/parsenodes.h"
48 #include "context/pool_session_context.h"
49 #include "query_cache/pool_memqcache.h"
50 #include "utils/pool_relcache.h"
51 #include "utils/pool_select_walker.h"
52 #include "utils/pool_stream.h"
53 #include "utils/pool_stream.h"
54 #include "utils/elog.h"
55 #include "utils/palloc.h"
56 #include "utils/memutils.h"
57
58
59 #ifdef USE_MEMCACHED
60 memcached_st *memc;
61 #endif
62
63 static char *encode_key(const char *s, char *buf, POOL_CONNECTION_POOL * backend);
64 #ifdef DEBUG
65 static void dump_cache_data(const char *data, size_t len);
66 #endif
67 static int pool_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen, int num_oids, int *oids);
68 static int pool_fetch_cache(POOL_CONNECTION_POOL * backend, const char *query, char **buf, size_t *len);
69 static int send_cached_messages(POOL_CONNECTION * frontend, const char *qcache, int qcachelen);
70 static void send_message(POOL_CONNECTION * conn, char kind, int len, const char *data);
71 #ifdef USE_MEMCACHED
72 static int delete_cache_on_memcached(const char *key);
73 #endif
74 static int pool_get_dml_table_oid(int **oid);
75 static int pool_get_dropdb_table_oids(int **oids, int dboid);
76 static void pool_discard_dml_table_oid(void);
77 static void pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlink, int dboid);
78 static int pool_get_database_oid(void);
79 static void pool_add_table_oid_map(POOL_CACHEKEY * cachkey, int num_table_oids, int *table_oids);
80 static void pool_reset_memqcache_buffer(bool reset_dml_oids);
81 static POOL_CACHEID * pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash, char *data, int size);
82 static POOL_CACHEID * pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash);
83 static char *pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash, int *size, int *sts);
84 static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array, POOL_TEMP_QUERY_CACHE * cache);
85 static void pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, char kind, char *data, int data_len);
86 static void pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, int num_oids, int *oids);
87 static POOL_INTERNAL_BUFFER * pool_create_buffer(void);
88 static void pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer);
89 static void pool_add_buffer(POOL_INTERNAL_BUFFER * buffer, void *data, size_t len);
90 static void *pool_get_buffer(POOL_INTERNAL_BUFFER * buffer, size_t *len);
91 #ifdef NOT_USED
92 static char *pool_get_buffer_pointer(POOL_INTERNAL_BUFFER * buffer);
93 #endif
94 static char *pool_get_current_cache_buffer(size_t *len);
95 static size_t pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer);
96 static void pool_check_and_discard_cache_buffer(int num_oids, int *oids);
97
98 static void pool_set_memqcache_blocks(int num_blocks);
99 static int pool_get_memqcache_blocks(void);
100 static void *pool_memory_cache_address(void);
101 static void pool_reset_fsmm(size_t size);
102 static void *pool_fsmm_address(void);
103 static void pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space);
104 static POOL_CACHE_BLOCKID pool_get_block(size_t free_space);
105 static POOL_CACHE_ITEM_HEADER * pool_cache_item_header(POOL_CACHEID * cacheid);
106 static int pool_init_cache_block(POOL_CACHE_BLOCKID blockid);
107 #if NOT_USED
108 static void pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid);
109 #endif
110 static int pool_delete_item_shmem_cache(POOL_CACHEID * cacheid);
111 static char *block_address(int blockid);
112 static POOL_CACHE_ITEM_POINTER * item_pointer(char *block, int i);
113 static POOL_CACHE_ITEM_HEADER * item_header(char *block, int i);
114 static POOL_CACHE_BLOCKID pool_reuse_block(void);
115 #ifdef SHMEMCACHE_DEBUG
116 static void dump_shmem_cache(POOL_CACHE_BLOCKID blockid);
117 #endif
118
119 static int pool_hash_reset(int nelements);
120 static int pool_hash_insert(POOL_QUERY_HASH * key, POOL_CACHEID * cacheid, bool update);
121 static uint32 create_hash_key(POOL_QUERY_HASH * key);
122 static volatile POOL_HASH_ELEMENT *get_new_hash_element(void);
123 static void put_back_hash_element(volatile POOL_HASH_ELEMENT * element);
124 static bool is_free_hash_element(void);
125 static void inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen);
126
127 /*
128 * Connect to Memcached
129 */
130 int
memcached_connect(void)131 memcached_connect(void)
132 {
133 char *memqcache_memcached_host;
134 int memqcache_memcached_port;
135 #ifdef USE_MEMCACHED
136 memcached_server_st *servers;
137 memcached_return rc;
138
139 /* Already connected? */
140 if (memc)
141 {
142 return 0;
143 }
144 #endif
145
146 memqcache_memcached_host = pool_config->memqcache_memcached_host;
147 memqcache_memcached_port = pool_config->memqcache_memcached_port;
148
149 ereport(DEBUG1,
150 (errmsg("connecting to memcached on Host:\"%s:%d\"", memqcache_memcached_host, memqcache_memcached_port)));
151
152 #ifdef USE_MEMCACHED
153 memc = memcached_create(NULL);
154 servers = memcached_server_list_append(NULL,
155 memqcache_memcached_host,
156 memqcache_memcached_port,
157 &rc);
158
159 rc = memcached_server_push(memc, servers);
160 if (rc != MEMCACHED_SUCCESS)
161 {
162 ereport(WARNING,
163 (errmsg("failed to connect to memcached, server push error:\"%s\"\n", memcached_strerror(memc, rc))));
164 memc = (memcached_st *) - 1;
165 return -1;
166 }
167 memcached_server_list_free(servers);
168 #else
169 ereport(WARNING,
170 (errmsg("failed to connect to memcached, memcached support is not enabled")));
171 return -1;
172 #endif
173 return 0;
174 }
175
176 /*
177 * Disconnect to Memcached
178 */
179 void
memcached_disconnect(void)180 memcached_disconnect(void)
181 {
182 #ifdef USE_MEMCACHED
183 if (!memc)
184 {
185 return;
186 }
187 memcached_free(memc);
188 #else
189 ereport(WARNING,
190 (errmsg("failed to disconnect from memcached, memcached support is not enabled")));
191 #endif
192 }
193
194 /*
195 * Register buffer data for query cache in memory cache
196 */
197 void
memqcache_register(char kind,POOL_CONNECTION * frontend,char * data,int data_len)198 memqcache_register(char kind,
199 POOL_CONNECTION * frontend,
200 char *data,
201 int data_len)
202 {
203 POOL_TEMP_QUERY_CACHE *cache;
204 POOL_SESSION_CONTEXT *session_context;
205 POOL_QUERY_CONTEXT *query_context;
206
207 cache = pool_get_current_cache();
208
209 if (cache == NULL)
210 {
211 session_context = pool_get_session_context(true);
212
213 if (session_context && pool_is_query_in_progress())
214 {
215 char *query = pool_get_query_string();
216
217 query_context = session_context->query_context;
218
219 if (query)
220 query_context->temp_cache = pool_create_temp_query_cache(query);
221 }
222 }
223
224 pool_add_temp_query_cache(cache, kind, data, data_len);
225 }
226
227 /*
228 * Commit SELECT results to cache storage.
229 */
230 static int
pool_commit_cache(POOL_CONNECTION_POOL * backend,char * query,char * data,size_t datalen,int num_oids,int * oids)231 pool_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen, int num_oids, int *oids)
232 {
233 #ifdef USE_MEMCACHED
234 memcached_return rc;
235 #endif
236 POOL_CACHEKEY cachekey;
237 char tmpkey[MAX_KEY];
238 time_t memqcache_expire;
239
240 /*
241 * get_buflen() will return -1 if query result exceeds memqcache_maxcache
242 */
243 if (datalen == -1)
244 {
245 return -1;
246 }
247
248 /* query disabled */
249 if (strlen(query) <= 0)
250 {
251 return -1;
252 }
253 ereport(DEBUG1,
254 (errmsg("commiting SELECT results to cache storage"),
255 errdetail("Query=\"%s\"", query)));
256
257 #ifdef DEBUG
258 dump_cache_data(data, datalen);
259 #endif
260
261 /* encode md5key for memcached */
262 encode_key(query, tmpkey, backend);
263 ereport(DEBUG2,
264 (errmsg("commiting SELECT results to cache storage"),
265 errdetail("search key : \"%s\"", tmpkey)));
266
267 memcpy(cachekey.hashkey, tmpkey, 32);
268
269 memqcache_expire = pool_config->memqcache_expire;
270 ereport(DEBUG1,
271 (errmsg("commiting SELECT results to cache storage"),
272 errdetail("memqcache_expire = %ld", memqcache_expire)));
273
274 if (pool_is_shmem_cache())
275 {
276 POOL_CACHEID *cacheid;
277 POOL_QUERY_HASH query_hash;
278
279 memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
280
281 cacheid = pool_hash_search(&query_hash);
282
283 if (cacheid != NULL)
284 {
285 ereport(DEBUG1,
286 (errmsg("commiting SELECT results to cache storage"),
287 errdetail("item already exists")));
288
289 return 0;
290 }
291 else
292 {
293 cacheid = pool_add_item_shmem_cache(&query_hash, data, datalen);
294 if (cacheid == NULL)
295 {
296 ereport(LOG,
297 (errmsg("failed to add item to shmem cache")));
298 return -1;
299 }
300 else
301 {
302 ereport(DEBUG2,
303 (errmsg("commiting SELECT results to cache storage"),
304 errdetail("blockid: %d itemid: %d",
305 cacheid->blockid, cacheid->itemid)));
306 }
307 cachekey.cacheid.blockid = cacheid->blockid;
308 cachekey.cacheid.itemid = cacheid->itemid;
309 }
310 }
311
312 #ifdef USE_MEMCACHED
313 else
314 {
315 rc = memcached_set(memc, tmpkey, 32,
316 data, datalen, (time_t) memqcache_expire, 0);
317 if (rc != MEMCACHED_SUCCESS)
318 {
319 ereport(WARNING,
320 (errmsg("cache commit failed with error:\"%s\"", memcached_strerror(memc, rc))));
321 return -1;
322 }
323 ereport(DEBUG1,
324 (errmsg("commiting SELECT results to cache storage"),
325 errdetail("set cache succeeded")));
326 }
327 #endif
328
329 /*
330 * Register cache id to oid map
331 */
332 pool_add_table_oid_map(&cachekey, num_oids, oids);
333
334 return 0;
335 }
336
337 /*
338 * Fetch from memory cache.
339 * Return:
340 * 0: fetch success,
341 * 1: not found
342 */
343 static int
pool_fetch_cache(POOL_CONNECTION_POOL * backend,const char * query,char ** buf,size_t * len)344 pool_fetch_cache(POOL_CONNECTION_POOL * backend, const char *query, char **buf, size_t *len)
345 {
346 char *ptr;
347 char tmpkey[MAX_KEY];
348 int sts;
349 char *p;
350
351 if (strlen(query) <= 0)
352 ereport(ERROR,
353 (errmsg("fetching from cache storage, no query")));
354
355 /* encode md5key for memcached */
356 encode_key(query, tmpkey, backend);
357 ereport(DEBUG1,
358 (errmsg("fetching from cache storage"),
359 errdetail("search key \"%s\"", tmpkey)));
360
361
362 if (pool_is_shmem_cache())
363 {
364 POOL_QUERY_HASH query_hash;
365 int mylen;
366
367 memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
368
369 ptr = pool_get_item_shmem_cache(&query_hash, &mylen, &sts);
370 if (ptr == NULL)
371 {
372 ereport(DEBUG1,
373 (errmsg("fetching from cache storage"),
374 errdetail("cache not found on shared memory")));
375
376 return 1;
377 }
378 *len = mylen;
379 }
380 #ifdef USE_MEMCACHED
381 else
382 {
383 memcached_return rc;
384 unsigned int flags;
385
386 ptr = memcached_get(memc, tmpkey, strlen(tmpkey), len, &flags, &rc);
387
388 if (rc != MEMCACHED_SUCCESS)
389 {
390 if (rc != MEMCACHED_NOTFOUND)
391 {
392 ereport(LOG,
393 (errmsg("fetching from cache storage, memcached_get failed with error: \"%s\"", memcached_strerror(memc, rc))));
394
395 /*
396 * Turn off memory cache support to prevent future errors.
397 */
398 pool_config->memory_cache_enabled = 0;
399 /* Behave as if cache not found */
400 return 1;
401 }
402 else
403 {
404 /* Not found */
405 ereport(DEBUG1,
406 (errmsg("fetching from cache storage"),
407 errdetail("cache item not found for key: \"%s\" and query:\"%s\"", tmpkey, query)));
408 return 1;
409 }
410 }
411 }
412 #else
413 else
414 {
415 ereport(ERROR,
416 (errmsg("memcached support is not enabled")));
417 }
418 #endif
419
420 p = palloc(*len);
421
422 memcpy(p, ptr, *len);
423
424 if (!pool_is_shmem_cache())
425 {
426 free(ptr);
427 }
428
429 ereport(DEBUG1,
430 (errmsg("fetching from cache storage"),
431 errdetail("query=\"%s\" len:%zd", query, *len)));
432 #ifdef DEBUG
433 dump_cache_data(p, *len);
434 #endif
435
436 *buf = p;
437
438 return 0;
439 }
440
441 /*
442 * encode key.
443 * create cache key as md5(username + query string + database name)
444 */
445 static char *
encode_key(const char * s,char * buf,POOL_CONNECTION_POOL * backend)446 encode_key(const char *s, char *buf, POOL_CONNECTION_POOL * backend)
447 {
448 char *strkey;
449 int u_length;
450 int d_length;
451 int q_length;
452 int length;
453
454 u_length = strlen(backend->info->user);
455 ereport(DEBUG1,
456 (errmsg("memcache encode key"),
457 errdetail("username: \"%s\" database_name: \"%s\"", backend->info->user, backend->info->database)));
458
459 d_length = strlen(backend->info->database);
460
461 q_length = strlen(s);
462 ereport(DEBUG1,
463 (errmsg("memcache encode key"),
464 errdetail("query: \"%s\"", s)));
465
466 length = u_length + d_length + q_length + 1;
467
468 strkey = (char *) palloc(sizeof(char) * length);
469
470 snprintf(strkey, length, "%s%s%s", backend->info->user, s, backend->info->database);
471
472 pool_md5_hash(strkey, strlen(strkey), buf);
473 ereport(DEBUG1,
474 (errmsg("memcache encode key"),
475 errdetail("`%s' -> `%s'", strkey, buf)));
476 pfree(strkey);
477 return buf;
478 }
479
#ifdef DEBUG
/*
 * Dump cache data (DEBUG builds only).
 *
 * Walks a cached message stream and prints every message to stderr.  Each
 * message is a 1-byte kind, a 4-byte network-order length that includes
 * itself, and then the payload.  A truncated or inconsistent stream stops
 * the dump instead of underflowing "len" and reading past the data.
 */
static void
dump_cache_data(const char *data, size_t len)
{
	fprintf(stderr, "shmem: len = %zu\n", len);

	while (len > 0)
	{
		int			plen;
		int			i;

		fprintf(stderr, "shmem: kind:%c\n", *data++);
		len--;

		if (len < 4)
		{
			fprintf(stderr, "shmem: truncated message header\n");
			return;
		}
		memcpy(&plen, data, 4);
		data += 4;
		len -= 4;
		plen = ntohl(plen);
		fprintf(stderr, "shmem: len:%d\n", plen);
		plen -= 4;				/* the length word counts itself */

		if (plen < 0 || (size_t) plen > len)
		{
			fprintf(stderr, "shmem: truncated message payload\n");
			return;
		}

		fprintf(stderr, "shmem: ");
		for (i = 0; i < plen; i++)
			fprintf(stderr, "%02x ", (unsigned char) (*data++));
		len -= plen;
		fprintf(stderr, "\n");
	}
}
#endif
514
515 /*
516 * send cached messages
517 */
518 static int
send_cached_messages(POOL_CONNECTION * frontend,const char * qcache,int qcachelen)519 send_cached_messages(POOL_CONNECTION * frontend, const char *qcache, int qcachelen)
520 {
521 int msg = 0;
522 int i = 0;
523 int is_prepared_stmt = 0;
524 int len;
525 const char *p;
526
527 while (i < qcachelen)
528 {
529 char tmpkind;
530 int tmplen;
531
532 tmpkind = qcache[i];
533 i++;
534
535 memcpy(&tmplen, qcache + i, sizeof(tmplen));
536 i += sizeof(tmplen);
537 len = ntohl(tmplen);
538 p = qcache + i;
539 i += len - sizeof(tmplen);
540
541 /* No need to cache PARSE and BIND responses */
542 if (tmpkind == '1' || tmpkind == '2')
543 {
544 is_prepared_stmt = 1;
545 continue;
546 }
547
548 /*
549 * In the prepared statement execution, there is no need to send 'T'
550 * response to the frontend.
551 */
552 if (is_prepared_stmt && tmpkind == 'T')
553 {
554 continue;
555 }
556
557 /* send message to frontend */
558 ereport(DEBUG1,
559 (errmsg("memcache: sending cached messages: '%c' len: %d", tmpkind, len)));
560 send_message(frontend, tmpkind, len, p);
561
562 msg++;
563 }
564
565 return msg;
566 }
567
568 /*
569 * send message to frontend
570 */
571 static void
send_message(POOL_CONNECTION * conn,char kind,int len,const char * data)572 send_message(POOL_CONNECTION * conn, char kind, int len, const char *data)
573 {
574 ereport(DEBUG2,
575 (errmsg("memcache: sending messages: kind '%c', len=%d, data=%p", kind, len, data)));
576
577 pool_write(conn, &kind, 1);
578
579 len = htonl(len);
580 pool_write(conn, &len, sizeof(len));
581
582 len = ntohl(len);
583 pool_write(conn, (void *) data, len - sizeof(len));
584 }
585
#ifdef USE_MEMCACHED
/*
 * Delete the memcached entry stored under "key" (the first 32 bytes are used
 * as the key).  Returns 1 when memcached reports success or buffers the
 * delete.  Otherwise it logs the error and returns 0.
 */
static int
delete_cache_on_memcached(const char *key)
{
	memcached_return rc;

	ereport(DEBUG2,
			(errmsg("memcache: deleteing cache on memcached with key: \"%s\"", key)));

	/* key is the md5 hash of the query */
	rc = memcached_delete(memc, key, 32, (time_t) 0);

	switch (rc)
	{
		case MEMCACHED_SUCCESS:
		case MEMCACHED_BUFFERED:
			return 1;

		default:
			ereport(LOG,
					(errmsg("failed to delete cache on memcached, error:\"%s\"", memcached_strerror(memc, rc))));
			return 0;
	}
}
#endif
614
615 /*
616 * Fetch SELECT data from cache if possible.
617 */
618 POOL_STATUS
pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,char * contents,bool * foundp)619 pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,
620 POOL_CONNECTION_POOL * backend,
621 char *contents, bool *foundp)
622 {
623 char *qcache;
624 size_t qcachelen;
625 int sts;
626 pool_sigset_t oldmask;
627
628 ereport(DEBUG1,
629 (errmsg("pool_fetch_from_memory_cache called")));
630
631 *foundp = false;
632
633 POOL_SETMASK2(&BlockSig, &oldmask);
634 pool_shmem_lock();
635
636 PG_TRY();
637 {
638 sts = pool_fetch_cache(backend, contents, &qcache, &qcachelen);
639 }
640 PG_CATCH();
641 {
642 pool_shmem_unlock();
643 POOL_SETMASK(&oldmask);
644 PG_RE_THROW();
645 }
646 PG_END_TRY();
647
648 pool_shmem_unlock();
649 POOL_SETMASK(&oldmask);
650
651 if (sts != 0)
652 /* Cache not found */
653 return POOL_CONTINUE;
654
655 /*
656 * Cache found. If we are doing extended query and in streaming
657 * replication mode, we need to retrieve any responses from backend and
658 * forward them to frontend.
659 */
660 if (pool_is_doing_extended_query_message() && SL_MODE)
661 {
662 POOL_SESSION_CONTEXT *session_context;
663 POOL_CONNECTION *target_backend;
664
665 ereport(DEBUG1,
666 (errmsg("memcache: injecting cache data")));
667
668 session_context = pool_get_session_context(true);
669 target_backend = CONNECTION(backend, session_context->load_balance_node_id);
670 inject_cached_message(target_backend, qcache, qcachelen);
671 }
672 else
673 {
674 /*
675 * Send each messages to frontend
676 */
677 send_cached_messages(frontend, qcache, qcachelen);
678 }
679
680 pfree(qcache);
681
682 /*
683 * Send a "READY FOR QUERY" if not in extended query.
684 */
685 if (!pool_is_doing_extended_query_message() && MAJOR(backend) == PROTO_MAJOR_V3)
686 {
687 signed char state;
688
689 /*
690 * We keep previous transaction state.
691 */
692 state = MASTER(backend)->tstate;
693 send_message(frontend, 'Z', 5, (char *) &state);
694 }
695
696 if (!pool_is_doing_extended_query_message() || !SL_MODE)
697 {
698 if (pool_flush(frontend))
699 {
700 return POOL_END;
701 }
702 }
703
704 *foundp = true;
705
706 if (pool_config->log_per_node_statement)
707 ereport(LOG,
708 (errmsg("fetch from memory cache"),
709 errdetail("query result fetched from cache. statement: %s", contents)));
710
711 ereport(DEBUG1,
712 (errmsg("fetch from memory cache"),
713 errdetail("query result found in the query cache, %s", contents)));
714
715 return POOL_CONTINUE;
716 }
717
/*
 * Simple and rough (thus unreliable) check whether the query is likely a
 * SELECT.  It is used before a parse tree is available, so it must not
 * assume the query is well formed (e.g. comments may be unterminated).
 *
 * White space, C-style comments and "--" comments are skipped.  The scan
 * does not stop at the first token: any other character is stepped over one
 * at a time.  Hence true is returned if a case-insensitive "SELECT" or "WITH"
 * prefix appears anywhere outside a comment, e.g. in "(SELECT 1)".
 *
 * White space is always skipped here, so the result does not depend on the
 * ignore_leading_white_space setting.
 */
bool
pool_is_likely_select(char *query)
{
	const char *p = query;

	if (p == NULL)
		return false;

	while (*p)
	{
		/* Ignore spaces and newlines */
		if (isspace((unsigned char) *p))
		{
			p++;
			continue;
		}

		/* Ignore C-style comments; an unterminated one runs to the end */
		if (p[0] == '/' && p[1] == '*')
		{
			const char *end = strstr(p + 2, "*/");

			if (end == NULL)
				break;
			p = end + 2;
			continue;
		}

		/* Ignore SQL comments up to and including the next newline */
		if (p[0] == '-' && p[1] == '-')
		{
			const char *nl = strchr(p + 2, '\n');

			if (nl == NULL)
				break;
			p = nl + 1;
			continue;
		}

		if (!strncasecmp(p, "SELECT", 6) || !strncasecmp(p, "WITH", 4))
			return true;

		p++;
	}

	return false;
}
802
803 /*
804 * Return true if SELECT can be cached. "node" is the parse tree for
805 * the query and "query" is the query string.
806 * The query must be SELECT or WITH.
807 */
808 bool
pool_is_allow_to_cache(Node * node,char * query)809 pool_is_allow_to_cache(Node *node, char *query)
810 {
811 int i = 0;
812 int num_oids = -1;
813 SelectContext ctx;
814
815 /*
816 * If NO QUERY CACHE comment exists, do not cache.
817 */
818 if (!strncasecmp(query, NO_QUERY_CACHE, NO_QUERY_CACHE_COMMENT_SZ))
819 return false;
820
821 /*
822 * Check black table list first.
823 */
824 if (pool_config->num_black_memqcache_table_list > 0)
825 {
826 /*
827 * Extract oids in from clause of SELECT, and check if SELECT to them
828 * could be cached.
829 */
830 num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
831 if (num_oids > 0)
832 {
833 for (i = 0; i < num_oids; i++)
834 {
835 ereport(DEBUG1,
836 (errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, ctx.table_names[i])));
837 if (pool_is_table_in_black_list(ctx.table_names[i]) == true)
838 {
839 ereport(DEBUG1,
840 (errmsg("memcache: node is not allowed to cache")));
841 return false;
842 }
843 }
844 }
845 }
846
847 /* SELECT INTO or SELECT FOR SHARE or UPDATE cannot be cached */
848 if (pool_has_insertinto_or_locking_clause(node))
849 return false;
850
851 /*
852 * If SELECT uses non immutable functions, it's not allowed to cache.
853 */
854 if (pool_has_non_immutable_function_call(node))
855 return false;
856
857 /*
858 * If SELECT uses temporary tables it's not allowed to cache.
859 */
860 if (pool_config->check_temp_table && pool_has_temp_table(node))
861 return false;
862
863 /*
864 * If SELECT uses system catalogs, it's not allowed to cache.
865 */
866 if (pool_has_system_catalog(node))
867 return false;
868
869 /*
870 * TABLESAMPLE is not allowed to cache.
871 */
872 if (IsA(node, SelectStmt) &&((SelectStmt *) node)->fromClause)
873 {
874 List *tbl_list = ((SelectStmt *) node)->fromClause;
875 ListCell *tbl;
876
877 foreach(tbl, tbl_list)
878 {
879 if (IsA(lfirst(tbl), RangeTableSample))
880 return false;
881 }
882 }
883
884
885 /*
886 * If the table is in the while list, allow to cache even if it is VIEW or
887 * unlogged table.
888 */
889 if (pool_config->num_white_memqcache_table_list > 0)
890 {
891 if (num_oids < 0)
892 num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
893
894 if (num_oids > 0)
895 {
896 for (i = 0; i < num_oids; i++)
897 {
898 char *table = ctx.table_names[i];
899
900 ereport(DEBUG1,
901 (errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, table)));
902 if (is_view(table) || is_unlogged_table(table))
903 {
904 if (pool_is_table_in_white_list(table) == false)
905 {
906 ereport(DEBUG1,
907 (errmsg("memcache: node is not allowed to cache")));
908 return false;
909 }
910 }
911 }
912 }
913 }
914 else
915 {
916 /*
917 * If SELECT uses views, it's not allowed to cache.
918 */
919 if (pool_has_view(node))
920 return false;
921
922 /*
923 * If SELECT uses unlogged tables, it's not allowed to cache.
924 */
925 if (pool_has_unlogged_table(node))
926 return false;
927 }
928
929 /*
930 * If Data-modifying statements in WITH clause, it's not allowed to cache.
931 */
932 if(IsA(node, SelectStmt) && ((SelectStmt *) node)->withClause)
933 {
934 ListCell *lc;
935 WithClause *withClause = ((SelectStmt *) node)->withClause;
936
937 foreach(lc, withClause->ctes)
938 {
939 CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
940 if(IsA(cte->ctequery, InsertStmt) ||
941 IsA(cte->ctequery, DeleteStmt) ||
942 IsA(cte->ctequery, UpdateStmt))
943 {
944 return false;
945 }
946 }
947 }
948
949 return true;
950 }
951
952
953 /*
954 * Return true If the SELECTed table is in back list.
955 */
956 bool
pool_is_table_in_black_list(const char * table_name)957 pool_is_table_in_black_list(const char *table_name)
958 {
959
960 if (pool_config->num_black_memqcache_table_list > 0 &&
961 pattern_compare((char *) table_name, BLACKLIST, "black_memqcache_table_list") == 1)
962 {
963 return true;
964 }
965
966 return false;
967 }
968
969 /*
970 * Return true If the SELECTed table is in white list.
971 */
972 bool
pool_is_table_in_white_list(const char * table_name)973 pool_is_table_in_white_list(const char *table_name)
974 {
975 if (pool_config->num_white_memqcache_table_list > 0 &&
976 pattern_compare((char *) table_name, WHITELIST, "white_memqcache_table_list") == 1)
977 {
978 return true;
979 }
980
981 return false;
982 }
983
984 /*
985 * Extract table oid from INSERT/UPDATE/DELETE/TRUNCATE/
986 * DROP TABLE/ALTER TABLE/COPY FROM statement.
987 * For SELECT, if Data-modifying statements in its WITH clause,
988 * extract table oid from Data-modifying statements too.
989 * Returns number of oids.
990 * In case of error, returns 0 (InvalidOid).
991 * oids buffer (oidsp) will be discarded by subsequent call.
992 */
993 int
pool_extract_table_oids(Node * node,int ** oidsp)994 pool_extract_table_oids(Node *node, int **oidsp)
995 {
996 #define POOL_MAX_DML_OIDS 128
997 char *table;
998 static int oids[POOL_MAX_DML_OIDS];
999 int num_oids;
1000 int oid;
1001
1002 if (node == NULL)
1003 {
1004 ereport(LOG,
1005 (errmsg("memcache: error while extracting table oids. statement is NULL")));
1006 return 0;
1007 }
1008
1009 num_oids = 0;
1010 *oidsp = oids;
1011
1012 if (IsA(node, InsertStmt))
1013 {
1014 InsertStmt *stmt = (InsertStmt *) node;
1015
1016 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1017 table = make_table_name_from_rangevar(stmt->relation);
1018 }
1019 else if (IsA(node, UpdateStmt))
1020 {
1021 UpdateStmt *stmt = (UpdateStmt *) node;
1022
1023 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1024 table = make_table_name_from_rangevar(stmt->relation);
1025 }
1026 else if (IsA(node, DeleteStmt))
1027 {
1028 DeleteStmt *stmt = (DeleteStmt *) node;
1029
1030 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1031 table = make_table_name_from_rangevar(stmt->relation);
1032 }
1033 else if(IsA(node, SelectStmt))
1034 {
1035 SelectStmt *stmt = (SelectStmt *) node;
1036 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1037 table = NULL;
1038 }
1039 #ifdef NOT_USED
1040
1041 /*
1042 * We do not handle CREATE TABLE here. It is possible that
1043 * pool_extract_table_oids() is called before CREATE TABLE gets executed.
1044 */
1045 else if (IsA(node, CreateStmt))
1046 {
1047 CreateStmt *stmt = (CreateStmt *) node;
1048
1049 table = make_table_name_from_rangevar(stmt->relation);
1050 }
1051 #endif
1052
1053 else if (IsA(node, AlterTableStmt))
1054 {
1055 AlterTableStmt *stmt = (AlterTableStmt *) node;
1056
1057 table = make_table_name_from_rangevar(stmt->relation);
1058 }
1059
1060 else if (IsA(node, CopyStmt))
1061 {
1062 CopyStmt *stmt = (CopyStmt *) node;
1063
1064 if (stmt->is_from) /* COPY FROM? */
1065 {
1066 table = make_table_name_from_rangevar(stmt->relation);
1067 }
1068 else
1069 {
1070 return 0;
1071 }
1072 }
1073
1074 else if (IsA(node, DropStmt))
1075 {
1076 ListCell *cell;
1077
1078 DropStmt *stmt = (DropStmt *) node;
1079
1080 if (stmt->removeType != OBJECT_TABLE)
1081 {
1082 return 0;
1083 }
1084
1085 /*
1086 * Here, stmt->objects is list of target relation info. The first
1087 * cell of target relation info is a list (possibly) consists of
1088 * database, schema and relation. We need to call
1089 * makeRangeVarFromNameList() before passing to
1090 * make_table_name_from_rangevar. Otherwise we get weird excessively
1091 * decorated relation name (''table_name'').
1092 */
1093 foreach(cell, stmt->objects)
1094 {
1095 if (num_oids >= POOL_MAX_DML_OIDS)
1096 {
1097 ereport(LOG,
1098 (errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1099 return 0;
1100 }
1101
1102 table = make_table_name_from_rangevar(makeRangeVarFromNameList(lfirst(cell)));
1103 oid = pool_table_name_to_oid(table);
1104 if (oid > 0)
1105 {
1106 oids[num_oids++] = pool_table_name_to_oid(table);
1107 ereport(DEBUG1,
1108 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids - 1])));
1109 }
1110 }
1111 return num_oids;
1112 }
1113 else if (IsA(node, TruncateStmt))
1114 {
1115 ListCell *cell;
1116
1117 TruncateStmt *stmt = (TruncateStmt *) node;
1118
1119 foreach(cell, stmt->relations)
1120 {
1121 if (num_oids >= POOL_MAX_DML_OIDS)
1122 {
1123 ereport(LOG,
1124 (errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1125 return 0;
1126 }
1127
1128 table = make_table_name_from_rangevar(lfirst(cell));
1129 oid = pool_table_name_to_oid(table);
1130 if (oid > 0)
1131 {
1132 oids[num_oids++] = pool_table_name_to_oid(table);
1133 ereport(DEBUG1,
1134 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids - 1])));
1135 }
1136 }
1137 return num_oids;
1138 }
1139 else if (IsA(node, ExplainStmt))
1140 {
1141 ListCell *cell;
1142 DefElem *def;
1143 ExplainStmt *stmt = (ExplainStmt *) node;
1144
1145 foreach(cell, stmt->options)
1146 {
1147 def = lfirst(cell);
1148 if (strncmp("analyze", def->defname, 7) == 0)
1149 {
1150 return pool_extract_table_oids(stmt->query, oidsp);
1151 }
1152 }
1153
1154 table = NULL;
1155 }
1156 else
1157 {
1158 ereport(DEBUG1,
1159 (errmsg("memcache: extracting table oids: statment is different from INSERT/UPDATE/DELETE/TRUNCATE/DROP TABLE/ALTER TABLE")));
1160 return 0;
1161 }
1162
1163 oid = pool_table_name_to_oid(table);
1164 if (oid > 0)
1165 {
1166 if (num_oids >= POOL_MAX_DML_OIDS)
1167 {
1168 ereport(LOG,
1169 (errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1170 return 0;
1171 }
1172
1173 oids[num_oids++] = pool_table_name_to_oid(table);
1174 ereport(DEBUG1,
1175 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oid)));
1176 }
1177 return num_oids;
1178 }
1179
1180 /*
1181 * Extract table oid from INSERT/UPDATE/DELETE
1182 * FROM statement in WITH clause.
1183 * Returns number of oids.
1184 * oids buffer (oidsp) will be discarded by subsequent call.
1185 */
1186 int
pool_extract_withclause_oids(Node * node,int * oidsp)1187 pool_extract_withclause_oids(Node *node, int *oidsp)
1188 {
1189 int num_oids = 0;
1190 int oid;
1191 char *table;
1192 ListCell *lc;
1193 WithClause *with;
1194
1195 if(oidsp == NULL)
1196 {
1197 return 0;
1198 }
1199
1200 if(!node || !IsA(node, WithClause))
1201 {
1202 return 0;
1203 }
1204
1205 with = (WithClause *) node;
1206 foreach(lc, with->ctes)
1207 {
1208 CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
1209 if(IsA(cte->ctequery, InsertStmt))
1210 {
1211 InsertStmt *stmt = (InsertStmt *) cte->ctequery;
1212 table = make_table_name_from_rangevar(stmt->relation);
1213 }
1214 else if(IsA(cte->ctequery, DeleteStmt))
1215 {
1216 DeleteStmt *stmt = (DeleteStmt *) cte->ctequery;
1217 table = make_table_name_from_rangevar(stmt->relation);
1218 }
1219 else if(IsA(cte->ctequery, UpdateStmt))
1220 {
1221 UpdateStmt *stmt = (UpdateStmt *) cte->ctequery;
1222 table = make_table_name_from_rangevar(stmt->relation);
1223 }
1224 else
1225 {
1226 /* only check INSERT/DELETE/UPDATE in WITH clause */
1227 table = NULL;
1228 }
1229
1230 oid = pool_table_name_to_oid(table);
1231 if (oid > 0)
1232 {
1233 if (num_oids >= POOL_MAX_DML_OIDS)
1234 {
1235 break;
1236 }
1237
1238 oidsp[num_oids++] = pool_table_name_to_oid(table);
1239 ereport(DEBUG1,
1240 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oidsp[num_oids - 1])));
1241 }
1242 }
1243
1244 return num_oids;
1245 }
1246
1247 #define POOL_OIDBUF_SIZE 1024
1248 static int *oidbuf;
1249 static int oidbufp;
1250 static int oidbuf_size;
1251
1252 /*
1253 * Add table oid to internal buffer
1254 */
1255 void
pool_add_dml_table_oid(int oid)1256 pool_add_dml_table_oid(int oid)
1257 {
1258 int i;
1259 int *tmp;
1260
1261 if (oid == 0)
1262 return;
1263
1264 if (oidbufp >= oidbuf_size)
1265 {
1266 MemoryContext oldcxt;
1267
1268 oidbuf_size += POOL_OIDBUF_SIZE;
1269
1270 /*
1271 * This need to live throughout the life of child so home it in
1272 * TopMemoryContext
1273 */
1274 oldcxt = MemoryContextSwitchTo(TopMemoryContext);
1275 tmp = repalloc(oidbuf, sizeof(int) * oidbuf_size);
1276 MemoryContextSwitchTo(oldcxt);
1277 if (tmp == NULL)
1278 return;
1279
1280 oidbuf = tmp;
1281 }
1282
1283 for (i = 0; i < oidbufp; i++)
1284 {
1285 if (oidbuf[i] == oid)
1286 /* Already same oid exists */
1287 return;
1288 }
1289 oidbuf[oidbufp++] = oid;
1290 }
1291
1292
1293 /*
1294 * Get table oid buffer
1295 */
1296 static int
pool_get_dml_table_oid(int ** oid)1297 pool_get_dml_table_oid(int **oid)
1298 {
1299 *oid = oidbuf;
1300 return oidbufp;
1301 }
1302
1303 static int
pool_get_dropdb_table_oids(int ** oids,int dboid)1304 pool_get_dropdb_table_oids(int **oids, int dboid)
1305 {
1306 int *rtn = 0;
1307 int oids_size = 0;
1308 int *tmp;
1309
1310 int num_oids = 0;
1311 DIR *dir;
1312 struct dirent *dp;
1313 char path[1024];
1314
1315 snprintf(path, sizeof(path), "%s/%d", pool_config->memqcache_oiddir, dboid);
1316 if ((dir = opendir(path)) == NULL)
1317 {
1318 ereport(DEBUG1,
1319 (errmsg("memcache: getting drop table oids"),
1320 errdetail("Failed to open dir: %s", path)));
1321 return 0;
1322 }
1323
1324 while ((dp = readdir(dir)) != NULL)
1325 {
1326 if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0)
1327 continue;
1328
1329 if (num_oids >= oids_size)
1330 {
1331 oids_size += POOL_OIDBUF_SIZE;
1332 tmp = repalloc(rtn, sizeof(int) * oids_size);
1333 if (tmp == NULL)
1334 {
1335 closedir(dir);
1336 return 0;
1337 }
1338 rtn = tmp;
1339 }
1340
1341 rtn[num_oids] = atol(dp->d_name);
1342 num_oids++;
1343 }
1344
1345 closedir(dir);
1346 *oids = rtn;
1347
1348 return num_oids;
1349 }
1350
1351 /* Discard oid internal buffer */
1352 static void
pool_discard_dml_table_oid(void)1353 pool_discard_dml_table_oid(void)
1354 {
1355 oidbufp = 0;
1356 }
1357
1358 /*
 * Management modules for the oid map. When caching SELECT results, we
 * record table oids in files laid out in the following structure.
1361 *
1362 * memqcache_oiddir -+- database_oid -+-table_oid_file1
1363 * |
1364 * +-table_oid_file2
1365 * |
1366 * +-table_oid_file3...
1367 *
1368 * table_oid_file's name is table oid, which was used by the SELECT
1369 * statement. The file has 1 or more cacheid(s). When SELECT result is
1370 * cached, the file is created and cache id is appended. Later SELECT
1371 * using same table oid will add to the same file. If the SELECT uses
1372 * multiple tables, multiple table_oid_file will be created. When
1373 * INSERT/UPDATE/DELETE is executed, corresponding caches must be
1374 * deleted (cache invalidation) (when DROP TABLE, ALTER TABLE is
1375 * executed, the caches must be deleted as well). When database is
1376 * dropped, all caches belonging to the database must be deleted.
1377 */
1378
1379 /*
1380 * Get oid of current database
1381 */
1382 static int
pool_get_database_oid(void)1383 pool_get_database_oid(void)
1384 {
1385 /*
1386 * Query to convert table name to oid
1387 */
1388 int oid = 0;
1389 static POOL_RELCACHE * relcache;
1390 POOL_CONNECTION_POOL *backend;
1391
1392 backend = pool_get_session_context(false)->backend;
1393
1394 /*
1395 * If relcache does not exist, create it.
1396 */
1397 if (!relcache)
1398 {
1399 relcache = pool_create_relcache(pool_config->relcache_size, DATABASE_TO_OID_QUERY,
1400 int_register_func, int_unregister_func,
1401 false);
1402 if (relcache == NULL)
1403 {
1404 ereport(LOG,
1405 (errmsg("memcache: error creating relcache while getting database OID")));
1406 return oid;
1407 }
1408 }
1409
1410 /*
1411 * Search relcache.
1412 */
1413 oid = (int) (intptr_t) pool_search_relcache(relcache, backend,
1414 MASTER_CONNECTION(backend)->sp->database);
1415 return oid;
1416 }
1417
1418 /*
1419 * Get oid of current database for discarding cache files
1420 * after executing DROP DATABASE
1421 */
1422 int
pool_get_database_oid_from_dbname(char * dbname)1423 pool_get_database_oid_from_dbname(char *dbname)
1424 {
1425 int dboid = 0;
1426 POOL_SELECT_RESULT *res;
1427 char query[1024];
1428
1429 POOL_CONNECTION_POOL *backend;
1430
1431 backend = pool_get_session_context(false)->backend;
1432
1433 snprintf(query, sizeof(query), DATABASE_TO_OID_QUERY, dbname);
1434 do_query(MASTER(backend), query, &res, MAJOR(backend));
1435
1436 if (res->numrows != 1)
1437 {
1438 ereport(DEBUG1,
1439 (errmsg("memcache: getting oid of current database"),
1440 errdetail("received %d rows", res->numrows)));
1441 free_select_result(res);
1442 return 0;
1443 }
1444
1445 dboid = atol(res->data[0]);
1446 free_select_result(res);
1447
1448 return dboid;
1449 }
1450
1451 /*
1452 * Add cache id (shmem case) or hash key (memcached case) to table oid
1453 * map file. Caller must hold shmem lock before calling this function
1454 * to avoid file extension conflict among different pgpool child
1455 * process.
1456 * As of pgpool-II 3.2, pool_handle_query_cache is responsible for that.
1457 * (pool_handle_query_cache -> pool_commit_cache -> pool_add_table_oid_map)
1458 */
1459 static void
pool_add_table_oid_map(POOL_CACHEKEY * cachekey,int num_table_oids,int * table_oids)1460 pool_add_table_oid_map(POOL_CACHEKEY * cachekey, int num_table_oids, int *table_oids)
1461 {
1462 char *dir;
1463 int dboid;
1464 char path[1024];
1465 int i;
1466 int len;
1467
1468 /*
1469 * Create memqcache_oiddir
1470 */
1471 dir = pool_config->memqcache_oiddir;
1472
1473 if (mkdir(dir, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1474 {
1475 if (errno != EEXIST)
1476 {
1477 ereport(WARNING,
1478 (errmsg("memcache: adding table oid maps, failed to create directory:\"%s\"", dir),
1479 errdetail("%m")));
1480 return;
1481 }
1482 }
1483
1484 /*
1485 * Create memqcache_oiddir/database_oid
1486 */
1487 dboid = pool_get_database_oid();
1488 ereport(DEBUG1,
1489 (errmsg("memcache: adding table oid maps"),
1490 errdetail("dboid %d", dboid)));
1491
1492 if (dboid <= 0)
1493 {
1494 ereport(WARNING,
1495 (errmsg("memcache: adding table oid maps, failed to get database OID")));
1496 return;
1497 }
1498
1499 snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1500 if (mkdir(path, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1501 {
1502 if (errno != EEXIST)
1503 {
1504 ereport(WARNING,
1505 (errmsg("memcache: adding table oid maps, failed to create directory:\"%s\"", path),
1506 errdetail("%m")));
1507 return;
1508 }
1509 }
1510
1511 if (pool_is_shmem_cache())
1512 {
1513 len = sizeof(cachekey->cacheid);
1514 }
1515 else
1516 {
1517 len = sizeof(cachekey->hashkey);
1518 }
1519
1520 for (i = 0; i < num_table_oids; i++)
1521 {
1522 int fd;
1523 int oid = table_oids[i];
1524 int sts;
1525 struct flock fl;
1526
1527 /*
1528 * Create or open each memqcache_oiddir/database_oid/table_oid
1529 */
1530 snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1531 if ((fd = open(path, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR)) == -1)
1532 {
1533 ereport(WARNING,
1534 (errmsg("memcache: adding table oid maps, failed to open file:\"%s\"", path),
1535 errdetail("%m")));
1536 return;
1537 }
1538
1539 fl.l_type = F_WRLCK;
1540 fl.l_whence = SEEK_SET;
1541 fl.l_start = 0; /* Offset from l_whence */
1542 fl.l_len = 0; /* length, 0 = to EOF */
1543
1544 sts = fcntl(fd, F_SETLKW, &fl);
1545 if (sts == -1)
1546 {
1547 ereport(WARNING,
1548 (errmsg("memcache: adding table oid maps, failed to lock file:\"%s\"", path),
1549 errdetail("%m")));
1550
1551 close(fd);
1552 return;
1553 }
1554
1555 /*
1556 * Below was ifdef-out because of a performance reason. Looking for
1557 * duplicate cache entries in a file needed unacceptably high cost. So
1558 * we gave up this and decided not to care about duplicate entries in
1559 * the file.
1560 */
1561 #ifdef NOT_USED
1562 for (;;)
1563 {
1564 sts = read(fd, (char *) &buf, len);
1565 if (sts == -1)
1566 {
1567 ereport(WARNING,
1568 (errmsg("memcache: adding table oid maps, failed to read file:\"%s\"", path),
1569 errdetail("%m")));
1570 close(fd);
1571 return;
1572 }
1573 else if (sts == len)
1574 {
1575 if (memcmp(cachekey, &buf, len) == 0)
1576 {
1577 /* Same key found. Skip this */
1578 close(fd);
1579 return;
1580 }
1581 continue;
1582 }
1583
1584 /*
1585 * Must be EOF
1586 */
1587 if (sts != 0)
1588 {
1589 ereport(WARNING,
1590 (errmsg("memcache: adding table oid maps, invalid data length:%d in file:\"%s\". error:\"%s\"", sts, path)));
1591 close(fd);
1592 return;
1593 }
1594 break;
1595 }
1596 #endif
1597
1598 if (lseek(fd, 0, SEEK_END) == -1)
1599 {
1600 ereport(WARNING,
1601 (errmsg("memcache: adding table oid maps, failed seek on file:\"%s\"", path),
1602 errdetail("%m")));
1603 close(fd);
1604 return;
1605 }
1606
1607 /*
1608 * Write cache_id or cache key at the end of file
1609 */
1610 sts = write(fd, (char *) cachekey, len);
1611 if (sts == -1 || sts != len)
1612 {
1613 ereport(WARNING,
1614 (errmsg("memcache: adding table oid maps, failed to write file:\"%s\"", path),
1615 errdetail("%m")));
1616 close(fd);
1617 return;
1618 }
1619 close(fd);
1620 }
1621 }
1622
1623 /*
1624 * Discard all oid maps at pgpool-II startup.
1625 * This is necessary for shmem case.
1626 */
1627 void
pool_discard_oid_maps(void)1628 pool_discard_oid_maps(void)
1629 {
1630 char command[1024];
1631
1632 snprintf(command, sizeof(command), "/bin/rm -fr %s/[0-9]*",
1633 pool_config->memqcache_oiddir);
1634 if (system(command) == -1)
1635 ereport(WARNING,
1636 (errmsg("unable to execute command \"%s\"", command),
1637 errdetail("system() command failed with error \"%m\"")));
1638
1639
1640 }
1641
1642 void
pool_discard_oid_maps_by_db(int dboid)1643 pool_discard_oid_maps_by_db(int dboid)
1644 {
1645 char command[1024];
1646
1647 if (pool_is_shmem_cache())
1648 {
1649 snprintf(command, sizeof(command), "/bin/rm -fr %s/%d/",
1650 pool_config->memqcache_oiddir, dboid);
1651
1652 ereport(DEBUG1,
1653 (errmsg("memcache: discarding oid maps by db"),
1654 errdetail("command: '%s\'", command)));
1655
1656 if (system(command) == -1)
1657 ereport(WARNING,
1658 (errmsg("unable to execute command \"%s\"", command),
1659 errdetail("system() command failed with error \"%m\"")));
1660 }
1661 }
1662
1663 /*
1664 * Read cache id (shmem case) or hash key (memcached case) from table
1665 * oid map file according to table_oids and discard cache entries. If
1666 * unlink is true, the file will be unlinked after successful cache
1667 * removal.
1668 */
1669 static void
pool_invalidate_query_cache(int num_table_oids,int * table_oid,bool unlinkp,int dboid)1670 pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlinkp, int dboid)
1671 {
1672 char *dir;
1673 char path[1024];
1674 int i;
1675 int len;
1676 POOL_CACHEKEY buf;
1677
1678 /*
1679 * Create memqcache_oiddir
1680 */
1681 dir = pool_config->memqcache_oiddir;
1682 if (mkdir(dir, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1683 {
1684 if (errno != EEXIST)
1685 {
1686 ereport(WARNING,
1687 (errmsg("memcache: invalidating query cache, failed to create directory:\"%s\"", dir),
1688 errdetail("%m")));
1689 return;
1690 }
1691 }
1692
1693 /*
1694 * Create memqcache_oiddir/database_oid
1695 */
1696 if (dboid == 0)
1697 {
1698 dboid = pool_get_database_oid();
1699 ereport(DEBUG1,
1700 (errmsg("memcache invalidating query cache"),
1701 errdetail("dboid %d", dboid)));
1702
1703 if (dboid <= 0)
1704 {
1705 ereport(WARNING,
1706 (errmsg("memcache: invalidating query cache, could not get database OID")));
1707 return;
1708 }
1709 }
1710
1711 snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1712 if (mkdir(path, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1713 {
1714 if (errno != EEXIST)
1715 {
1716 ereport(WARNING,
1717 (errmsg("memcache: invalidating query cache, failed to create directory:\"%s\"", path),
1718 errdetail("%m")));
1719 return;
1720 }
1721 }
1722
1723 if (pool_is_shmem_cache())
1724 {
1725 len = sizeof(buf.cacheid);
1726 }
1727 else
1728 {
1729 len = sizeof(buf.hashkey);
1730 }
1731
1732 for (i = 0; i < num_table_oids; i++)
1733 {
1734 int fd;
1735 int oid = table_oid[i];
1736 int sts;
1737 struct flock fl;
1738
1739 /*
1740 * Open each memqcache_oiddir/database_oid/table_oid
1741 */
1742 snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1743 if ((fd = open(path, O_RDONLY)) == -1)
1744 {
1745 /*
1746 * This may be normal. It is possible that no SELECT has been
1747 * issued since the table has been created or since pgpool-II
1748 * started up.
1749 */
1750 ereport(DEBUG1,
1751 (errmsg("memcache invalidating query cache"),
1752 errdetail("failed to open \"%s\". reason:\"%m\"", path)));
1753 continue;
1754 }
1755
1756 fl.l_type = F_RDLCK;
1757 fl.l_whence = SEEK_SET;
1758 fl.l_start = 0; /* Offset from l_whence */
1759 fl.l_len = 0; /* length, 0 = to EOF */
1760
1761 sts = fcntl(fd, F_SETLKW, &fl);
1762 if (sts == -1)
1763 {
1764 ereport(WARNING,
1765 (errmsg("memcache: invalidating query cache, failed to lock file:\"%s\"", path),
1766 errdetail("%m")));
1767 close(fd);
1768 return;
1769 }
1770 for (;;)
1771 {
1772 sts = read(fd, (char *) &buf, len);
1773 if (sts == -1)
1774 {
1775 ereport(WARNING,
1776 (errmsg("memcache: invalidating query cache, failed to read file:\"%s\"", path),
1777 errdetail("%m")));
1778
1779 close(fd);
1780 return;
1781 }
1782 else if (sts == len)
1783 {
1784 if (pool_is_shmem_cache())
1785 {
1786 ereport(DEBUG1,
1787 (errmsg("memcache invalidating query cache"),
1788 errdetail("deleting cacheid:%d itemid:%d",
1789 buf.cacheid.blockid, buf.cacheid.itemid)));
1790 pool_delete_item_shmem_cache(&buf.cacheid);
1791 }
1792 #ifdef USE_MEMCACHED
1793 else
1794 {
1795 char delbuf[33];
1796
1797 memcpy(delbuf, buf.hashkey, 32);
1798 delbuf[32] = 0;
1799 ereport(DEBUG1,
1800 (errmsg("memcache invalidating query cache"),
1801 errdetail("deleting %s", delbuf)));
1802
1803 delete_cache_on_memcached(delbuf);
1804 }
1805 #endif
1806 continue;
1807 }
1808
1809 /*
1810 * Must be EOF
1811 */
1812 if (sts != 0)
1813 {
1814 ereport(WARNING,
1815 (errmsg("memcache: invalidating query cache, invalid data length:%d in file:\"%s\"", sts, path)));
1816 close(fd);
1817 return;
1818 }
1819 break;
1820 }
1821
1822 if (unlinkp)
1823 {
1824 unlink(path);
1825 }
1826 close(fd);
1827 }
1828 #ifdef SHMEMCACHE_DEBUG
1829 dump_shmem_cache(0);
1830 #endif
1831 }
1832
1833 /*
1834 * Reset SELECT data buffers. If reset_dml_oids is true, call
1835 * pool_discard_dml_table_oid() to reset table oids used in DML statements.
1836 */
1837 static void
pool_reset_memqcache_buffer(bool reset_dml_oids)1838 pool_reset_memqcache_buffer(bool reset_dml_oids)
1839 {
1840 POOL_SESSION_CONTEXT *session_context;
1841
1842 session_context = pool_get_session_context(true);
1843
1844 if (session_context && session_context->query_cache_array)
1845 {
1846 POOL_TEMP_QUERY_CACHE *cache;
1847
1848 ereport(DEBUG1,
1849 (errmsg("memcache reset buffer"),
1850 errdetail("discard: %p", session_context->query_cache_array)));
1851
1852 pool_discard_query_cache_array(session_context->query_cache_array);
1853 session_context->query_cache_array = pool_create_query_cache_array();
1854
1855 ereport(DEBUG1,
1856 (errmsg("memcache reset buffer"),
1857 errdetail("create: %p", session_context->query_cache_array)));
1858
1859 /*
1860 * if the query context is still under use, we cannot discard
1861 * temporary cache.
1862 */
1863 if ((SL_MODE && pool_is_doing_extended_query_message()) ||
1864 can_query_context_destroy(session_context->query_context))
1865 {
1866 ereport(DEBUG1,
1867 (errmsg("memcache reset buffer"),
1868 errdetail("discard temp buffer of %p (%s)",
1869 session_context->query_context, session_context->query_context->original_query)));
1870
1871 cache = pool_get_current_cache();
1872 pool_discard_temp_query_cache(cache);
1873
1874 /*
1875 * Reset temp_cache pointer in the current query context so that
1876 * we don't double free memory.
1877 */
1878 session_context->query_context->temp_cache = NULL;
1879 }
1880 }
1881
1882 if (reset_dml_oids)
1883 pool_discard_dml_table_oid();
1884
1885 pool_tmp_stats_reset_num_selects();
1886 }
1887
1888 /*
1889 * Return true if memory cache method is "shmem". The purpose of this
1890 * function is to cache the result of stcmp and to save a few cycle.
1891 */
1892 bool
pool_is_shmem_cache(void)1893 pool_is_shmem_cache(void)
1894 {
1895 return pool_config->memqcache_method == SHMEM_CACHE;
1896 }
1897
/*
 * Number of shared memory cache blocks, recorded by
 * pool_shared_memory_cache_size() via pool_set_memqcache_blocks().
 */
static int	memqcache_num_blocks;

/* Record the number of shared memory cache blocks. */
static void
pool_set_memqcache_blocks(int num_blocks)
{
	memqcache_num_blocks = num_blocks;
}

/* Return the block count last recorded by pool_set_memqcache_blocks(). */
static int
pool_get_memqcache_blocks(void)
{
	int			blocks = memqcache_num_blocks;

	return blocks;
}
1916
1917 /*
1918 * Query cache on shared memory management modules.
1919 */
1920
1921 /*
1922 * Calculate necessary shared memory size.
1923 */
1924 size_t
pool_shared_memory_cache_size(void)1925 pool_shared_memory_cache_size(void)
1926 {
1927 int64 num_blocks;
1928 size_t size;
1929
1930 if (pool_config->memqcache_maxcache > pool_config->memqcache_cache_block_size)
1931 ereport(FATAL,
1932 (errmsg("invalid memory cache configuration"),
1933 errdetail("memqcache_cache_block_size %d should be greater or equal to memqcache_maxcache %d",
1934 pool_config->memqcache_cache_block_size,
1935 pool_config->memqcache_maxcache)));
1936
1937
1938 num_blocks = pool_config->memqcache_total_size /
1939 pool_config->memqcache_cache_block_size;
1940 if (num_blocks == 0)
1941 ereport(FATAL,
1942 (errmsg("invalid memory cache configuration"),
1943 errdetail("memqcache_total_size %ld should be greater or equal to memqcache_cache_block_size %d",
1944 pool_config->memqcache_total_size,
1945 pool_config->memqcache_cache_block_size)));
1946
1947 ereport(LOG,
1948 (errmsg("memory cache initialized"),
1949 errdetail("memcache blocks :%ld", num_blocks)));
1950 /* Remember # of blocks */
1951 pool_set_memqcache_blocks(num_blocks);
1952 size = pool_config->memqcache_cache_block_size * num_blocks;
1953 return size;
1954 }
1955
1956 /*
1957 * Acquire and initialize shared memory cache. This should be called
1958 * only once from pgpool main process at the process staring up time.
1959 */
1960 static void *shmem;
1961 int
pool_init_memory_cache(size_t size)1962 pool_init_memory_cache(size_t size)
1963 {
1964 ereport(DEBUG1,
1965 (errmsg("memory cache request size : %zd", size)));
1966
1967 shmem = pool_shared_memory_create(size);
1968 return 0;
1969 }
1970
1971 /*
1972 * Clear all the shared memory cache and reset FSMM and hash table.
1973 */
1974 void
pool_clear_memory_cache(void)1975 pool_clear_memory_cache(void)
1976 {
1977 size_t size;
1978 pool_sigset_t oldmask;
1979
1980 POOL_SETMASK2(&BlockSig, &oldmask);
1981 pool_shmem_lock();
1982
1983 PG_TRY();
1984 {
1985 size = pool_shared_memory_cache_size();
1986 memset(shmem, 0, size);
1987
1988 size = pool_shared_memory_fsmm_size();
1989 pool_reset_fsmm(size);
1990
1991 pool_discard_oid_maps();
1992
1993 pool_hash_reset(pool_config->memqcache_max_num_cache);
1994 }
1995 PG_CATCH();
1996 {
1997 pool_shmem_unlock();
1998 POOL_SETMASK(&oldmask);
1999 PG_RE_THROW();
2000 }
2001 PG_END_TRY();
2002
2003 pool_shmem_unlock();
2004 POOL_SETMASK(&oldmask);
2005 }
2006
2007 /*
2008 * Return shared memory cache address
2009 */
2010 static void *
pool_memory_cache_address(void)2011 pool_memory_cache_address(void)
2012 {
2013 return shmem;
2014 }
2015
2016 /*
2017 * Initialize new block
2018 */
2019
2020 /*
2021 * Free space management map
2022 *
 * The free space management map (FSMM) is an array of bytes, one per
 * cache block, indexed by block id. For example, with a 1GB cache and an
 * 8KB block size there are 131,072 blocks, so the FSMM takes 128KB. Each
 * FSMM entry holds a value from 0 to 255 that encodes the total free
 * space in its block.
 * For example, if the value is 2, the free space is between 64 and 95
 * bytes.
2030 *
2031 * value free space(in bytes)
2032 * 0 0-31
2033 * 1 32-63
2034 * 2 64-95
2035 * 3 96-127
2036 * :
2037 * 255 8160-8192
2038 */
2039
/*
 * Return the number of bytes of shared memory the FSMM needs: one byte
 * per cache block. The block count comes from pool_get_memqcache_blocks(),
 * so call this only after pool_shared_memory_cache_size().
 */
size_t
pool_shared_memory_fsmm_size(void)
{
	return (size_t) pool_get_memqcache_blocks() * sizeof(char);
}
2052
2053 /*
2054 * Acquire and initialize shared memory cache for FSMM. This should be
2055 * called after pool_shared_memory_cache_size only once from pgpool
2056 * main process at the process staring up time.
2057 */
2058 static void *fsmm;
2059 int
pool_init_fsmm(size_t size)2060 pool_init_fsmm(size_t size)
2061 {
2062 int maxblock = pool_get_memqcache_blocks();
2063 int encode_value;
2064
2065 fsmm = pool_shared_memory_create(size);
2066 encode_value = POOL_MAX_FREE_SPACE / POOL_FSMM_RATIO;
2067 memset(fsmm, encode_value, maxblock);
2068 return 0;
2069 }
2070
2071 /*
2072 * Return shared memory fsmm address
2073 */
2074 static void *
pool_fsmm_address(void)2075 pool_fsmm_address(void)
2076 {
2077 return fsmm;
2078 }
2079
/*
 * Clock algorithm shared query cache management modules.
 */

/*
 * Clock hand: id of the next victim block. It is allocated with
 * pool_shared_memory_create().
 */
static int *pool_fsmm_clock_hand;

/*
 * Allocate the clock hand and point it at block 0.
 */
void
pool_allocate_fsmm_clock_hand(void)
{
	int		   *hand = pool_shared_memory_create(sizeof(*hand));

	*hand = 0;
	pool_fsmm_clock_hand = hand;
}
2098
2099 /*
2100 * Reset FSMM.
2101 */
2102 static void
pool_reset_fsmm(size_t size)2103 pool_reset_fsmm(size_t size)
2104 {
2105 int encode_value;
2106
2107 encode_value = POOL_MAX_FREE_SPACE / POOL_FSMM_RATIO;
2108 memset(fsmm, encode_value, size);
2109
2110 *pool_fsmm_clock_hand = 0;
2111 }
2112
2113 /*
2114 * Find victim block using clock algorithm and make it free.
2115 * Returns new free block id.
2116 */
pool_reuse_block(void)2117 static POOL_CACHE_BLOCKID pool_reuse_block(void)
2118 {
2119 int maxblock = pool_get_memqcache_blocks();
2120 char *block = block_address(*pool_fsmm_clock_hand);
2121 POOL_CACHE_BLOCK_HEADER *bh = (POOL_CACHE_BLOCK_HEADER *) block;
2122 POOL_CACHE_BLOCKID reused_block;
2123 POOL_CACHE_ITEM_POINTER *cip;
2124 char *p;
2125 int i;
2126
2127 bh->flags = 0;
2128 reused_block = *pool_fsmm_clock_hand;
2129 p = block_address(reused_block);
2130
2131 for (i = 0; i < bh->num_items; i++)
2132 {
2133 cip = item_pointer(p, i);
2134
2135 if (!(POOL_ITEM_DELETED & cip->flags))
2136 {
2137 pool_hash_delete(&cip->query_hash);
2138 ereport(DEBUG1,
2139 (errmsg("pool_reuse_block: blockid: %d item: %d", reused_block, i)));
2140 }
2141 }
2142
2143 pool_init_cache_block(reused_block);
2144 pool_update_fsmm(reused_block, POOL_MAX_FREE_SPACE);
2145
2146 (*pool_fsmm_clock_hand)++;
2147 if (*pool_fsmm_clock_hand >= maxblock)
2148 *pool_fsmm_clock_hand = 0;
2149
2150 ereport(LOG,
2151 (errmsg("pool_reuse_block: blockid: %d", reused_block)));
2152
2153 return reused_block;
2154 }
2155
2156 /*
2157 * Get block id which has enough space
2158 */
pool_get_block(size_t free_space)2159 static POOL_CACHE_BLOCKID pool_get_block(size_t free_space)
2160 {
2161 int encode_value;
2162 unsigned char *p = pool_fsmm_address();
2163 int i;
2164 int maxblock = pool_get_memqcache_blocks();
2165 POOL_CACHE_BLOCK_HEADER *bh;
2166
2167 if (p == NULL)
2168 {
2169 ereport(WARNING,
2170 (errmsg("memcache: getting block: FSMM is not initialized")));
2171 return -1;
2172 }
2173
2174 if (free_space > POOL_MAX_FREE_SPACE)
2175 {
2176 ereport(WARNING,
2177 (errmsg("memcache: getting block: invalid free space:%zd", free_space),
2178 errdetail("requested free space: %zd is more than maximum allowed space:%lu", free_space, POOL_MAX_FREE_SPACE)));
2179 return -1;
2180 }
2181
2182 encode_value = free_space / POOL_FSMM_RATIO;
2183
2184 for (i = 0; i < maxblock; i++)
2185 {
2186 if (p[i] >= encode_value)
2187 {
2188 /*
2189 * This block *may" have enough space. We need to make sure it
2190 * actually has enough space.
2191 */
2192 bh = (POOL_CACHE_BLOCK_HEADER *) block_address(i);
2193 if (bh->free_bytes >= free_space)
2194 {
2195 return (POOL_CACHE_BLOCKID) i;
2196 }
2197 }
2198 }
2199
2200 /*
2201 * No enough space found. Reuse victim block
2202 */
2203 return pool_reuse_block();
2204 }
2205
2206 /*
2207 * Update free space info for specified block
2208 */
2209 static void
pool_update_fsmm(POOL_CACHE_BLOCKID blockid,size_t free_space)2210 pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space)
2211 {
2212 int encode_value;
2213 char *p = pool_fsmm_address();
2214
2215 if (p == NULL)
2216 {
2217 ereport(WARNING,
2218 (errmsg("memcache: updating free space in block: FSMM is not initialized")));
2219 return;
2220 }
2221
2222 if (blockid >= pool_get_memqcache_blocks())
2223 {
2224 ereport(WARNING,
2225 (errmsg("memcache: updating free space in block: block id:%d in invalid", blockid)));
2226 return;
2227 }
2228
2229 if (free_space > POOL_MAX_FREE_SPACE)
2230 {
2231 ereport(WARNING,
2232 (errmsg("memcache: updating free space in block: invalid free space:%zd", free_space),
2233 errdetail("requested free space: %zd is more than maximum allowed space:%lu", free_space, POOL_MAX_FREE_SPACE)));
2234
2235 return;
2236 }
2237
2238 encode_value = free_space / POOL_FSMM_RATIO;
2239
2240 p[blockid] = encode_value;
2241
2242 return;
2243 }
2244
2245 /*
2246 * Add item data to shared memory cache.
2247 * On successful registration, returns cache id.
2248 * The cache id is overwritten by the subsequent call to this function.
2249 * On error returns NULL.
2250 */
pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash,char * data,int size)2251 static POOL_CACHEID * pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash, char *data, int size)
2252 {
2253 static POOL_CACHEID cacheid;
2254 POOL_CACHE_BLOCKID blockid;
2255 POOL_CACHE_BLOCK_HEADER *bh;
2256 POOL_CACHE_ITEM_POINTER *cip;
2257
2258 POOL_CACHE_ITEM ci;
2259 POOL_CACHE_ITEM_POINTER cip_body;
2260 char *item;
2261
2262 int request_size;
2263 char *p;
2264 int i;
2265 char *src;
2266 char *dst;
2267 int num_deleted;
2268 char *dcip;
2269 char *dci;
2270 bool need_pack;
2271 char *work_buffer;
2272 int index;
2273
2274 if (query_hash == NULL)
2275 {
2276 ereport(LOG,
2277 (errmsg("error while adding item to shmem cache, query hash is NULL")));
2278 return NULL;
2279 }
2280
2281 if (data == NULL)
2282 {
2283 ereport(LOG,
2284 (errmsg("error while adding item to shmem cache, data is NULL")));
2285 return NULL;
2286 }
2287
2288 if (size <= 0)
2289 {
2290 ereport(LOG,
2291 (errmsg("error while adding item to shmem cache, invalid request size: %d", size)));
2292 return NULL;
2293 }
2294
2295 /* Add overhead */
2296 request_size = size + sizeof(POOL_CACHE_ITEM_POINTER) + sizeof(POOL_CACHE_ITEM_HEADER);
2297
2298 /* Get cache block which has enough space */
2299 blockid = pool_get_block(request_size);
2300
2301 if (blockid == -1)
2302 {
2303 return NULL;
2304 }
2305
2306 /*
2307 * Initialize the block if necessary. If no live items are remained, we
2308 * also initialize the block. If there's contiguous deleted items, we turn
2309 * them into free space as well.
2310 */
2311 pool_init_cache_block(blockid);
2312
2313 /*
2314 * Make sure that we have at least one free hash element.
2315 */
2316 while (!is_free_hash_element())
2317 {
2318 /* If not, reuse next victim block */
2319 blockid = pool_reuse_block();
2320 pool_init_cache_block(blockid);
2321 }
2322
2323 /* Get block address on shmem */
2324 p = block_address(blockid);
2325 bh = (POOL_CACHE_BLOCK_HEADER *) p;
2326
2327 /*
2328 * Create contiguous free space. We assume that item bodies are ordered
2329 * from bottom to top of the block, and corresponding item pointers are
2330 * ordered from the youngest to the oldest in the beginning of the block.
2331 */
2332
2333 /*
2334 * Optimization. If there's no deleted items, we don't need to pack it to
2335 * create contiguous free space.
2336 */
2337 need_pack = false;
2338 for (i = 0; i < bh->num_items; i++)
2339 {
2340 cip = item_pointer(p, i);
2341
2342 if (POOL_ITEM_DELETED & cip->flags) /* Deleted item? */
2343 {
2344 need_pack = true;
2345 ereport(DEBUG1,
2346 (errmsg("memcache adding item"),
2347 errdetail("start creating contiguous space")));
2348 break;
2349 }
2350 }
2351
2352 /*
2353 * We disable packing for now. Revisit and remove following code fragment
2354 * later.
2355 */
2356 need_pack = false;
2357
2358 if (need_pack)
2359 {
2360 /*
2361 * Pack and create contiguous free space.
2362 */
2363 dcip = calloc(1, pool_config->memqcache_cache_block_size);
2364 if (!dcip)
2365 {
2366 ereport(WARNING,
2367 (errmsg("memcache: adding item to cache: calloc failed")));
2368 return NULL;
2369 }
2370
2371 work_buffer = dcip;
2372 dci = dcip + pool_config->memqcache_cache_block_size;
2373 num_deleted = 0;
2374 index = 0;
2375
2376 for (i = 0; i < bh->num_items; i++)
2377 {
2378 int total_length;
2379 POOL_CACHEID cid;
2380
2381 cip = item_pointer(p, i);
2382
2383 if (POOL_ITEM_DELETED & cip->flags) /* Deleted item? */
2384 {
2385 num_deleted++;
2386 continue;
2387 }
2388
2389 /* Copy item body */
2390 src = p + cip->offset;
2391 total_length = item_header(p, i)->total_length;
2392 dst = dci - total_length;
2393 cip->offset = dst - work_buffer;
2394 memcpy(dst, src, total_length);
2395
2396 dci -= total_length;
2397
2398 /* Copy item pointer */
2399 src = (char *) cip;
2400 dst = (char *) item_pointer(dcip, index);
2401 memcpy(dst, src, sizeof(POOL_CACHE_ITEM_POINTER));
2402
2403 /* Update hash index */
2404 cid.blockid = blockid;
2405 cid.itemid = index;
2406 pool_hash_insert(&cip->query_hash, &cid, true);
2407 ereport(DEBUG1,
2408 (errmsg("memcache adding item"),
2409 errdetail("item cid updated. old:%d %d new:%d %d",
2410 blockid, i, blockid, index)));
2411 index++;
2412 }
2413
2414 /* All items deleted? */
2415 if (num_deleted > 0 && num_deleted == bh->num_items)
2416 {
2417 ereport(DEBUG1,
2418 (errmsg("memcache adding item"),
2419 errdetail("all items deleted, total deleted:%d", num_deleted)));
2420 bh->flags = 0;
2421 pool_init_cache_block(blockid);
2422 pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
2423 }
2424 else
2425 {
2426 /* Update number of items */
2427 bh->num_items -= num_deleted;
2428
2429 /* Copy back the packed block except block header */
2430 memcpy(p + sizeof(POOL_CACHE_BLOCK_HEADER),
2431 work_buffer + sizeof(POOL_CACHE_BLOCK_HEADER),
2432 pool_config->memqcache_cache_block_size - sizeof(POOL_CACHE_BLOCK_HEADER));
2433 }
2434 free(work_buffer);
2435 }
2436
2437 /*
2438 * Make sure that we have enough free space
2439 */
2440 if (bh->free_bytes < request_size)
2441 {
2442 /* This should not happen */
2443 ereport(WARNING,
2444 (errmsg("memcache: adding item to cache: not enough space"),
2445 errdetail("free space: %d required: %d block id:%d",
2446 bh->free_bytes, request_size, blockid)));
2447 return NULL;
2448 }
2449
2450 /*
2451 * At this point, new item can be located at block_header->num_items
2452 */
2453
2454 /* Fill in cache item header */
2455 ci.header.timestamp = time(NULL);
2456 ci.header.total_length = sizeof(POOL_CACHE_ITEM_HEADER) + size;
2457
2458 /* Calculate item body address */
2459 if (bh->num_items == 0)
2460 {
2461 /*
2462 * This is the #0 item. So address is block_bottom - data_length
2463 */
2464 item = p + pool_config->memqcache_cache_block_size - ci.header.total_length;
2465
2466 /* Mark this block used */
2467 bh->flags = POOL_BLOCK_USED;
2468 }
2469 else
2470 {
2471 cip = item_pointer(p, bh->num_items - 1);
2472 item = p + cip->offset - ci.header.total_length;
2473 }
2474
2475 /* Copy item header */
2476 memcpy(item, &ci, sizeof(POOL_CACHE_ITEM_HEADER));
2477 bh->free_bytes -= sizeof(POOL_CACHE_ITEM_HEADER);
2478
2479 /* Copy item body */
2480 memcpy(item + sizeof(POOL_CACHE_ITEM_HEADER), data, size);
2481 bh->free_bytes -= size;
2482
2483 /* Copy cache item pointer */
2484 memcpy(&cip_body.query_hash, query_hash, sizeof(POOL_QUERY_HASH));
2485 memset(&cip_body.next, 0, sizeof(POOL_CACHEID));
2486 cip_body.offset = item - p;
2487 cip_body.flags = POOL_ITEM_USED;
2488 memcpy(item_pointer(p, bh->num_items), &cip_body, sizeof(POOL_CACHE_ITEM_POINTER));
2489 bh->free_bytes -= sizeof(POOL_CACHE_ITEM_POINTER);
2490
2491 /* Update FSMM */
2492 pool_update_fsmm(blockid, bh->free_bytes);
2493
2494 cacheid.blockid = blockid;
2495 cacheid.itemid = bh->num_items;
2496 ereport(DEBUG1,
2497 (errmsg("memcache adding item"),
2498 errdetail("new item inserted. blockid: %d itemid:%d",
2499 cacheid.blockid, cacheid.itemid)));
2500
2501 /* Add up number of items */
2502 bh->num_items++;
2503
2504 /* Update hash table */
2505 if (pool_hash_insert(query_hash, &cacheid, false) < 0)
2506 {
2507 ereport(LOG,
2508 (errmsg("error while adding item to shmem cache, hash insert failed")));
2509
2510 /*
2511 * Since we have failed to insert hash index entry, we need to undo
2512 * the addition of cache entry.
2513 */
2514 pool_delete_item_shmem_cache(&cacheid);
2515 return NULL;
2516 }
2517 ereport(DEBUG1,
2518 (errmsg("memcache adding item"),
2519 errdetail("block: %d item: %d", cacheid.blockid, cacheid.itemid)));
2520
2521 #ifdef SHMEMCACHE_DEBUG
2522 dump_shmem_cache(blockid);
2523 #endif
2524 return &cacheid;
2525 }
2526
2527 /*
2528 * Returns item data address on shared memory cache specified by query hash.
2529 * Also data length is set to *size.
2530 * On error or data not found case returns NULL.
2531 * Detail is set to *sts. (0: success, 1: not found, -1: error)
2532 */
2533 static char *
pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash,int * size,int * sts)2534 pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash, int *size, int *sts)
2535 {
2536 POOL_CACHEID *cacheid;
2537 POOL_CACHE_ITEM_HEADER *cih;
2538
2539 if (sts == NULL)
2540 {
2541 ereport(LOG,
2542 (errmsg("error while getting item from shmem cache, sts is NULL")));
2543 return NULL;
2544 }
2545
2546 *sts = -1;
2547
2548 if (query_hash == NULL)
2549 {
2550 ereport(LOG,
2551 (errmsg("error while getting item from shmem cache, query hash is NULL")));
2552 return NULL;
2553 }
2554
2555 if (size == NULL)
2556 {
2557 ereport(LOG,
2558 (errmsg("error while getting item from shmem cache, size is NULL")));
2559 return NULL;
2560 }
2561
2562 /*
2563 * Find cache header by using hash table
2564 */
2565 cacheid = pool_find_item_on_shmem_cache(query_hash);
2566 if (cacheid == NULL)
2567 {
2568 /* Not found */
2569 *sts = 1;
2570 return NULL;
2571 }
2572
2573 cih = pool_cache_item_header(cacheid);
2574
2575 *size = cih->total_length - sizeof(POOL_CACHE_ITEM_HEADER);
2576 return (char *) cih + sizeof(POOL_CACHE_ITEM_HEADER);
2577 }
2578
2579 /*
2580 * Find data on shared memory cache specified query hash.
2581 * On success returns cache id.
2582 * The cache id is overwritten by the subsequent call to this function.
2583 */
pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash)2584 static POOL_CACHEID * pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash)
2585 {
2586 static POOL_CACHEID cacheid;
2587 POOL_CACHEID *c;
2588 POOL_CACHE_ITEM_HEADER *cih;
2589 time_t now;
2590
2591 c = pool_hash_search(query_hash);
2592 if (!c)
2593 {
2594 return NULL;
2595 }
2596
2597 cih = item_header(block_address(c->blockid), c->itemid);
2598
2599 /* Check cache expiration */
2600 if (pool_config->memqcache_expire > 0)
2601 {
2602 now = time(NULL);
2603 if (now > (cih->timestamp + pool_config->memqcache_expire))
2604 {
2605 ereport(DEBUG1,
2606 (errmsg("memcache finding item"),
2607 errdetail("cache expired: now: %ld timestamp: %ld",
2608 now, cih->timestamp + pool_config->memqcache_expire)));
2609 pool_delete_item_shmem_cache(c);
2610 return NULL;
2611 }
2612 }
2613
2614 cacheid.blockid = c->blockid;
2615 cacheid.itemid = c->itemid;
2616 return &cacheid;
2617 }
2618
2619 /*
2620 * Delete item data specified cache id from shmem.
2621 * On successful deletion, returns 0.
2622 * Other wise return -1.
2623 * FSMM is also updated.
2624 */
2625 static int
pool_delete_item_shmem_cache(POOL_CACHEID * cacheid)2626 pool_delete_item_shmem_cache(POOL_CACHEID * cacheid)
2627 {
2628 POOL_CACHE_BLOCK_HEADER *bh;
2629 POOL_CACHE_ITEM_POINTER *cip;
2630 POOL_CACHE_ITEM_HEADER *cih;
2631 POOL_QUERY_HASH key;
2632 int size;
2633
2634 ereport(DEBUG1,
2635 (errmsg("memcache deleting item data"),
2636 errdetail("cacheid:%d itemid:%d", cacheid->blockid, cacheid->itemid)));
2637
2638 if (cacheid->blockid >= pool_get_memqcache_blocks())
2639 {
2640 ereport(LOG,
2641 (errmsg("error while deleting item from shmem cache, invalid block: %d",
2642 cacheid->blockid)));
2643 return -1;
2644 }
2645
2646 bh = (POOL_CACHE_BLOCK_HEADER *) block_address(cacheid->blockid);
2647 if (!(bh->flags & POOL_BLOCK_USED))
2648 {
2649 ereport(LOG,
2650 (errmsg("error while deleting item from shmem cache, block: %d is not used",
2651 cacheid->blockid)));
2652 return -1;
2653 }
2654
2655 if (cacheid->itemid >= bh->num_items)
2656 {
2657 /*
2658 * This could happen if the block is reused. Since contents of oid
2659 * map file is not updated when the block is reused.
2660 */
2661 ereport(DEBUG1,
2662 (errmsg("memcache error deleting item data"),
2663 errdetail("invalid item id %d in block:%d",
2664 cacheid->itemid, cacheid->blockid)));
2665
2666 return -1;
2667 }
2668
2669 cip = item_pointer(block_address(cacheid->blockid), cacheid->itemid);
2670 if (!(cip->flags & POOL_ITEM_USED))
2671 {
2672 ereport(LOG,
2673 (errmsg("error while deleting item from shmem cache, item: %d was not used",
2674 cacheid->itemid)));
2675 return -1;
2676 }
2677
2678 if (cip->flags & POOL_ITEM_DELETED)
2679 {
2680 ereport(LOG,
2681 (errmsg("error while deleting item from shmem cache, item: %d was already deleted",
2682 cacheid->itemid)));
2683 return -1;
2684 }
2685
2686 /* Save cache key */
2687 memcpy(&key, &cip->query_hash, sizeof(POOL_QUERY_HASH));
2688
2689 cih = pool_cache_item_header(cacheid);
2690 size = cih->total_length + sizeof(POOL_CACHE_ITEM_POINTER);
2691
2692 /* Delete item pointer */
2693 cip->flags |= POOL_ITEM_DELETED;
2694
2695 /*
2696 * We do NOT count down bh->num_items here. The deleted space will be
2697 * recycled by pool_add_item_shmem_cache(). However, if this is the last
2698 * item, we can recycle whole block.
2699 *
2700 * 2012/4/1: Now we do not pack data in pool_add_item_shmem_cache() for
2701 * performance reason. Also we count down num_items if it is the last one.
2702 */
2703 if ((bh->num_items - 1) == 0)
2704 {
2705 ereport(DEBUG1,
2706 (errmsg("memcache deleting item data"),
2707 errdetail("no item remains. initialize block")));
2708 bh->flags = 0;
2709 pool_init_cache_block(cacheid->blockid);
2710 }
2711
2712 /* Remove hash index */
2713 pool_hash_delete(&key);
2714
2715 /*
2716 * If the deleted item is last one in the block, we add it to the free
2717 * space.
2718 */
2719 if (cacheid->itemid == (bh->num_items - 1))
2720 {
2721 bh->free_bytes += size;
2722 ereport(DEBUG1,
2723 (errmsg("memcache deleting item data"),
2724 errdetail("deleted %d bytes, freebytes is = %d",
2725 size, bh->free_bytes)));
2726
2727 bh->num_items--;
2728 }
2729
2730 /* Update FSMM */
2731 pool_update_fsmm(cacheid->blockid, bh->free_bytes);
2732
2733 return 0;
2734 }
2735
2736 /*
2737 * Returns item header specified by cache id.
2738 */
pool_cache_item_header(POOL_CACHEID * cacheid)2739 static POOL_CACHE_ITEM_HEADER * pool_cache_item_header(POOL_CACHEID * cacheid)
2740 {
2741 POOL_CACHE_BLOCK_HEADER *bh;
2742
2743 if (cacheid->blockid >= pool_get_memqcache_blocks())
2744 {
2745 ereport(WARNING,
2746 (errmsg("error while getting cache item header, invalid block id: %d", cacheid->blockid)));
2747 return NULL;
2748 }
2749
2750 bh = (POOL_CACHE_BLOCK_HEADER *) block_address(cacheid->blockid);
2751 if (cacheid->itemid >= bh->num_items)
2752 {
2753 ereport(WARNING,
2754 (errmsg("error while getting cache item header, invalid item id: %d", cacheid->itemid)));
2755 return NULL;
2756 }
2757
2758 return item_header((char *) bh, cacheid->itemid);
2759 }
2760
2761 /*
2762 * Initialize specified block.
2763 */
2764 static int
pool_init_cache_block(POOL_CACHE_BLOCKID blockid)2765 pool_init_cache_block(POOL_CACHE_BLOCKID blockid)
2766 {
2767 char *p;
2768 POOL_CACHE_BLOCK_HEADER *bh;
2769
2770 if (blockid >= pool_get_memqcache_blocks())
2771 {
2772 ereport(WARNING,
2773 (errmsg("error while initializing cache block, invalid block id: %d", blockid)));
2774 return -1;
2775 }
2776
2777 p = block_address(blockid);
2778 bh = (POOL_CACHE_BLOCK_HEADER *) p;
2779
2780 /* Is this block used? */
2781 if (!(bh->flags & POOL_BLOCK_USED))
2782 {
2783 /* Initialize empty block */
2784 memset(p, 0, pool_config->memqcache_cache_block_size);
2785 bh->free_bytes = pool_config->memqcache_cache_block_size -
2786 sizeof(POOL_CACHE_BLOCK_HEADER);
2787 }
2788 return 0;
2789 }
2790
#ifdef NOT_USED
/*
 * Delete all items in the block.
 *
 * Every item not yet flagged deleted is removed through
 * pool_delete_item_shmem_cache() (which also drops its hash index entry).
 * The block is then re-initialized and its FSMM entry is reset to
 * POOL_MAX_FREE_SPACE.
 */
static void
pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid)
{
	char	   *p;
	POOL_CACHE_BLOCK_HEADER *bh;
	POOL_CACHE_ITEM_POINTER *cip;
	POOL_CACHEID cacheid;
	int			i;

	/* Get block address on shmem */
	p = block_address(blockid);
	bh = (POOL_CACHE_BLOCK_HEADER *) p;
	cacheid.blockid = blockid;

	/*
	 * bh->num_items is re-read on every iteration, because deleting an item
	 * can shrink it (or reinitialize the block).
	 */
	for (i = 0; i < bh->num_items; i++)
	{
		cip = item_pointer(p, i);

		if ((POOL_ITEM_DELETED & cip->flags) == 0)	/* Not deleted item? */
		{
			cacheid.itemid = i;
			pool_delete_item_shmem_cache(&cacheid);
		}
	}

	/* Clear the USED flag first; pool_init_cache_block() skips used blocks */
	bh->flags = 0;
	pool_init_cache_block(blockid);
	pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
}
#endif
2825
2826 /*
2827 * Acquire lock: XXX giant lock
2828 */
2829 void
pool_shmem_lock(void)2830 pool_shmem_lock(void)
2831 {
2832 if (pool_is_shmem_cache())
2833 {
2834 pool_semaphore_lock(SHM_CACHE_SEM);
2835
2836 }
2837 }
2838
2839 /*
2840 * Release lock
2841 */
2842 void
pool_shmem_unlock(void)2843 pool_shmem_unlock(void)
2844 {
2845 if (pool_is_shmem_cache())
2846 {
2847 pool_semaphore_unlock(SHM_CACHE_SEM);
2848 }
2849 }
2850
2851 /*
2852 * Returns cache block address specified by block id
2853 */
2854 static char *
block_address(int blockid)2855 block_address(int blockid)
2856 {
2857 char *p;
2858
2859 p = pool_memory_cache_address() +
2860 blockid * pool_config->memqcache_cache_block_size;
2861 return p;
2862 }
2863
2864 /*
2865 * Returns i th item pointer in block address block
2866 */
item_pointer(char * block,int i)2867 static POOL_CACHE_ITEM_POINTER * item_pointer(char *block, int i)
2868 {
2869 return (POOL_CACHE_ITEM_POINTER *) (block + sizeof(POOL_CACHE_BLOCK_HEADER) +
2870 sizeof(POOL_CACHE_ITEM_POINTER) * i);
2871 }
2872
2873 /*
2874 * Returns i th item header in block address block
2875 */
item_header(char * block,int i)2876 static POOL_CACHE_ITEM_HEADER * item_header(char *block, int i)
2877 {
2878 POOL_CACHE_ITEM_POINTER *cip;
2879
2880 cip = item_pointer(block, i);
2881 return (POOL_CACHE_ITEM_HEADER *) (block + cip->offset);
2882 }
2883
#ifdef SHMEMCACHE_DEBUG
/*
 * Dump shmem cache block
 *
 * Prints the block header, then each item pointer and item header of the
 * block, to stderr. sizeof() results are printed with %zu, and time_t is
 * cast to long for %ld, so that the format specifiers match their arguments
 * on every platform.
 */
static void
dump_shmem_cache(POOL_CACHE_BLOCKID blockid)
{
	POOL_CACHE_BLOCK_HEADER *bh;
	POOL_CACHE_ITEM_POINTER *cip;
	POOL_CACHE_ITEM_HEADER *cih;
	int			i;

	bh = (POOL_CACHE_BLOCK_HEADER *) block_address(blockid);
	fprintf(stderr, "shmem: block header(%zu bytes): flags:%x num_items:%d free_bytes:%d\n",
			sizeof(*bh), bh->flags, bh->num_items, bh->free_bytes);
	for (i = 0; i < bh->num_items; i++)
	{
		cip = item_pointer((char *) bh, i);
		fprintf(stderr, "shmem: block: %d %d th item pointer(%zu bytes): offset:%d flags:%x\n",
				blockid, i, sizeof(*cip), cip->offset, cip->flags);
		cih = item_header((char *) bh, i);
		fprintf(stderr, "shmem: block: %d %d th item header(%zu bytes): timestamp:%ld length:%d\n",
				blockid, i, sizeof(*cih), (long) cih->timestamp, cih->total_length);
	}
}
#endif
2910
2911 /*
2912 * SELECT query result array modules
2913 */
2914 POOL_QUERY_CACHE_ARRAY *
pool_create_query_cache_array(void)2915 pool_create_query_cache_array(void)
2916 {
2917 #define POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM 128
2918 #define POOL_QUERY_CACHE_ARRAY_HEADER_SIZE (sizeof(int)+sizeof(int))
2919
2920 size_t size;
2921 POOL_QUERY_CACHE_ARRAY *p;
2922 POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
2923 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
2924
2925 size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM *
2926 sizeof(POOL_TEMP_QUERY_CACHE *);
2927 p = palloc(size);
2928 p->num_caches = 0;
2929 p->array_size = POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
2930 MemoryContextSwitchTo(old_context);
2931 return p;
2932 }
2933
2934 /*
2935 * Discard query cache array
2936 */
2937 void
pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array)2938 pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array)
2939 {
2940 int i;
2941
2942 if (!cache_array)
2943 return;
2944
2945 ereport(DEBUG1,
2946 (errmsg("memcache discarding query cache array"),
2947 errdetail("num_caches: %d", cache_array->num_caches)));
2948
2949 for (i = 0; i < cache_array->num_caches; i++)
2950 {
2951 ereport(DEBUG2,
2952 (errmsg("memcache discarding query cache array"),
2953 errdetail("cache no: %d cache: %p", i, cache_array->caches[i])));
2954 pool_discard_temp_query_cache(cache_array->caches[i]);
2955 }
2956 pfree(cache_array);
2957 }
2958
2959 /*
2960 * Add query cache array
2961 */
pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array,POOL_TEMP_QUERY_CACHE * cache)2962 static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array, POOL_TEMP_QUERY_CACHE * cache)
2963 {
2964 size_t size;
2965 POOL_QUERY_CACHE_ARRAY *cp = cache_array;
2966
2967 if (!cache_array)
2968 return cp;
2969
2970 ereport(DEBUG2,
2971 (errmsg("memcache adding query cache array"),
2972 errdetail("num_caches: %d cache: %p", cache_array->num_caches, cache)));
2973 if (cache_array->num_caches >= cache_array->array_size)
2974 {
2975 cache_array->array_size += POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
2976 size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + cache_array->array_size *
2977 sizeof(POOL_TEMP_QUERY_CACHE *);
2978 cache_array = repalloc(cache_array, size);
2979 }
2980 cache_array->caches[cache_array->num_caches++] = cache;
2981 return cache_array;
2982 }
2983
2984 /*
2985 * SELECT query result temporary cache modules
2986 */
2987
2988 /*
2989 * Create SELECT result temporary cache
2990 */
2991 POOL_TEMP_QUERY_CACHE *
pool_create_temp_query_cache(char * query)2992 pool_create_temp_query_cache(char *query)
2993 {
2994 POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
2995 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
2996 POOL_TEMP_QUERY_CACHE *p;
2997
2998 p = palloc(sizeof(*p));
2999 p->query = pstrdup(query);
3000
3001 p->buffer = pool_create_buffer();
3002 p->oids = pool_create_buffer();
3003 p->num_oids = 0;
3004 p->is_exceeded = false;
3005 p->is_discarded = false;
3006
3007 MemoryContextSwitchTo(old_context);
3008
3009 ereport(DEBUG1,
3010 (errmsg("pool_create_temp_query_cache: cache created: %p", p)));
3011
3012 return p;
3013 }
3014
3015 /*
3016 * Discard temp query cache
3017 */
3018 void
pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache)3019 pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache)
3020 {
3021 if (!temp_cache)
3022 return;
3023
3024 if (temp_cache->query)
3025 pfree(temp_cache->query);
3026 if (temp_cache->buffer)
3027 pool_discard_buffer(temp_cache->buffer);
3028 if (temp_cache->oids)
3029 pool_discard_buffer(temp_cache->oids);
3030
3031 ereport(DEBUG1,
3032 (errmsg("pool_discard_temp_query_cache: cache discarded: %p", temp_cache)));
3033
3034 pfree(temp_cache);
3035 }
3036
3037 /*
3038 * Add data to temp query cache.
3039 * Data must be FE/BE protocol packet.
3040 */
3041 static void
pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,char kind,char * data,int data_len)3042 pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, char kind, char *data, int data_len)
3043 {
3044 POOL_INTERNAL_BUFFER *buffer;
3045 size_t buflen;
3046 int send_len;
3047
3048 if (temp_cache == NULL)
3049 {
3050 /*
3051 * This could happen if cache exceeded in previous query execution in
3052 * the same unnamed portal.
3053 */
3054 ereport(DEBUG1,
3055 (errmsg("memcache adding temporary query cache"),
3056 errdetail("POOL_TEMP_QUERY_CACHE is NULL")));
3057 return;
3058 }
3059
3060 if (temp_cache->is_exceeded)
3061 {
3062 ereport(DEBUG1,
3063 (errmsg("memcache adding temporary query cache"),
3064 errdetail("memqcache_maxcache exceeds")));
3065 return;
3066 }
3067
3068 /*
3069 * We only store T(Table Description), D(Data row), C(Command Complete),
3070 * 1(ParseComplete), 2(BindComplete)
3071 */
3072 if (kind != 'T' && kind != 'D' && kind != 'C' && kind != '1' && kind != '2')
3073 {
3074 return;
3075 }
3076
3077 /* Check data limit */
3078 buffer = temp_cache->buffer;
3079 buflen = pool_get_buffer_length(buffer);
3080
3081 if ((buflen + data_len + sizeof(int) + 1) > pool_config->memqcache_maxcache)
3082 {
3083 ereport(DEBUG1,
3084 (errmsg("memcache adding temporary query cache"),
3085 errdetail("data size exceeds memqcache_maxcache. current:%zd requested:%zd memq_maxcache:%d",
3086 buflen, data_len + sizeof(int) + 1, pool_config->memqcache_maxcache)));
3087 temp_cache->is_exceeded = true;
3088 return;
3089 }
3090
3091 pool_add_buffer(buffer, &kind, 1);
3092 send_len = htonl(data_len + sizeof(int));
3093 pool_add_buffer(buffer, (char *) &send_len, sizeof(int));
3094 pool_add_buffer(buffer, data, data_len);
3095
3096 return;
3097 }
3098
3099 /*
3100 * Add table oids used by SELECT to temp query cache.
3101 */
3102 static void
pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,int num_oids,int * oids)3103 pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, int num_oids, int *oids)
3104 {
3105 POOL_INTERNAL_BUFFER *buffer;
3106
3107 if (!temp_cache || num_oids <= 0)
3108 return;
3109
3110 buffer = temp_cache->oids;
3111 pool_add_buffer(buffer, oids, num_oids * sizeof(int));
3112 temp_cache->num_oids = num_oids;
3113 }
3114
3115 /*
3116 * Internal buffer management modules.
3117 * Usage:
3118 * 1) Create buffer using pool_create_buffer().
3119 * 2) Add data to buffer using pool_add_buffer().
3120 * 3) Extract (copied) data from buffer using pool_get_buffer().
3121 * 4) Optionally you can:
3122 * Obtain buffer length by using pool_get_buffer_length().
3123 * Obtain buffer pointer by using pool_get_buffer_pointer().
3124 * 5) Discard buffer using pool_discard_buffer().
3125 */
3126
3127 /*
3128 * Create and return internal buffer
3129 */
pool_create_buffer(void)3130 static POOL_INTERNAL_BUFFER * pool_create_buffer(void)
3131 {
3132 POOL_INTERNAL_BUFFER *p;
3133
3134 p = palloc0(sizeof(*p));
3135 return p;
3136 }
3137
3138 /*
3139 * Discard internal buffer
3140 */
3141 static void
pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer)3142 pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer)
3143 {
3144 if (buffer)
3145 {
3146 if (buffer->buf)
3147 pfree(buffer->buf);
3148 pfree(buffer);
3149 }
3150 }
3151
3152 /*
3153 * Add data to internal buffer
3154 */
3155 static void
pool_add_buffer(POOL_INTERNAL_BUFFER * buffer,void * data,size_t len)3156 pool_add_buffer(POOL_INTERNAL_BUFFER * buffer, void *data, size_t len)
3157 {
3158 #define POOL_ALLOCATE_UNIT 8192
3159
3160 /* Sanity check */
3161 if (!buffer || !data || len == 0)
3162 return;
3163 POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
3164 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
3165
3166 /* Check if we need to increase the buffer size */
3167 if ((buffer->buflen + len) > buffer->bufsize)
3168 {
3169 size_t allocate_size = ((buffer->buflen + len) / POOL_ALLOCATE_UNIT + 1) * POOL_ALLOCATE_UNIT;
3170
3171 ereport(DEBUG2,
3172 (errmsg("memcache adding data to internal buffer"),
3173 errdetail("realloc old size:%zd new size:%zd",
3174 buffer->bufsize, allocate_size)));
3175 buffer->bufsize = allocate_size;
3176 buffer->buf = (char *) repalloc(buffer->buf, buffer->bufsize);
3177 }
3178 /* Add data to buffer */
3179 memcpy(buffer->buf + buffer->buflen, data, len);
3180 buffer->buflen += len;
3181 ereport(DEBUG2,
3182 (errmsg("memcache adding data to internal buffer"),
3183 errdetail("len:%zd, total:%zd bufsize:%zd",
3184 len, buffer->buflen, buffer->bufsize)));
3185 MemoryContextSwitchTo(old_context);
3186 return;
3187 }
3188
3189 /*
3190 * Get data from internal buffer.
3191 * Data is stored in newly malloc memory.
3192 * Data length is returned to len.
3193 */
3194 static void *
pool_get_buffer(POOL_INTERNAL_BUFFER * buffer,size_t * len)3195 pool_get_buffer(POOL_INTERNAL_BUFFER * buffer, size_t *len)
3196 {
3197 void *p;
3198
3199 if (buffer->bufsize == 0 || buffer->buflen == 0 ||
3200 buffer->buf == NULL)
3201 {
3202 *len = 0;
3203 return NULL;
3204 }
3205
3206 p = palloc(buffer->buflen);
3207 memcpy(p, buffer->buf, buffer->buflen);
3208 *len = buffer->buflen;
3209 return p;
3210 }
3211
3212 /*
3213 * Get internal buffer length.
3214 */
3215 static size_t
pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer)3216 pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer)
3217 {
3218 if (buffer == NULL)
3219 return 0;
3220
3221 return buffer->buflen;
3222 }
3223
#ifdef NOT_USED
/*
 * Get internal buffer pointer: the buffer's own storage (not a copy), or
 * NULL for a NULL buffer.
 */
static char *
pool_get_buffer_pointer(POOL_INTERNAL_BUFFER * buffer)
{
	return (buffer != NULL) ? buffer->buf : NULL;
}
#endif
3236 /*
3237 * Get query cache buffer struct of current query context
3238 */
3239 POOL_TEMP_QUERY_CACHE *
pool_get_current_cache(void)3240 pool_get_current_cache(void)
3241 {
3242 POOL_SESSION_CONTEXT *session_context;
3243 POOL_QUERY_CONTEXT *query_context;
3244 POOL_TEMP_QUERY_CACHE *p = NULL;
3245
3246 session_context = pool_get_session_context(true);
3247 if (session_context)
3248 {
3249 query_context = session_context->query_context;
3250 if (query_context)
3251 {
3252 p = query_context->temp_cache;
3253 }
3254 }
3255 return p;
3256 }
3257
3258 /*
3259 * Get query cache buffer of current query context
3260 */
3261 static char *
pool_get_current_cache_buffer(size_t * len)3262 pool_get_current_cache_buffer(size_t *len)
3263 {
3264 char *p = NULL;
3265
3266 *len = 0;
3267 POOL_TEMP_QUERY_CACHE *cache;
3268
3269 cache = pool_get_current_cache();
3270 if (cache)
3271 {
3272 p = pool_get_buffer(cache->buffer, len);
3273 }
3274 return p;
3275 }
3276
3277 /*
3278 * Mark this temporary query cache buffer discarded if the SELECT
3279 * uses the table oid specified by oids.
3280 */
3281 static void
pool_check_and_discard_cache_buffer(int num_oids,int * oids)3282 pool_check_and_discard_cache_buffer(int num_oids, int *oids)
3283 {
3284 POOL_SESSION_CONTEXT *session_context;
3285 POOL_TEMP_QUERY_CACHE *cache;
3286 int num_caches;
3287 size_t len;
3288 int *soids;
3289 int i,
3290 j,
3291 k;
3292
3293 session_context = pool_get_session_context(true);
3294
3295 if (!session_context || !session_context->query_cache_array)
3296 return;
3297
3298 num_caches = session_context->query_cache_array->num_caches;
3299
3300 for (i = 0; i < num_caches; i++)
3301 {
3302 cache = session_context->query_cache_array->caches[i];
3303 if (!cache || cache->is_discarded)
3304 continue;
3305
3306 soids = (int *) pool_get_buffer(cache->oids, &len);
3307 if (!soids || !len)
3308 continue;
3309
3310 for (j = 0; j < cache->num_oids; j++)
3311 {
3312 if (cache->is_discarded)
3313 break;
3314
3315 for (k = 0; k < num_oids; k++)
3316 {
3317 if (soids[j] == oids[k])
3318 {
3319 ereport(DEBUG1,
3320 (errmsg("discard cache for \"%s\"", cache->query)));
3321 cache->is_discarded = true;
3322 break;
3323 }
3324 }
3325 }
3326 pfree(soids);
3327 }
3328 }
3329
3330 /*
 * Handle the query cache at Ready for Query or at Command Complete. In
 * streaming replication mode with the extended query protocol, it is
 * handled at Command Complete; in all other cases, at Ready for Query.
3334 */
3335 void
pool_handle_query_cache(POOL_CONNECTION_POOL * backend,char * query,Node * node,char state)3336 pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state)
3337 {
3338 POOL_SESSION_CONTEXT *session_context;
3339 pool_sigset_t oldmask;
3340 char *cache_buffer;
3341 size_t len;
3342 int num_oids;
3343 int *oids;
3344 int i;
3345
3346 session_context = pool_get_session_context(true);
3347
3348 /* Ok to cache SELECT result? */
3349 if (pool_is_cache_safe())
3350 {
3351 SelectContext ctx;
3352 MemoryContext old_context;
3353
3354 old_context = MemoryContextSwitchTo(session_context->memory_context);
3355 num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
3356 MemoryContextSwitchTo(old_context);
3357 oids = ctx.table_oids;
3358 ereport(DEBUG2,
3359 (errmsg("query cache handler for ReadyForQuery"),
3360 errdetail("num_oids: %d oid: %d", num_oids, *oids)));
3361
3362 if (state == 'I') /* Not inside a transaction? */
3363 {
3364 /*
3365 * Make sure that temporary cache is not exceeded.
3366 */
3367 if (!pool_is_cache_exceeded())
3368 {
3369 POOL_TEMP_QUERY_CACHE *cache;
3370
3371 /*
3372 * If we are not inside a transaction, we can immediately
3373 * register to cache storage.
3374 */
3375 /* Register to memcached or shmem */
3376 POOL_SETMASK2(&BlockSig, &oldmask);
3377 pool_shmem_lock();
3378
3379 cache_buffer = pool_get_current_cache_buffer(&len);
3380 if (cache_buffer)
3381 {
3382 if (session_context->query_context->skip_cache_commit == false)
3383 {
3384 if (pool_commit_cache(backend, query, cache_buffer, len, num_oids, oids) != 0)
3385 {
3386 ereport(WARNING,
3387 (errmsg("ReadyForQuery: pool_commit_cache failed")));
3388 }
3389 }
3390
3391 /*
3392 * Reset temporary query cache buffer. This is necessary
3393 * if extended query protocol is used and a bind/execute
3394 * message arrives which uses a statement created by prior
3395 * parse message. In this case since the temp_cache is not
3396 * initialized by a parse message, messages are added to
3397 * pre existing temp cache buffer. The problem was found
3398 * in bug#152.
3399 * http://www.pgpool.net/mantisbt/view.php?id=152
3400 */
3401 cache = pool_get_current_cache();
3402 ereport(DEBUG1,
3403 (errmsg("pool_handle_query_cache: temp_cache: %p", cache)));
3404 pool_discard_temp_query_cache(cache);
3405
3406 if (SL_MODE && pool_is_doing_extended_query_message())
3407 session_context->query_context->temp_cache = NULL;
3408 else
3409 session_context->query_context->temp_cache = pool_create_temp_query_cache(query);
3410 pfree(cache_buffer);
3411 }
3412 pool_shmem_unlock();
3413 POOL_SETMASK(&oldmask);
3414 }
3415
3416 /* Count up SELECT stats */
3417 pool_stats_count_up_num_selects(1);
3418
3419 /* Reset temp buffer */
3420 pool_reset_memqcache_buffer(true);
3421 }
3422 else
3423 {
3424 POOL_TEMP_QUERY_CACHE *cache = pool_get_current_cache();
3425
3426 /* In transaction. Keep to temp query cache array */
3427 pool_add_oids_temp_query_cache(cache, num_oids, oids);
3428
3429 /*
3430 * If temp cache has been overflowed, just trash the half baked
3431 * temp cache.
3432 */
3433 if (pool_is_cache_exceeded())
3434 {
3435 POOL_TEMP_QUERY_CACHE *cache;
3436
3437 cache = pool_get_current_cache();
3438 pool_discard_temp_query_cache(cache);
3439
3440 /*
3441 * Reset temp_cache pointer in the current query context so
3442 * that we don't double free memory.
3443 */
3444 session_context->query_context->temp_cache = NULL;
3445
3446 }
3447
3448 /*
3449 * Otherwise add to the temp cache array.
3450 */
3451 else
3452 {
3453 session_context->query_cache_array =
3454 pool_add_query_cache_array(session_context->query_cache_array, cache);
3455
3456 /*
3457 * Reset temp_cache pointer in the current query context so
3458 * that we don't add the same temp cache to the cache array.
3459 * This is necessary such that case when next query is just a
3460 * "bind message", without "parse message". In the case the
3461 * query context is reused and same cache pointer will be
3462 * added to the query_cache_array which we do not want.
3463 */
3464 session_context->query_context->temp_cache = NULL;
3465 }
3466
3467 /* Count up temporary SELECT stats */
3468 pool_tmp_stats_count_up_num_selects();
3469 }
3470 }
3471 else if (is_rollback_query(node)) /* Rollback? */
3472 {
3473 /* Discard buffered data */
3474 pool_reset_memqcache_buffer(true);
3475 }
3476 else if (is_commit_query(node)) /* Commit? */
3477 {
3478 int num_caches;
3479
3480 POOL_SETMASK2(&BlockSig, &oldmask);
3481 pool_shmem_lock();
3482
3483 /* Invalidate query cache */
3484 if (pool_config->memqcache_auto_cache_invalidation)
3485 {
3486 num_oids = pool_get_dml_table_oid(&oids);
3487 pool_invalidate_query_cache(num_oids, oids, true, 0);
3488 }
3489
3490 /*--------------------------------------------------------------------
3491 * If we have something in the query cache buffer, that means either:
3492 * - We only had SELECTs in the transaction
3493 * - We had only SELECTs after the last DML
3494 * Thus we can register SELECT results to cache storage.
3495 *--------------------------------------------------------------------
3496 */
3497 num_caches = session_context->query_cache_array->num_caches;
3498 for (i = 0; i < num_caches; i++)
3499 {
3500 POOL_TEMP_QUERY_CACHE *cache;
3501
3502 cache = session_context->query_cache_array->caches[i];
3503 if (!cache || cache->is_discarded)
3504 continue;
3505
3506 num_oids = cache->num_oids;
3507 oids = pool_get_buffer(cache->oids, &len);
3508 cache_buffer = pool_get_buffer(cache->buffer, &len);
3509
3510 if (pool_commit_cache(backend, cache->query, cache_buffer, len, num_oids, oids) != 0)
3511 {
3512 ereport(WARNING,
3513 (errmsg("ReadyForQuery: pool_commit_cache failed")));
3514 }
3515 if (oids)
3516 pfree(oids);
3517 if (cache_buffer)
3518 pfree(cache_buffer);
3519 }
3520 pool_shmem_unlock();
3521 POOL_SETMASK(&oldmask);
3522
3523 /* Count up number of SELECT stats */
3524 pool_stats_count_up_num_selects(pool_tmp_stats_get_num_selects());
3525
3526 pool_reset_memqcache_buffer(true);
3527 }
3528 else /* Non cache safe queries */
3529 {
3530 /* Non cachable SELECT */
3531 if (node && IsA(node, SelectStmt))
3532 {
3533 /* Extract table oids from buffer */
3534 num_oids = pool_get_dml_table_oid(&oids);
3535
3536 if (state == 'I')
3537 {
3538 /*
3539 * If Data-modifying statements in SELECT's WITH clause,
3540 * invalidate query cache.
3541 */
3542 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3543 {
3544 POOL_SETMASK2(&BlockSig, &oldmask);
3545 pool_shmem_lock();
3546 pool_invalidate_query_cache(num_oids, oids, true, 0);
3547 pool_shmem_unlock();
3548 POOL_SETMASK(&oldmask);
3549 }
3550
3551 /* Count up SELECT stats */
3552 pool_stats_count_up_num_selects(1);
3553 pool_reset_memqcache_buffer(true);
3554 }
3555 else
3556 {
3557 /*
3558 * If we are inside a transaction, we cannot invalidate
3559 * query cache yet. However we can clear cache buffer, if
3560 * DML/DDL modifies the TABLE which SELECT uses.
3561 */
3562 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3563 {
3564 pool_check_and_discard_cache_buffer(num_oids, oids);
3565 pool_reset_memqcache_buffer(false);
3566 }
3567
3568 /* Count up temporary SELECT stats */
3569 pool_tmp_stats_count_up_num_selects();
3570 }
3571 }
3572
3573 /*
3574 * If the query is DROP DATABASE, discard both of caches in
3575 * shmem/memcached and oidmap in memqcache_oiddir.
3576 */
3577 else if (is_drop_database(node) && session_context->query_context->dboid != 0)
3578 {
3579 int dboid = session_context->query_context->dboid;
3580
3581 num_oids = pool_get_dropdb_table_oids(&oids, dboid);
3582
3583 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3584 {
3585 pool_shmem_lock();
3586 pool_invalidate_query_cache(num_oids, oids, true, dboid);
3587 pool_discard_oid_maps_by_db(dboid);
3588 pool_shmem_unlock();
3589 pool_reset_memqcache_buffer(true);
3590
3591 pfree(oids);
3592 ereport(DEBUG2,
3593 (errmsg("query cache handler for ReadyForQuery"),
3594 errdetail("deleted all cache files for the DROPped DB")));
3595 }
3596 }
3597 else
3598 {
3599 /*
3600 * DML/DCL/DDL case
3601 */
3602
3603 /* Extract table oids from buffer */
3604 num_oids = pool_get_dml_table_oid(&oids);
3605 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3606 {
3607 /*
3608 * If we are not inside a transaction, we can immediately
3609 * invalidate query cache.
3610 */
3611 if (state == 'I')
3612 {
3613 POOL_SETMASK2(&BlockSig, &oldmask);
3614 pool_shmem_lock();
3615 pool_invalidate_query_cache(num_oids, oids, true, 0);
3616 pool_shmem_unlock();
3617 POOL_SETMASK(&oldmask);
3618 pool_reset_memqcache_buffer(true);
3619 }
3620 else
3621 {
3622 /*
3623 * If we are inside a transaction, we cannot invalidate
3624 * query cache yet. However we can clear cache buffer, if
3625 * DML/DDL modifies the TABLE which SELECT uses.
3626 */
3627 pool_check_and_discard_cache_buffer(num_oids, oids);
3628 pool_reset_memqcache_buffer(false);
3629 }
3630 }
3631 else if (num_oids == 0)
3632 {
3633 /*
3634 * It is also necessary to clear cache buffers in case of no
3635 * oid queries (like BEGIN, CHECKPOINT, VACUUM, etc) too.
3636 */
3637 pool_reset_memqcache_buffer(true);
3638 }
3639 }
3640 }
3641 }
3642
3643 /*
3644 * Create and initialize query cache stats
3645 */
3646 static POOL_QUERY_CACHE_STATS * stats;
3647 int
pool_init_memqcache_stats(void)3648 pool_init_memqcache_stats(void)
3649 {
3650 stats = pool_shared_memory_create(sizeof(POOL_QUERY_CACHE_STATS));
3651 pool_reset_memqcache_stats();
3652 return 0;
3653 }
3654
3655 /*
3656 * Returns copy of stats area. The copy is in static area and will be
3657 * overwritten by next call to this function.
3658 */
3659 POOL_QUERY_CACHE_STATS *
pool_get_memqcache_stats(void)3660 pool_get_memqcache_stats(void)
3661 {
3662 static POOL_QUERY_CACHE_STATS mystats;
3663 pool_sigset_t oldmask;
3664
3665 memset(&mystats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3666
3667 if (stats)
3668 {
3669 POOL_SETMASK2(&BlockSig, &oldmask);
3670 pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3671 memcpy(&mystats, stats, sizeof(POOL_QUERY_CACHE_STATS));
3672 pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3673 POOL_SETMASK(&oldmask);
3674 }
3675
3676 return &mystats;
3677 }
3678
3679 /*
3680 * Reset query cache stats. Caller must lock QUERY_CACHE_STATS_SEM if
3681 * necessary.
3682 */
3683 void
pool_reset_memqcache_stats(void)3684 pool_reset_memqcache_stats(void)
3685 {
3686 memset(stats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3687 stats->start_time = time(NULL);
3688 }
3689
3690 /*
3691 * Count up number of successful SELECTs and returns the number.
3692 * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3693 */
3694 long long int
pool_stats_count_up_num_selects(long long int num)3695 pool_stats_count_up_num_selects(long long int num)
3696 {
3697 pool_sigset_t oldmask;
3698
3699 POOL_SETMASK2(&BlockSig, &oldmask);
3700 pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3701 stats->num_selects += num;
3702 pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3703 POOL_SETMASK(&oldmask);
3704 return stats->num_selects;
3705 }
3706
3707 /*
3708 * Count up number of successful SELECTs in temporary area and returns
3709 * the number.
3710 */
3711 long long int
pool_tmp_stats_count_up_num_selects(void)3712 pool_tmp_stats_count_up_num_selects(void)
3713 {
3714 POOL_SESSION_CONTEXT *session_context;
3715
3716 session_context = pool_get_session_context(false);
3717 session_context->num_selects++;
3718 return session_context->num_selects;
3719 }
3720
3721 /*
3722 * Return number of successful SELECTs in temporary area.
3723 */
3724 long long int
pool_tmp_stats_get_num_selects(void)3725 pool_tmp_stats_get_num_selects(void)
3726 {
3727 POOL_SESSION_CONTEXT *session_context;
3728
3729 session_context = pool_get_session_context(false);
3730 return session_context->num_selects;
3731 }
3732
3733 /*
3734 * Reset number of successful SELECTs in temporary area.
3735 */
3736 void
pool_tmp_stats_reset_num_selects(void)3737 pool_tmp_stats_reset_num_selects(void)
3738 {
3739 POOL_SESSION_CONTEXT *session_context;
3740
3741 session_context = pool_get_session_context(false);
3742 session_context->num_selects = 0;
3743 }
3744
3745 /*
3746 * Count up number of SELECTs extracted from cache returns the number.
3747 * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3748 */
3749 long long int
pool_stats_count_up_num_cache_hits(void)3750 pool_stats_count_up_num_cache_hits(void)
3751 {
3752 pool_sigset_t oldmask;
3753
3754 POOL_SETMASK2(&BlockSig, &oldmask);
3755 pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3756 stats->num_cache_hits++;
3757 pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3758 POOL_SETMASK(&oldmask);
3759 return stats->num_cache_hits;
3760 }
3761
3762 /*
3763 * On shared memory hash table implementation. We use sub part of md5
3764 * hash key as hash function. The experiment has shown that has_any()
3765 * of PostgreSQL is a little bit better than the method using part of
3766 * md5 hash value, but it seems adding some cpu cycles to call
3767 * hash_any() is not worth the trouble.
3768 */
3769
3770 static volatile POOL_HASH_HEADER *hash_header;
3771 static volatile POOL_HASH_ELEMENT *hash_elements;
3772 static volatile POOL_HASH_ELEMENT *hash_free;
3773
3774 /*
3775 * Initialize hash table on shared memory "nelements" is max number of
3776 * hash keys. The actual number of hash key is rounded up to power of
3777 * 2.
3778 */
3779 #undef POOL_HASH_DEBUG
3780
3781 int
pool_hash_init(int nelements)3782 pool_hash_init(int nelements)
3783 {
3784 size_t size;
3785 int nelements2; /* number of rounded up hash keys */
3786 int shift;
3787 uint32 mask;
3788 POOL_HASH_HEADER hh;
3789 int i;
3790
3791 if (nelements <= 0)
3792 ereport(ERROR,
3793 (errmsg("initializing hash table on shared memory, invalid number of elements: %d", nelements)));
3794
3795 /* Round up to power of 2 */
3796 shift = 32;
3797 nelements2 = 1;
3798 do
3799 {
3800 nelements2 <<= 1;
3801 shift--;
3802 } while (nelements2 < nelements);
3803
3804 mask = ~0;
3805 mask >>= shift;
3806 size = (char *) &hh.elements - (char *) &hh + sizeof(POOL_HEADER_ELEMENT) * nelements2;
3807 hash_header = pool_shared_memory_create(size);
3808 hash_header->nhash = nelements2;
3809 hash_header->mask = mask;
3810
3811 #ifdef POOL_HASH_DEBUG
3812 ereport(LOG,
3813 (errmsg("initializing hash table on shared memory"),
3814 errdetail("size:%zd nelements2:%d", size, nelements2)));
3815
3816 #endif
3817
3818 size = sizeof(POOL_HASH_ELEMENT) * nelements2;
3819 hash_elements = pool_shared_memory_create(size);
3820
3821 #ifdef POOL_HASH_DEBUG
3822 ereport(LOG,
3823 (errmsg("initializing hash table on shared memory"),
3824 errdetail("size:%zd nelements2:%d", size, nelements2)));
3825 #endif
3826
3827 for (i = 0; i < nelements2 - 1; i++)
3828 {
3829 hash_elements[i].next = (POOL_HASH_ELEMENT *) & hash_elements[i + 1];
3830 }
3831 hash_elements[nelements2 - 1].next = NULL;
3832 hash_free = hash_elements;
3833
3834 return 0;
3835 }
3836
3837 /*
3838 * Reset hash table on shared memory "nelements" is max number of
3839 * hash keys. The actual number of hash key is rounded up to power of
3840 * 2.
3841 */
3842 static int
pool_hash_reset(int nelements)3843 pool_hash_reset(int nelements)
3844 {
3845 size_t size;
3846 int nelements2; /* number of rounded up hash keys */
3847 int shift;
3848 uint32 mask;
3849 POOL_HASH_HEADER hh;
3850 int i;
3851
3852 if (nelements <= 0)
3853 ereport(ERROR,
3854 (errmsg("clearing hash table on shared memory, invalid number of elements: %d", nelements)));
3855
3856 /* Round up to power of 2 */
3857 shift = 32;
3858 nelements2 = 1;
3859 do
3860 {
3861 nelements2 <<= 1;
3862 shift--;
3863 } while (nelements2 < nelements);
3864
3865 mask = ~0;
3866 mask >>= shift;
3867
3868 size = (char *) &hh.elements - (char *) &hh + sizeof(POOL_HEADER_ELEMENT) * nelements2;
3869 memset((void *) hash_header, 0, size);
3870
3871 hash_header->nhash = nelements2;
3872 hash_header->mask = mask;
3873
3874 size = sizeof(POOL_HASH_ELEMENT) * nelements2;
3875 memset((void *) hash_elements, 0, size);
3876
3877 for (i = 0; i < nelements2 - 1; i++)
3878 {
3879 hash_elements[i].next = (POOL_HASH_ELEMENT *) & hash_elements[i + 1];
3880 }
3881 hash_elements[nelements2 - 1].next = NULL;
3882 hash_free = hash_elements;
3883
3884 return 0;
3885 }
3886
3887 /*
3888 * Search cacheid by MD5 hash key string
3889 * If found, returns cache id, otherwise NULL.
3890 */
3891 POOL_CACHEID *
pool_hash_search(POOL_QUERY_HASH * key)3892 pool_hash_search(POOL_QUERY_HASH * key)
3893 {
3894 volatile POOL_HASH_ELEMENT *element;
3895
3896 uint32 hash_key = create_hash_key(key);
3897
3898 if (hash_key >= hash_header->nhash)
3899 {
3900 ereport(WARNING,
3901 (errmsg("memcache: searching cacheid from hash. invalid hash key"),
3902 errdetail("invalid hash key: %uld nhash: %ld",
3903 hash_key, hash_header->nhash)));
3904 return NULL;
3905 }
3906
3907 {
3908 char md5[POOL_MD5_HASHKEYLEN + 1];
3909
3910 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3911 md5[POOL_MD5_HASHKEYLEN] = '\0';
3912 #ifdef POOL_HASH_DEBUG
3913 ereport(LOG,
3914 (errmsg("searching hash table"),
3915 errdetail("hash_key:%d md5:%s", hash_key, md5)));
3916 #endif
3917 }
3918
3919 element = hash_header->elements[hash_key].element;
3920 while (element)
3921 {
3922 {
3923 char md5[POOL_MD5_HASHKEYLEN + 1];
3924
3925 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3926 md5[POOL_MD5_HASHKEYLEN] = '\0';
3927 #ifdef POOL_HASH_DEBUG
3928 ereport(LOG,
3929 (errmsg("searching hash table"),
3930 errdetail("element md5:%s", md5)));
3931 #endif
3932 }
3933
3934 if (memcmp((const void *) element->hashkey.query_hash,
3935 (const void *) key->query_hash, sizeof(key->query_hash)) == 0)
3936 {
3937 return (POOL_CACHEID *) & element->cacheid;
3938 }
3939 element = element->next;
3940 }
3941 return NULL;
3942 }
3943
3944 /*
3945 * Insert MD5 key and associated cache id into shmem hash table. If
3946 * "update" is true, replace cacheid associated with the MD5 key,
3947 * rather than throw an error.
3948 */
3949 static int
pool_hash_insert(POOL_QUERY_HASH * key,POOL_CACHEID * cacheid,bool update)3950 pool_hash_insert(POOL_QUERY_HASH * key, POOL_CACHEID * cacheid, bool update)
3951 {
3952 POOL_HASH_ELEMENT *element;
3953 POOL_HASH_ELEMENT *new_element;
3954
3955 uint32 hash_key = create_hash_key(key);
3956
3957 if (hash_key >= hash_header->nhash)
3958 {
3959 ereport(WARNING,
3960 (errmsg("memcache: adding cacheid to hash. invalid hash key"),
3961 errdetail("invalid hash key: %uld nhash: %ld",
3962 hash_key, hash_header->nhash)));
3963 return -1;
3964 }
3965
3966 {
3967 char md5[POOL_MD5_HASHKEYLEN + 1];
3968
3969 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3970 md5[POOL_MD5_HASHKEYLEN] = '\0';
3971 #ifdef POOL_HASH_DEBUG
3972 ereport(LOG,
3973 (errmsg("searching hash table"),
3974 errdetail("hash_key:%d md5:%s block:%d item:%d", hash_key, md5, cacheid->blockid, cacheid->itemid)));
3975 #endif
3976 }
3977
3978 /*
3979 * Look for hash key.
3980 */
3981 element = hash_header->elements[hash_key].element;
3982
3983 while (element)
3984 {
3985 if (memcmp((const void *) element->hashkey.query_hash,
3986 (const void *) key->query_hash, sizeof(key->query_hash)) == 0)
3987 {
3988 /* Hash key found. If "update" is false, just throw an error. */
3989 char md5[POOL_MD5_HASHKEYLEN + 1];
3990
3991 if (!update)
3992 {
3993 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3994 md5[POOL_MD5_HASHKEYLEN] = '\0';
3995 ereport(LOG,
3996 (errmsg("memcache: adding cacheid to hash. hash key:\"%s\" already exists", md5)));
3997 return -1;
3998 }
3999 else
4000 {
4001 /* Update cache id */
4002 memcpy((void *) &element->cacheid, cacheid, sizeof(POOL_CACHEID));
4003 return 0;
4004 }
4005 }
4006 element = element->next;
4007 }
4008
4009 /*
4010 * Ok, same key did not exist. Just insert new hash key.
4011 */
4012 new_element = (POOL_HASH_ELEMENT *) get_new_hash_element();
4013 if (!new_element)
4014 {
4015 ereport(LOG,
4016 (errmsg("memcache: adding cacheid to hash. failed to get new element")));
4017 return -1;
4018 }
4019
4020 element = hash_header->elements[hash_key].element;
4021
4022 hash_header->elements[hash_key].element = new_element;
4023 new_element->next = element;
4024
4025 memcpy((void *) new_element->hashkey.query_hash, key->query_hash, POOL_MD5_HASHKEYLEN);
4026 memcpy((void *) &new_element->cacheid, cacheid, sizeof(POOL_CACHEID));
4027
4028 return 0;
4029 }
4030
4031 /*
4032 * Delete MD5 key and associated cache id from shmem hash table.
4033 */
4034 int
pool_hash_delete(POOL_QUERY_HASH * key)4035 pool_hash_delete(POOL_QUERY_HASH * key)
4036 {
4037 POOL_HASH_ELEMENT *element;
4038 POOL_HASH_ELEMENT **delete_point;
4039 bool found;
4040
4041 uint32 hash_key = create_hash_key(key);
4042
4043 if (hash_key >= hash_header->nhash)
4044 {
4045 ereport(LOG,
4046 (errmsg("memcache: deleting key from hash. invalid key"),
4047 errdetail("invalid hash key: %uld nhash: %ld",
4048 hash_key, hash_header->nhash)));
4049 return -1;
4050 }
4051
4052 /*
4053 * Look for delete location
4054 */
4055 found = false;
4056 delete_point = (POOL_HASH_ELEMENT * *) & (hash_header->elements[hash_key].element);
4057 element = hash_header->elements[hash_key].element;
4058
4059 while (element)
4060 {
4061 if (memcmp(element->hashkey.query_hash, key->query_hash, sizeof(key->query_hash)) == 0)
4062 {
4063 found = true;
4064 break;
4065 }
4066 delete_point = &element->next;
4067 element = element->next;
4068 }
4069
4070 if (!found)
4071 {
4072 char md5[POOL_MD5_HASHKEYLEN + 1];
4073
4074 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
4075 md5[POOL_MD5_HASHKEYLEN] = '\0';
4076 ereport(LOG,
4077 (errmsg("memcache: deleting key from hash. key:\"%s\" not found", md5)));
4078 return -1;
4079 }
4080
4081 /*
4082 * Put back the element to free list
4083 */
4084 *delete_point = element->next;
4085 put_back_hash_element(element);
4086
4087 return 0;
4088 }
4089
4090 /*
4091 * Calculate 32bit binary hash key (i.e. location in hash header) from MD5
4092 * string. We use top most 8 characters of MD5 string for calculation.
4093 */
4094 static uint32
create_hash_key(POOL_QUERY_HASH * key)4095 create_hash_key(POOL_QUERY_HASH * key)
4096 {
4097 #define POOL_HASH_NCHARS 8
4098
4099 char md5[POOL_HASH_NCHARS + 1];
4100 uint32 mask;
4101
4102 memcpy(md5, key->query_hash, POOL_HASH_NCHARS);
4103 md5[POOL_HASH_NCHARS] = '\0';
4104 mask = strtoul(md5, NULL, 16);
4105 mask &= hash_header->mask;
4106 return mask;
4107 }
4108
4109 /*
4110 * Get new free hash element from free list.
4111 */
4112 static volatile POOL_HASH_ELEMENT *
get_new_hash_element(void)4113 get_new_hash_element(void)
4114 {
4115 volatile POOL_HASH_ELEMENT *elm;
4116
4117 if (!hash_free->next)
4118 {
4119 /* No free element */
4120 return NULL;
4121 }
4122
4123 #ifdef POOL_HASH_DEBUG
4124 ereport(LOG,
4125 (errmsg("getting new hash element"),
4126 errdetail("hash_free->next:%p hash_free->next->next:%p",
4127 hash_free->next, hash_free->next->next)));
4128 #endif
4129
4130 elm = hash_free->next;
4131 hash_free->next = elm->next;
4132
4133 return elm;
4134 }
4135
4136 /*
4137 * Put back hash element to free list.
4138 */
4139 static void
put_back_hash_element(volatile POOL_HASH_ELEMENT * element)4140 put_back_hash_element(volatile POOL_HASH_ELEMENT * element)
4141 {
4142 POOL_HASH_ELEMENT *elm;
4143
4144 #ifdef POOL_HASH_DEBUG
4145 ereport(LOG,
4146 (errmsg("getting new hash element"),
4147 errdetail("hash_free->next:%p hash_free->next->next:%p",
4148 hash_free->next, hash_free->next->next)));
4149 #endif
4150
4151 elm = hash_free->next;
4152 hash_free->next = (POOL_HASH_ELEMENT *) element;
4153 element->next = elm;
4154 }
4155
4156 /*
4157 * Return true if there's a free hash element.
4158 */
4159 static bool
is_free_hash_element(void)4160 is_free_hash_element(void)
4161 {
4162 return hash_free->next != NULL;
4163 }
4164
4165 /*
4166 * Returns shared memory cache stats.
4167 * Subsequent call to this function will break return value
4168 * because its in static memory.
4169 * Caller must hold shmem_lock before calling this function.
4170 * If in memory query cache is not enabled, all stats are 0.
4171 */
4172 POOL_SHMEM_STATS *
pool_get_shmem_storage_stats(void)4173 pool_get_shmem_storage_stats(void)
4174 {
4175 static POOL_SHMEM_STATS mystats;
4176 POOL_HASH_ELEMENT *element;
4177 int nblocks;
4178 int i;
4179
4180 memset(&mystats, 0, sizeof(POOL_SHMEM_STATS));
4181
4182 if (!pool_config->memory_cache_enabled)
4183 return &mystats;
4184
4185 /*
4186 * Copy cache hit data
4187 */
4188 mystats.cache_stats.num_selects = stats->num_selects;
4189 mystats.cache_stats.num_cache_hits = stats->num_cache_hits;
4190
4191 if (pool_config->memqcache_method != SHMEM_CACHE)
4192 return &mystats;
4193
4194 /* number of total hash entries */
4195 mystats.num_hash_entries = hash_header->nhash;
4196
4197 /* number of used hash entries */
4198 for (i = 0; i < hash_header->nhash; i++)
4199 {
4200 element = hash_header->elements[i].element;
4201 while (element)
4202 {
4203 mystats.used_hash_entries++;
4204 element = element->next;
4205 }
4206 }
4207
4208 nblocks = pool_get_memqcache_blocks();
4209
4210 for (i = 0; i < nblocks; i++)
4211 {
4212 POOL_CACHE_BLOCK_HEADER *bh;
4213 POOL_CACHE_ITEM_POINTER *cip;
4214 char *p = block_address(i);
4215
4216 bh = (POOL_CACHE_BLOCK_HEADER *) p;
4217 int j;
4218
4219 if (bh->flags & POOL_BLOCK_USED)
4220 {
4221 for (j = 0; j < bh->num_items; j++)
4222 {
4223 cip = item_pointer(p, j);
4224 if (POOL_ITEM_DELETED & cip->flags)
4225 {
4226 mystats.fragment_cache_entries_size += item_header(p, j)->total_length;
4227 }
4228 else
4229 {
4230 /* number of used cache entries */
4231 mystats.num_cache_entries++;
4232 /* total size of used cache entries */
4233 mystats.used_cache_entries_size += (item_header(p, j)->total_length + sizeof(POOL_CACHE_ITEM_POINTER));
4234 }
4235 }
4236 mystats.used_cache_entries_size += sizeof(POOL_CACHE_BLOCK_HEADER);
4237 /* total size of free(usable) cache entries */
4238 mystats.free_cache_entries_size += bh->free_bytes;
4239 }
4240 else
4241 {
4242 mystats.free_cache_entries_size += pool_config->memqcache_cache_block_size;
4243 }
4244 }
4245
4246 /*
4247 * Copy POOL_QUERY_CACHE_STATS
4248 */
4249 memcpy(&mystats.cache_stats, stats, sizeof(mystats.cache_stats));
4250
4251 return &mystats;
4252 }
4253
4254 /*
4255 * Inject cached message to the target backend buffer to pretend as if backend
4256 * actually replies with Data row and Command Complete message.
4257 */
4258 static void
inject_cached_message(POOL_CONNECTION * backend,char * qcache,int qcachelen)4259 inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen)
4260 {
4261 char kind;
4262 int len;
4263 char *buf;
4264 int timeout;
4265 int i = 0;
4266 bool is_prepared_stmt = false;
4267 POOL_SESSION_CONTEXT *session_context;
4268 POOL_QUERY_CONTEXT *query_context;
4269 POOL_PENDING_MESSAGE *msg;
4270
4271 session_context = pool_get_session_context(false);
4272 query_context = session_context->query_context;
4273 msg = pool_pending_message_find_lastest_by_query_context(query_context);
4274
4275 if (msg)
4276 {
4277 /*
4278 * If pending message found, we should extract target backend from it
4279 */
4280 int backend_id;
4281
4282 backend_id = pool_pending_message_get_target_backend_id(msg);
4283 backend = CONNECTION(session_context->backend, backend_id);
4284 timeout = -1;
4285 }
4286 else
4287 timeout = 0;
4288
4289 /* Send flush messsage to backend to retrieve response of backend */
4290 pool_write(backend, "H", 1);
4291 len = htonl(sizeof(len));
4292 pool_write_and_flush(backend, &len, sizeof(len));
4293
4294 /*
4295 * Push any response from backend
4296 */
4297 for (;;)
4298 {
4299 /* check if there's any pending data */
4300 if (!pool_ssl_pending(backend) && pool_read_buffer_is_empty(backend))
4301 {
4302 pool_set_timeout(timeout);
4303 if (pool_check_fd(backend) != 0)
4304 {
4305 ereport(DEBUG1,
4306 (errmsg("inject_cached_message: select shows no pending data")));
4307 pool_set_timeout(-1);
4308 break;
4309 }
4310 pool_set_timeout(-1);
4311 }
4312
4313 pool_read(backend, &kind, 1);
4314 ereport(DEBUG1,
4315 (errmsg("inject_cached_message: push message kind: '%c'", kind)));
4316 if (msg &&
4317 ((kind == 'T' && msg->type == POOL_DESCRIBE) ||
4318 (kind == '2' && msg->type == POOL_BIND)))
4319 {
4320 /* Pending message seen. Now it is likely to end of pending data */
4321 timeout = 0;
4322 }
4323 pool_push(backend, &kind, sizeof(kind));
4324 pool_read(backend, &len, sizeof(len));
4325 pool_push(backend, &len, sizeof(len));
4326 if ((ntohl(len) - sizeof(len)) > 0)
4327 {
4328 buf = pool_read2(backend, ntohl(len) - sizeof(len));
4329 pool_push(backend, buf, ntohl(len) - sizeof(len));
4330 }
4331 }
4332
4333 /*
4334 * Inject row data and command complete
4335 */
4336 while (i < qcachelen)
4337 {
4338 char tmpkind;
4339 int tmplen;
4340 char *p;
4341
4342 tmpkind = qcache[i];
4343 i++;
4344
4345 memcpy(&tmplen, qcache + i, sizeof(tmplen));
4346 i += sizeof(tmplen);
4347 len = ntohl(tmplen);
4348 p = qcache + i;
4349 i += len - sizeof(tmplen);
4350
4351 /* No need to cache PARSE and BIND responses */
4352 if (tmpkind == '1' || tmpkind == '2')
4353 {
4354 is_prepared_stmt = true;
4355 continue;
4356 }
4357
4358 /*
4359 * In the prepared statement execution, there is no need to send 'T'
4360 * response to the frontend.
4361 */
4362 if (is_prepared_stmt && tmpkind == 'T')
4363 {
4364 continue;
4365 }
4366
4367 /* push message */
4368 ereport(DEBUG1,
4369 (errmsg("inject_cached_message: push cached messages: '%c' len: %d", tmpkind, len)));
4370 pool_push(backend, &tmpkind, 1);
4371 pool_push(backend, &tmplen, sizeof(tmplen));
4372 if (len > 0)
4373 pool_push(backend, p, len - sizeof(tmplen));
4374 }
4375
4376 /*
4377 * Pop data.
4378 */
4379 pool_pop(backend, &len);
4380 }
4381