1 /* -*-pgsql-c-*- */
2 /*
3 * pgpool: a language independent connection pool server for PostgreSQL
4 * written by Tatsuo Ishii
5 *
6 * Copyright (c) 2003-2020 PgPool Global Development Group
7 *
8 * Permission to use, copy, modify, and distribute this software and
9 * its documentation for any purpose and without fee is hereby
10 * granted, provided that the above copyright notice appear in all
11 * copies and that both that copyright notice and this permission
12 * notice appear in supporting documentation, and that the name of the
13 * author not be used in advertising or publicity pertaining to
14 * distribution of the software without specific, written prior
15 * permission. The author makes no representations about the
16 * suitability of this software for any purpose. It is provided "as
17 * is" without express or implied warranty.
18 *
19 * pool_memqcache.c: query cache on shmem or memcached
20 *
21 */
/*
 * Query template for looking up a database's OID in pg_database.  The
 * single '%s' placeholder receives the database name inside a SQL string
 * literal.  NOTE(review): the name is interpolated as-is, so the caller
 * must make sure it is properly quoted/escaped — confirm at the call site.
 */
#define DATABASE_TO_OID_QUERY "SELECT oid FROM pg_database WHERE datname = '%s'"
23
24 #include "pool.h"
25
26 #include <stdio.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <sys/file.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <fcntl.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <stdlib.h>
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <dirent.h>
39
40 #ifdef USE_MEMCACHED
41 #include <libmemcached/memcached.h>
42 #endif
43
44 #include "auth/md5.h"
45 #include "pool_config.h"
46 #include "protocol/pool_proto_modules.h"
47 #include "parser/parsenodes.h"
48 #include "context/pool_session_context.h"
49 #include "query_cache/pool_memqcache.h"
50 #include "utils/pool_relcache.h"
51 #include "utils/pool_select_walker.h"
52 #include "utils/pool_stream.h"
53 #include "utils/pool_stream.h"
54 #include "utils/elog.h"
55 #include "utils/palloc.h"
56 #include "utils/memutils.h"
57
58
#ifdef USE_MEMCACHED
/*
 * libmemcached handle used by this file.  Created by memcached_connect()
 * and released by memcached_disconnect(); a NULL value is treated as
 * "not connected" by both.
 */
memcached_st *memc;
#endif

/*
 * Forward declarations of the file-local (static) helpers defined later in
 * this file.
 */
static char* encode_key(const char *s, char *buf, POOL_CONNECTION_POOL *backend);
#ifdef DEBUG
static void dump_cache_data(const char *data, size_t len);
#endif
static int pool_commit_cache(POOL_CONNECTION_POOL *backend, char *query, char *data, size_t datalen, int num_oids, int *oids);
static int pool_fetch_cache(POOL_CONNECTION_POOL *backend, const char *query, char **buf, size_t *len);
static int send_cached_messages(POOL_CONNECTION *frontend, const char *qcache, int qcachelen);
static void send_message(POOL_CONNECTION *conn, char kind, int len, const char *data);
#ifdef USE_MEMCACHED
static int delete_cache_on_memcached(const char *key);
#endif
static int pool_get_dml_table_oid(int **oid);
static int pool_get_dropdb_table_oids(int **oids, int dboid);
static void pool_discard_dml_table_oid(void);
static void pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlink, int dboid);

static int pool_get_database_oid(void);
static void pool_add_table_oid_map(POOL_CACHEKEY *cachkey, int num_table_oids, int *table_oids);
static void pool_reset_memqcache_buffer(bool reset_dml_oids);
static POOL_CACHEID *pool_add_item_shmem_cache(POOL_QUERY_HASH *query_hash, char *data, int size);
static POOL_CACHEID *pool_find_item_on_shmem_cache(POOL_QUERY_HASH *query_hash);
static char *pool_get_item_shmem_cache(POOL_QUERY_HASH *query_hash, int *size, int *sts);
static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY *cache_array, POOL_TEMP_QUERY_CACHE *cache);
static void pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, char kind, char *data, int data_len);
static void pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, int num_oids, int *oids);
static POOL_INTERNAL_BUFFER *pool_create_buffer(void);
static void pool_discard_buffer(POOL_INTERNAL_BUFFER *buffer);
static void pool_add_buffer(POOL_INTERNAL_BUFFER *buffer, void *data, size_t len);
static void *pool_get_buffer(POOL_INTERNAL_BUFFER *buffer, size_t *len);
#ifdef NOT_USED
static char *pool_get_buffer_pointer(POOL_INTERNAL_BUFFER *buffer);
#endif
static char *pool_get_current_cache_buffer(size_t *len);
static size_t pool_get_buffer_length(POOL_INTERNAL_BUFFER *buffer);
static void pool_check_and_discard_cache_buffer(int num_oids, int *oids);

static void pool_set_memqcache_blocks(int num_blocks);
static int pool_get_memqcache_blocks(void);
static void *pool_memory_cache_address(void);
static void pool_reset_fsmm(size_t size);
static void *pool_fsmm_address(void);
static void pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space);
static POOL_CACHE_BLOCKID pool_get_block(size_t free_space);
static POOL_CACHE_ITEM_HEADER *pool_cache_item_header(POOL_CACHEID *cacheid);
static int pool_init_cache_block(POOL_CACHE_BLOCKID blockid);
#if NOT_USED
static void pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid);
#endif
static int pool_delete_item_shmem_cache(POOL_CACHEID *cacheid);
static char *block_address(int blockid);
static POOL_CACHE_ITEM_POINTER *item_pointer(char *block, int i);
static POOL_CACHE_ITEM_HEADER *item_header(char *block, int i);
static POOL_CACHE_BLOCKID pool_reuse_block(void);
#ifdef SHMEMCACHE_DEBUG
static void dump_shmem_cache(POOL_CACHE_BLOCKID blockid);
#endif

static int pool_hash_reset(int nelements);
static int pool_hash_insert(POOL_QUERY_HASH *key, POOL_CACHEID *cacheid, bool update);
static uint32 create_hash_key(POOL_QUERY_HASH *key);
static volatile POOL_HASH_ELEMENT *get_new_hash_element(void);
static void put_back_hash_element(volatile POOL_HASH_ELEMENT *element);
static bool is_free_hash_element(void);
static void inject_cached_message(POOL_CONNECTION *backend, char *qcache, int qcachelen);
127
128 /*
129 * Connect to Memcached
130 */
memcached_connect(void)131 int memcached_connect(void)
132 {
133 char *memqcache_memcached_host;
134 int memqcache_memcached_port;
135 #ifdef USE_MEMCACHED
136 memcached_server_st *servers;
137 memcached_return rc;
138
139 /* Already connected? */
140 if (memc)
141 {
142 return 0;
143 }
144 #endif
145
146 memqcache_memcached_host = pool_config->memqcache_memcached_host;
147 memqcache_memcached_port = pool_config->memqcache_memcached_port;
148
149 ereport(DEBUG1,
150 (errmsg("connecting to memcached on Host:\"%s:%d\"", memqcache_memcached_host,memqcache_memcached_port)));
151
152 #ifdef USE_MEMCACHED
153 memc = memcached_create(NULL);
154 servers = memcached_server_list_append(NULL,
155 memqcache_memcached_host,
156 memqcache_memcached_port,
157 &rc);
158
159 rc = memcached_server_push(memc, servers);
160 if (rc != MEMCACHED_SUCCESS)
161 {
162 ereport(WARNING,
163 (errmsg("failed to connect to memcached, server push error:\"%s\"\n", memcached_strerror(memc, rc))));
164 memc = (memcached_st *)-1;
165 return -1;
166 }
167 memcached_server_list_free(servers);
168 #else
169 ereport(WARNING,
170 (errmsg("failed to connect to memcached, memcached support is not enabled")));
171 return -1;
172 #endif
173 return 0;
174 }
175
176 /*
177 * Disconnect to Memcached
178 */
memcached_disconnect(void)179 void memcached_disconnect (void)
180 {
181 #ifdef USE_MEMCACHED
182 if (!memc)
183 {
184 return;
185 }
186 memcached_free(memc);
187 #else
188 ereport(WARNING,
189 (errmsg("failed to disconnect from memcached, memcached support is not enabled")));
190 #endif
191 }
192
193 /*
194 * Register buffer data for query cache in memory cache
195 */
memqcache_register(char kind,POOL_CONNECTION * frontend,char * data,int data_len)196 void memqcache_register(char kind,
197 POOL_CONNECTION *frontend,
198 char *data,
199 int data_len)
200 {
201 POOL_TEMP_QUERY_CACHE *cache;
202 POOL_SESSION_CONTEXT *session_context;
203 POOL_QUERY_CONTEXT *query_context;
204
205 cache = pool_get_current_cache();
206
207 if (cache == NULL)
208 {
209 session_context = pool_get_session_context(true);
210
211 if (session_context && pool_is_query_in_progress())
212 {
213 char *query = pool_get_query_string();
214 query_context = session_context->query_context;
215
216 if (query)
217 query_context->temp_cache = pool_create_temp_query_cache(query);
218 }
219 }
220
221 pool_add_temp_query_cache(cache, kind, data, data_len);
222 }
223
224 /*
225 * Commit SELECT results to cache storage.
226 */
pool_commit_cache(POOL_CONNECTION_POOL * backend,char * query,char * data,size_t datalen,int num_oids,int * oids)227 static int pool_commit_cache(POOL_CONNECTION_POOL *backend, char *query, char *data, size_t datalen, int num_oids, int *oids)
228 {
229 #ifdef USE_MEMCACHED
230 memcached_return rc;
231 #endif
232 POOL_CACHEKEY cachekey;
233 char tmpkey[MAX_KEY];
234 time_t memqcache_expire;
235
236 /*
237 * get_buflen() will return -1 if query result exceeds memqcache_maxcache
238 */
239 if (datalen == -1)
240 {
241 return -1;
242 }
243
244 /* query disabled */
245 if (strlen(query) <= 0)
246 {
247 return -1;
248 }
249 ereport(DEBUG1,
250 (errmsg("commiting SELECT results to cache storage"),
251 errdetail("Query=\"%s\"", query)));
252
253 #ifdef DEBUG
254 dump_cache_data(data, datalen);
255 #endif
256
257 /* encode md5key for memcached */
258 encode_key(query, tmpkey, backend);
259 ereport(DEBUG2,
260 (errmsg("commiting SELECT results to cache storage"),
261 errdetail("search key : \"%s\"", tmpkey)));
262
263 memcpy(cachekey.hashkey, tmpkey, 32);
264
265 memqcache_expire = pool_config->memqcache_expire;
266 ereport(DEBUG1,
267 (errmsg("commiting SELECT results to cache storage"),
268 errdetail("memqcache_expire = %ld", memqcache_expire)));
269
270 if (pool_is_shmem_cache())
271 {
272 POOL_CACHEID *cacheid;
273 POOL_QUERY_HASH query_hash;
274
275 memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
276
277 cacheid = pool_hash_search(&query_hash);
278
279 if (cacheid != NULL)
280 {
281 ereport(DEBUG1,
282 (errmsg("commiting SELECT results to cache storage"),
283 errdetail("item already exists")));
284
285 return 0;
286 }
287 else
288 {
289 cacheid = pool_add_item_shmem_cache(&query_hash, data, datalen);
290 if (cacheid == NULL)
291 {
292 ereport(LOG,
293 (errmsg("failed to add item to shmem cache")));
294 return -1;
295 }
296 else
297 {
298 ereport(DEBUG2,
299 (errmsg("commiting SELECT results to cache storage"),
300 errdetail("blockid: %d itemid: %d",
301 cacheid->blockid, cacheid->itemid)));
302 }
303 cachekey.cacheid.blockid = cacheid->blockid;
304 cachekey.cacheid.itemid = cacheid->itemid;
305 }
306 }
307
308 #ifdef USE_MEMCACHED
309 else
310 {
311 rc = memcached_set(memc, tmpkey, 32,
312 data, datalen, (time_t)memqcache_expire, 0);
313 if (rc != MEMCACHED_SUCCESS)
314 {
315 ereport(WARNING,
316 (errmsg("cache commit failed with error:\"%s\"",memcached_strerror(memc, rc))));
317 return -1;
318 }
319 ereport(DEBUG1,
320 (errmsg("commiting SELECT results to cache storage"),
321 errdetail("set cache succeeded")));
322 }
323 #endif
324
325 /*
326 * Register cache id to oid map
327 */
328 pool_add_table_oid_map(&cachekey, num_oids, oids);
329
330 return 0;
331 }
332
333 /*
334 * Fetch from memory cache.
335 * Return:
336 * 0: fetch success,
337 * 1: not found
338 */
pool_fetch_cache(POOL_CONNECTION_POOL * backend,const char * query,char ** buf,size_t * len)339 static int pool_fetch_cache(POOL_CONNECTION_POOL *backend, const char *query, char **buf, size_t *len)
340 {
341 char *ptr;
342 char tmpkey[MAX_KEY];
343 int sts;
344 char *p;
345
346 if (strlen(query) <= 0)
347 ereport(ERROR,
348 (errmsg("fetching from cache storage, no query")));
349
350 /* encode md5key for memcached */
351 encode_key(query, tmpkey, backend);
352 ereport(DEBUG1,
353 (errmsg("fetching from cache storage"),
354 errdetail("search key \"%s\"", tmpkey)));
355
356
357 if (pool_is_shmem_cache())
358 {
359 POOL_QUERY_HASH query_hash;
360 int mylen;
361
362 memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
363
364 ptr = pool_get_item_shmem_cache(&query_hash, &mylen, &sts);
365 if (ptr == NULL)
366 {
367 ereport(DEBUG1,
368 (errmsg("fetching from cache storage"),
369 errdetail("cache not found on shared memory")));
370
371 return 1;
372 }
373 *len = mylen;
374 }
375 #ifdef USE_MEMCACHED
376 else
377 {
378 memcached_return rc;
379 unsigned int flags;
380
381 ptr = memcached_get(memc, tmpkey, strlen(tmpkey), len, &flags, &rc);
382
383 if (rc != MEMCACHED_SUCCESS)
384 {
385 if (rc != MEMCACHED_NOTFOUND)
386 {
387 ereport(LOG,
388 (errmsg("fetching from cache storage, memcached_get failed with error: \"%s\"", memcached_strerror(memc, rc))));
389 /*
390 * Turn off memory cache support to prevent future errors.
391 */
392 pool_config->memory_cache_enabled = 0;
393 /* Behave as if cache not found */
394 return 1;
395 }
396 else
397 {
398 /* Not found */
399 ereport(DEBUG1,
400 (errmsg("fetching from cache storage"),
401 errdetail("cache item not found for key: \"%s\" and query:\"%s\"",tmpkey,query)));
402 return 1;
403 }
404 }
405 }
406 #else
407 else
408 {
409 ereport(ERROR,
410 (errmsg("memcached support is not enabled")));
411 }
412 #endif
413
414 p = palloc(*len);
415
416 memcpy(p, ptr, *len);
417
418 if (!pool_is_shmem_cache())
419 {
420 free(ptr);
421 }
422
423 ereport(DEBUG1,
424 (errmsg("fetching from cache storage"),
425 errdetail("query=\"%s\" len:%zd", query, *len)));
426 #ifdef DEBUG
427 dump_cache_data(p, *len);
428 #endif
429
430 *buf = p;
431
432 return 0;
433 }
434
435 /*
436 * encode key.
437 * create cache key as md5(username + query string + database name)
438 */
encode_key(const char * s,char * buf,POOL_CONNECTION_POOL * backend)439 static char* encode_key(const char *s, char *buf, POOL_CONNECTION_POOL *backend)
440 {
441 char* strkey;
442 int u_length;
443 int d_length;
444 int q_length;
445 int length;
446
447 u_length = strlen(backend->info->user);
448 ereport(DEBUG1,
449 (errmsg("memcache encode key"),
450 errdetail("username: \"%s\" database_name: \"%s\"", backend->info->user,backend->info->database)));
451
452 d_length = strlen(backend->info->database);
453
454 q_length = strlen(s);
455 ereport(DEBUG1,
456 (errmsg("memcache encode key"),
457 errdetail("query: \"%s\"", s)));
458
459 length = u_length + d_length + q_length + 1;
460
461 strkey = (char*)palloc(sizeof(char) * length);
462
463 snprintf(strkey, length, "%s%s%s", backend->info->user, s, backend->info->database);
464
465 pool_md5_hash(strkey, strlen(strkey), buf);
466 ereport(DEBUG1,
467 (errmsg("memcache encode key"),
468 errdetail("`%s' -> `%s'", strkey, buf)));
469 pfree(strkey);
470 return buf;
471 }
472
473 #ifdef DEBUG
/*
 * dump cache data
 *
 * Prints every message stored in a query cache buffer to stderr: the
 * one-byte message kind, the 4-byte network-order length (which counts the
 * length word itself), and the payload bytes in hex.
 *
 * Stops, with a note, as soon as a length field is incomplete or claims
 * more payload than the buffer holds.  This prevents reading past
 * data + len and wrapping the unsigned "len" counter.
 */
static void dump_cache_data(const char *data, size_t len)
{
	fprintf(stderr, "shmem: len = %zu\n", len);

	while (len > 0)
	{
		int			plen;
		int			i;

		fprintf(stderr, "shmem: kind:%c\n", *data++);
		len--;

		/* a complete 4-byte length word must follow the kind byte */
		if (len < 4)
		{
			fprintf(stderr, "shmem: truncated length field (%zu bytes left)\n", len);
			return;
		}
		memcpy(&plen, data, 4);
		data += 4;
		len -= 4;
		plen = ntohl(plen);
		fprintf(stderr, "shmem: len:%d\n", plen);

		/* the length includes itself; what remains is the payload */
		plen -= 4;
		if (plen < 0 || (size_t) plen > len)
		{
			fprintf(stderr, "shmem: invalid message length\n");
			return;
		}

		fprintf(stderr, "shmem: ");
		for (i = 0; i < plen; i++)
			fprintf(stderr, "%02x ", (unsigned char) data[i]);
		fprintf(stderr, "\n");

		data += plen;
		len -= plen;
	}
}
505 #endif
506
507 /*
508 * send cached messages
509 */
send_cached_messages(POOL_CONNECTION * frontend,const char * qcache,int qcachelen)510 static int send_cached_messages(POOL_CONNECTION *frontend, const char *qcache, int qcachelen)
511 {
512 int msg = 0;
513 int i = 0;
514 int is_prepared_stmt = 0;
515 int len;
516 const char *p;
517
518 while (i < qcachelen)
519 {
520 char tmpkind;
521 int tmplen;
522
523 tmpkind = qcache[i];
524 i++;
525
526 memcpy(&tmplen, qcache+i, sizeof(tmplen));
527 i += sizeof(tmplen);
528 len = ntohl(tmplen);
529 p = qcache + i;
530 i += len - sizeof(tmplen);
531
532 /* No need to cache PARSE and BIND responses */
533 if (tmpkind == '1' || tmpkind == '2')
534 {
535 is_prepared_stmt = 1;
536 continue;
537 }
538
539 /*
540 * In the prepared statement execution, there is no need to send
541 * 'T' response to the frontend.
542 */
543 if (is_prepared_stmt && tmpkind == 'T')
544 {
545 continue;
546 }
547
548 /* send message to frontend */
549 ereport(DEBUG1,
550 (errmsg("memcache: sending cached messages: '%c' len: %d", tmpkind, len)));
551 send_message(frontend, tmpkind, len, p);
552
553 msg++;
554 }
555
556 return msg;
557 }
558
559 /*
560 * send message to frontend
561 */
send_message(POOL_CONNECTION * conn,char kind,int len,const char * data)562 static void send_message(POOL_CONNECTION *conn, char kind, int len, const char *data)
563 {
564 ereport(DEBUG2,
565 (errmsg("memcache: sending messages: kind '%c', len=%d, data=%p", kind, len, data)));
566
567 pool_write(conn, &kind, 1);
568
569 len = htonl(len);
570 pool_write(conn, &len, sizeof(len));
571
572 len = ntohl(len);
573 pool_write(conn, (void *)data, len-sizeof(len));
574 }
575
#ifdef USE_MEMCACHED
/*
 * delete query cache on memcached
 *
 * Removes the memcached entry stored under "key".  The first 32 bytes of
 * "key" are used; this is the md5 key produced by encode_key().
 *
 * Returns 1 when memcached reports success (or that the request was
 * buffered), otherwise logs the error and returns 0.
 */
static int delete_cache_on_memcached(const char *key)
{
	memcached_return rc;

	ereport(DEBUG2,
			(errmsg("memcache: deleteing cache on memcached with key: \"%s\"", key)));

	rc = memcached_delete(memc, key, 32, (time_t) 0);

	if (rc == MEMCACHED_SUCCESS || rc == MEMCACHED_BUFFERED)
		return 1;

	ereport(LOG,
			(errmsg("failed to delete cache on memcached, error:\"%s\"", memcached_strerror(memc, rc))));
	return 0;
}
#endif
603
/*
 * Fetch SELECT data from cache if possible.
 *
 * Looks up the cached result for the query string "contents".  The lookup
 * runs with signals blocked and the shared memory lock held.
 *
 * On a cache hit:
 *  - In extended query mode with SL_MODE, the cached data is injected into
 *    the connection of the session's load_balance_node_id.
 *  - Otherwise, the cached messages are sent to the frontend.
 *  - If not in extended query mode and the protocol is V3, a 'Z'
 *    (ReadyForQuery) message is sent, carrying the main node's current
 *    transaction state.
 *  - The frontend is flushed unless the data was injected, and *foundp is
 *    set to true.
 *
 * On a miss *foundp stays false.
 *
 * Returns POOL_END if flushing the frontend fails (in which case *foundp is
 * still false), POOL_CONTINUE otherwise.
 */
POOL_STATUS pool_fetch_from_memory_cache(POOL_CONNECTION *frontend,
										 POOL_CONNECTION_POOL *backend,
										 char *contents, bool *foundp)
{
	char	   *qcache;
	size_t		qcachelen;
	int			sts;
	pool_sigset_t oldmask;

	ereport(DEBUG1,
			(errmsg("pool_fetch_from_memory_cache called")));

	*foundp = false;

	/*
	 * Block signals and take the shared memory lock around the lookup.  The
	 * PG_CATCH branch releases both before re-throwing, so an error raised
	 * inside pool_fetch_cache() does not leave the lock held.
	 */
	POOL_SETMASK2(&BlockSig, &oldmask);
	pool_shmem_lock();

	PG_TRY();
	{
		sts = pool_fetch_cache(backend, contents, &qcache, &qcachelen);
	}
	PG_CATCH();
	{
		pool_shmem_unlock();
		POOL_SETMASK(&oldmask);
		PG_RE_THROW();
	}
	PG_END_TRY();

	pool_shmem_unlock();
	POOL_SETMASK(&oldmask);

	if (sts != 0)
		/* Cache not found */
		return POOL_CONTINUE;

	/*
	 * Cache found. If we are doing extended query and in streaming
	 * replication mode, we need to retrieve any responses from backend and
	 * forward them to frontend.
	 */
	if (pool_is_doing_extended_query_message() && SL_MODE)
	{
		POOL_SESSION_CONTEXT *session_context;
		POOL_CONNECTION *target_backend;

		ereport(DEBUG1,
				(errmsg("memcache: injecting cache data")));

		/* NOTE(review): assumes a session context exists here; not NULL-checked */
		session_context = pool_get_session_context(true);
		target_backend = CONNECTION(backend, session_context->load_balance_node_id);
		inject_cached_message(target_backend, qcache, qcachelen);
	}
	else
	{
		/*
		 * Send each messages to frontend
		 */
		send_cached_messages(frontend, qcache, qcachelen);
	}

	/* qcache is the palloc'd copy returned by pool_fetch_cache() */
	pfree(qcache);

	/*
	 * Send a "READY FOR QUERY" if not in extended query.
	 */
	if (!pool_is_doing_extended_query_message() && MAJOR(backend) == PROTO_MAJOR_V3)
	{
		signed char state;

		/*
		 * We keep previous transaction state.
		 */
		state = MASTER(backend)->tstate;
		send_message(frontend, 'Z', 5, (char *) &state);
	}

	/*
	 * Flush only when messages were written to the frontend above; in the
	 * extended-query + SL_MODE path the data went to the backend connection
	 * via inject_cached_message() instead.
	 */
	if (!pool_is_doing_extended_query_message() || !SL_MODE)
	{
		if (pool_flush(frontend))
		{
			return POOL_END;
		}
	}

	*foundp = true;

	if (pool_config->log_per_node_statement)
		ereport(LOG,
				(errmsg("fetch from memory cache"),
				 errdetail("query result fetched from cache. statement: %s", contents)));

	ereport(DEBUG1,
			(errmsg("fetch from memory cache"),
			 errdetail("query result found in the query cache, %s", contents)));

	return POOL_CONTINUE;
}
705
706 /*
707 * Simple and rough (thus unreliable) check if the query is likely
708 * SELECT. Just check if the query starts with SELECT or WITH. This
709 * can be used before parse tree is available.
710 */
pool_is_likely_select(char * query)711 bool pool_is_likely_select(char *query)
712 {
713 bool do_continue = false;
714
715 if (query == NULL)
716 return false;
717
718 if (pool_config->ignore_leading_white_space)
719 {
720 /* Ignore leading white spaces */
721 while (*query && isspace(*query))
722 query++;
723 }
724 if (! *query)
725 {
726 return false;
727 }
728
729 /*
730 * Get rid of head comment.
731 * It is sure that the query is in correct format, because the parser
732 * has rejected bad queries such as the one with not-ended comment.
733 */
734 while (*query)
735 {
736 /* Ignore spaces and return marks */
737 do_continue = false;
738 while (*query && isspace(*query))
739 {
740 query++;
741 do_continue = true;
742 }
743 if (do_continue)
744 {
745 continue;
746 }
747
748 while (*query && !strncmp(query, "\n", 2))
749 {
750 query++;
751 do_continue = true;
752 }
753 if (do_continue)
754 {
755 query += 2;
756 continue;
757 }
758
759 /* Ignore comments like C */
760 if (!strncmp(query, "/*", 2))
761 {
762 while (*query && strncmp(query, "*/", 2))
763 query++;
764
765 query += 2;
766 continue;
767 }
768
769 /* Ignore SQL comments */
770 if (!strncmp(query, "--", 2))
771 {
772 while (*query && strncmp(query, "\n", 2))
773 query++;
774
775 query += 2;
776 continue;
777 }
778
779 if (!strncasecmp(query, "SELECT", 6) || !strncasecmp(query, "WITH", 4))
780 {
781 return true;
782 }
783
784 query++;
785 }
786
787 return false;
788 }
789
/*
 * Return true if SELECT can be cached. "node" is the parse tree for
 * the query and "query" is the query string.
 * The query must be SELECT or WITH.
 *
 * Returns false, checking in this order, when:
 *  - the query string starts with the NO_QUERY_CACHE comment;
 *  - a table extracted from its FROM clause is in black_memqcache_table_list;
 *  - pool_has_insertinto_or_locking_clause() is true (SELECT INTO,
 *    SELECT FOR SHARE/UPDATE);
 *  - it calls a non-immutable function;
 *  - check_temp_table is on and it uses a temporary table;
 *  - it uses a system catalog;
 *  - its top-level FROM clause contains a TABLESAMPLE (RangeTableSample);
 *  - it uses a view or unlogged table.  When white_memqcache_table_list is
 *    non-empty, only views/unlogged tables absent from that list are
 *    rejected;
 *  - its WITH clause contains INSERT/DELETE/UPDATE.
 *
 * The order matters only for which check decides first and for the helper
 * calls made.  NOTE(review): some helpers (views, unlogged/temp tables)
 * presumably consult backend catalogs — confirm before reordering.
 */
bool pool_is_allow_to_cache(Node *node, char *query)
{
	int			i = 0;
	int			num_oids = -1;	/* -1: tables not extracted yet */
	SelectContext ctx;

	/*
	 * If NO QUERY CACHE comment exists, do not cache.
	 */
	if (!strncasecmp(query, NO_QUERY_CACHE, NO_QUERY_CACHE_COMMENT_SZ))
		return false;
	/*
	 * Check black table list first.
	 */
	if (pool_config->num_black_memqcache_table_list > 0)
	{
		/* Extract oids in from clause of SELECT, and check if SELECT to them could be cached. */
		num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
		if (num_oids > 0)
		{
			for (i = 0; i < num_oids; i++)
			{
				ereport(DEBUG1,
						(errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, ctx.table_names[i])));
				if (pool_is_table_in_black_list(ctx.table_names[i]) == true)
				{
					ereport(DEBUG1,
							(errmsg("memcache: node is not allowed to cache")));
					return false;
				}
			}
		}
	}

	/* SELECT INTO or SELECT FOR SHARE or UPDATE cannot be cached */
	if (pool_has_insertinto_or_locking_clause(node))
		return false;

	/*
	 * If SELECT uses non immutable functions, it's not allowed to
	 * cache.
	 */
	if (pool_has_non_immutable_function_call(node))
		return false;

	/*
	 * If SELECT uses temporary tables it's not allowed to cache.
	 */
	if (pool_config->check_temp_table && pool_has_temp_table(node))
		return false;

	/*
	 * If SELECT uses system catalogs, it's not allowed to cache.
	 */
	if (pool_has_system_catalog(node))
		return false;

	/*
	 * TABLESAMPLE is not allowed to cache.  Only the top-level FROM list
	 * is examined here.
	 */
	if (IsA(node, SelectStmt) && ((SelectStmt *)node)->fromClause)
	{
		List *tbl_list = ((SelectStmt *)node)->fromClause;
		ListCell *tbl;
		foreach(tbl, tbl_list)
		{
			if (IsA(lfirst(tbl), RangeTableSample))
				return false;
		}
	}


	/*
	 * If the table is in the white list, allow to cache even if it is
	 * VIEW or unlogged table.
	 */
	if (pool_config->num_white_memqcache_table_list > 0)
	{
		/* reuse the black list extraction result if it was done above */
		if (num_oids < 0)
			num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);

		if (num_oids > 0)
		{
			for (i = 0; i < num_oids; i++)
			{
				char *table = ctx.table_names[i];
				ereport(DEBUG1,
						(errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, table)));
				if (is_view(table) || is_unlogged_table(table))
				{
					if (pool_is_table_in_white_list(table) == false)
					{
						ereport(DEBUG1,
								(errmsg("memcache: node is not allowed to cache")));
						return false;
					}
				}
			}
		}
	}
	else
	{
		/*
		 * If SELECT uses views, it's not allowed to cache.
		 */
		if (pool_has_view(node))
			return false;

		/*
		 * If SELECT uses unlogged tables, it's not allowed to cache.
		 */
		if (pool_has_unlogged_table(node))
			return false;
	}

	/*
	 * If Data-modifying statements in WITH clause, it's not allowed to cache.
	 */
	if(IsA(node, SelectStmt) && ((SelectStmt *) node)->withClause)
	{
		ListCell *lc;
		WithClause *withClause = ((SelectStmt *) node)->withClause;

		foreach(lc, withClause->ctes)
		{
			CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
			if(IsA(cte->ctequery, InsertStmt) ||
			   IsA(cte->ctequery, DeleteStmt) ||
			   IsA(cte->ctequery, UpdateStmt))
			{
				return false;
			}
		}
	}

	return true;
}
932
933
934 /*
935 * Return true If the SELECTed table is in back list.
936 */
pool_is_table_in_black_list(const char * table_name)937 bool pool_is_table_in_black_list(const char *table_name)
938 {
939
940 if (pool_config->num_black_memqcache_table_list > 0 &&
941 pattern_compare((char *)table_name, BLACKLIST, "black_memqcache_table_list") == 1)
942 {
943 return true;
944 }
945
946 return false;
947 }
948
949 /*
950 * Return true If the SELECTed table is in white list.
951 */
pool_is_table_in_white_list(const char * table_name)952 bool pool_is_table_in_white_list(const char *table_name)
953 {
954 if (pool_config->num_white_memqcache_table_list > 0 &&
955 pattern_compare((char *)table_name, WHITELIST, "white_memqcache_table_list") == 1)
956 {
957 return true;
958 }
959
960 return false;
961 }
962
963 /*
964 * Extract table oid from INSERT/UPDATE/DELETE/TRUNCATE/
965 * DROP TABLE/ALTER TABLE/COPY FROM statement.
966 * For SELECT, if Data-modifying statements in its WITH clause,
967 * extract table oid from Data-modifying statements too.
968 * Returns number of oids.
969 * In case of error, returns 0 (InvalidOid).
970 * oids buffer (oidsp) will be discarded by subsequent call.
971 */
pool_extract_table_oids(Node * node,int ** oidsp)972 int pool_extract_table_oids(Node *node, int **oidsp)
973 {
974 #define POOL_MAX_DML_OIDS 128
975 char *table;
976 static int oids[POOL_MAX_DML_OIDS];
977 int num_oids;
978 int oid;
979
980 if (node == NULL)
981 {
982 ereport(LOG,
983 (errmsg("memcache: error while extracting table oids. statement is NULL")));
984 return 0;
985 }
986
987 num_oids = 0;
988 *oidsp = oids;
989
990 if (IsA(node, InsertStmt))
991 {
992 InsertStmt *stmt = (InsertStmt *) node;
993
994 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
995 table = make_table_name_from_rangevar(stmt->relation);
996 }
997 else if (IsA(node, UpdateStmt))
998 {
999 UpdateStmt *stmt = (UpdateStmt *) node;
1000
1001 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1002 table = make_table_name_from_rangevar(stmt->relation);
1003 }
1004 else if (IsA(node, DeleteStmt))
1005 {
1006 DeleteStmt *stmt = (DeleteStmt *) node;
1007
1008 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1009 table = make_table_name_from_rangevar(stmt->relation);
1010 }
1011 else if(IsA(node, SelectStmt))
1012 {
1013 SelectStmt *stmt = (SelectStmt *) node;
1014 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1015 table = NULL;
1016 }
1017 #ifdef NOT_USED
1018 /*
1019 * We do not handle CREATE TABLE here. It is possible that
1020 * pool_extract_table_oids() is called before CREATE TABLE gets
1021 * executed.
1022 */
1023 else if (IsA(node, CreateStmt))
1024 {
1025 CreateStmt *stmt = (CreateStmt *)node;
1026 table = make_table_name_from_rangevar(stmt->relation);
1027 }
1028 #endif
1029
1030 else if (IsA(node, AlterTableStmt))
1031 {
1032 AlterTableStmt *stmt = (AlterTableStmt *)node;
1033 table = make_table_name_from_rangevar(stmt->relation);
1034 }
1035
1036 else if (IsA(node, CopyStmt))
1037 {
1038 CopyStmt *stmt = (CopyStmt *)node;
1039 if (stmt->is_from) /* COPY FROM? */
1040 {
1041 table = make_table_name_from_rangevar(stmt->relation);
1042 }
1043 else
1044 {
1045 return 0;
1046 }
1047 }
1048
1049 else if (IsA(node, DropStmt))
1050 {
1051 ListCell *cell;
1052
1053 DropStmt *stmt = (DropStmt *)node;
1054
1055 if (stmt->removeType != OBJECT_TABLE)
1056 {
1057 return 0;
1058 }
1059
1060 /* Here, stmt->objects is list of target relation info. The
1061 * first cell of target relation info is a list (possibly)
1062 * consists of database, schema and relation. We need to call
1063 * makeRangeVarFromNameList() before passing to
1064 * make_table_name_from_rangevar. Otherwise we get weird excessively
1065 * decorated relation name (''table_name'').
1066 */
1067 foreach(cell, stmt->objects)
1068 {
1069 if (num_oids >= POOL_MAX_DML_OIDS)
1070 {
1071 ereport(LOG,
1072 (errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1073 return 0;
1074 }
1075
1076 table = make_table_name_from_rangevar(makeRangeVarFromNameList(lfirst(cell)));
1077 oid = pool_table_name_to_oid(table);
1078 if (oid > 0)
1079 {
1080 oids[num_oids++] = pool_table_name_to_oid(table);
1081 ereport(DEBUG1,
1082 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids-1])));
1083 }
1084 }
1085 return num_oids;
1086 }
1087 else if (IsA(node, TruncateStmt))
1088 {
1089 ListCell *cell;
1090
1091 TruncateStmt *stmt = (TruncateStmt *)node;
1092
1093 foreach(cell, stmt->relations)
1094 {
1095 if (num_oids >= POOL_MAX_DML_OIDS)
1096 {
1097 ereport(LOG,
1098 (errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1099 return 0;
1100 }
1101
1102 table = make_table_name_from_rangevar(lfirst(cell));
1103 oid = pool_table_name_to_oid(table);
1104 if (oid > 0)
1105 {
1106 oids[num_oids++] = pool_table_name_to_oid(table);
1107 ereport(DEBUG1,
1108 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids-1])));
1109 }
1110 }
1111 return num_oids;
1112 }
1113 else if (IsA(node, ExplainStmt))
1114 {
1115 ListCell *cell;
1116 DefElem *def;
1117 ExplainStmt *stmt = (ExplainStmt *) node;
1118
1119 foreach(cell, stmt->options)
1120 {
1121 def = lfirst(cell);
1122 if (strncmp("analyze", def->defname, 7) == 0)
1123 {
1124 return pool_extract_table_oids(stmt->query, oidsp);
1125 }
1126 }
1127
1128 table = NULL;
1129 }
1130 else
1131 {
1132 ereport(DEBUG1,
1133 (errmsg("memcache: extracting table oids: statment is different from INSERT/UPDATE/DELETE/TRUNCATE/DROP TABLE/ALTER TABLE")));
1134 return 0;
1135 }
1136
1137 oid = pool_table_name_to_oid(table);
1138 if (oid > 0)
1139 {
1140 if (num_oids >= POOL_MAX_DML_OIDS)
1141 {
1142 ereport(LOG,
1143 (errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1144 return 0;
1145 }
1146
1147 oids[num_oids++] = pool_table_name_to_oid(table);
1148 ereport(DEBUG1,
1149 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oid)));
1150 }
1151 return num_oids;
1152 }
1153
1154 /*
1155 * Extract table oid from INSERT/UPDATE/DELETE
1156 * FROM statement in WITH clause.
1157 * Returns number of oids.
1158 * oids buffer (oidsp) will be discarded by subsequent call.
1159 */
1160 int
pool_extract_withclause_oids(Node * node,int * oidsp)1161 pool_extract_withclause_oids(Node *node, int *oidsp)
1162 {
1163 int num_oids = 0;
1164 int oid;
1165 char *table;
1166 ListCell *lc;
1167 WithClause *with;
1168
1169 if(oidsp == NULL)
1170 {
1171 return 0;
1172 }
1173
1174 if(!node || !IsA(node, WithClause))
1175 {
1176 return 0;
1177 }
1178
1179 with = (WithClause *) node;
1180 foreach(lc, with->ctes)
1181 {
1182 CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
1183 if(IsA(cte->ctequery, InsertStmt))
1184 {
1185 InsertStmt *stmt = (InsertStmt *) cte->ctequery;
1186 table = make_table_name_from_rangevar(stmt->relation);
1187 }
1188 else if(IsA(cte->ctequery, DeleteStmt))
1189 {
1190 DeleteStmt *stmt = (DeleteStmt *) cte->ctequery;
1191 table = make_table_name_from_rangevar(stmt->relation);
1192 }
1193 else if(IsA(cte->ctequery, UpdateStmt))
1194 {
1195 UpdateStmt *stmt = (UpdateStmt *) cte->ctequery;
1196 table = make_table_name_from_rangevar(stmt->relation);
1197 }
1198 else
1199 {
1200 /* only check INSERT/DELETE/UPDATE in WITH clause */
1201 table = NULL;
1202 }
1203
1204 oid = pool_table_name_to_oid(table);
1205 if (oid > 0)
1206 {
1207 if (num_oids >= POOL_MAX_DML_OIDS)
1208 {
1209 break;
1210 }
1211
1212 oidsp[num_oids++] = pool_table_name_to_oid(table);
1213 ereport(DEBUG1,
1214 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oidsp[num_oids - 1])));
1215 }
1216 }
1217
1218 return num_oids;
1219 }
1220
/* Growth step, in entries, used when enlarging the oid buffers in this file. */
#define POOL_OIDBUF_SIZE 1024
/* Table oids recorded by pool_add_dml_table_oid(); never 0, no duplicates. */
static int* oidbuf;
/* Number of valid entries in oidbuf (also the next free slot). */
static int oidbufp;
/* Allocated capacity of oidbuf, in entries. */
static int oidbuf_size;
1225
1226 /*
1227 * Add table oid to internal buffer
1228 */
pool_add_dml_table_oid(int oid)1229 void pool_add_dml_table_oid(int oid)
1230 {
1231 int i;
1232 int* tmp;
1233
1234 if (oid == 0)
1235 return;
1236
1237 if (oidbufp >= oidbuf_size)
1238 {
1239 MemoryContext oldcxt;
1240 oidbuf_size += POOL_OIDBUF_SIZE;
1241 /*
1242 * This need to live throughout the life of child so home it in
1243 * TopMemoryContext
1244 */
1245 oldcxt = MemoryContextSwitchTo(TopMemoryContext);
1246 tmp = repalloc(oidbuf, sizeof(int) * oidbuf_size);
1247 MemoryContextSwitchTo(oldcxt);
1248 if (tmp == NULL)
1249 return;
1250
1251 oidbuf = tmp;
1252 }
1253
1254 for (i=0;i<oidbufp;i++)
1255 {
1256 if (oidbuf[i] == oid)
1257 /* Already same oid exists */
1258 return;
1259 }
1260 oidbuf[oidbufp++] = oid;
1261 }
1262
1263
1264 /*
1265 * Get table oid buffer
1266 */
pool_get_dml_table_oid(int ** oid)1267 static int pool_get_dml_table_oid(int **oid)
1268 {
1269 *oid = oidbuf;
1270 return oidbufp;
1271 }
1272
pool_get_dropdb_table_oids(int ** oids,int dboid)1273 static int pool_get_dropdb_table_oids(int **oids, int dboid)
1274 {
1275 int *rtn = 0;
1276 int oids_size = 0;
1277 int *tmp;
1278
1279 int num_oids = 0;
1280 DIR *dir;
1281 struct dirent *dp;
1282 char path[1024];
1283
1284 snprintf(path, sizeof(path), "%s/%d", pool_config->memqcache_oiddir, dboid);
1285 if ((dir = opendir(path)) == NULL)
1286 {
1287 ereport(DEBUG1,
1288 (errmsg("memcache: getting drop table oids"),
1289 errdetail("Failed to open dir: %s", path)));
1290 return 0;
1291 }
1292
1293 while ((dp = readdir(dir)) != NULL)
1294 {
1295 if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0)
1296 continue;
1297
1298 if (num_oids >= oids_size)
1299 {
1300 oids_size += POOL_OIDBUF_SIZE;
1301 tmp = repalloc(rtn, sizeof(int) * oids_size);
1302 if (tmp == NULL)
1303 {
1304 closedir(dir);
1305 return 0;
1306 }
1307 rtn = tmp;
1308 }
1309
1310 rtn[num_oids] = atol(dp->d_name);
1311 num_oids++;
1312 }
1313
1314 closedir(dir);
1315 *oids = rtn;
1316
1317 return num_oids;
1318 }
1319
1320 /* Discard oid internal buffer */
pool_discard_dml_table_oid(void)1321 static void pool_discard_dml_table_oid(void)
1322 {
1323 oidbufp = 0;
1324 }
1325
1326 /*
1327 * Management modules for oid map. When caching SELECT results, we
1328 * record table oids to file, which has following structure.
1329 *
1330 * memqcache_oiddir -+- database_oid -+-table_oid_file1
1331 * |
1332 * +-table_oid_file2
1333 * |
1334 * +-table_oid_file3...
1335 *
 * Each table_oid_file is named after the oid of a table used by a cached
 * SELECT statement, and contains one or more cache ids (shmem case) or
 * hash keys (memcached case). When a SELECT result is cached, the file is
 * created if necessary and the cache id is appended to it; later SELECTs
 * using the same table append to the same file. A SELECT that uses
 * multiple tables adds its cache id to one table_oid_file per table. When
 * INSERT/UPDATE/DELETE is executed, the corresponding caches must be
 * deleted (cache invalidation); the same applies when DROP TABLE or ALTER
 * TABLE is executed. When a database is dropped, all caches belonging to
 * that database must be deleted.
1345 */
1346
1347 /*
1348 * Get oid of current database
1349 */
pool_get_database_oid(void)1350 static int pool_get_database_oid(void)
1351 {
1352 /*
1353 * Query to convert table name to oid
1354 */
1355 int oid = 0;
1356 static POOL_RELCACHE *relcache;
1357 POOL_CONNECTION_POOL *backend;
1358
1359 backend = pool_get_session_context(false)->backend;
1360
1361 /*
1362 * If relcache does not exist, create it.
1363 */
1364 if (!relcache)
1365 {
1366 relcache = pool_create_relcache(pool_config->relcache_size, DATABASE_TO_OID_QUERY,
1367 int_register_func, int_unregister_func,
1368 false);
1369 if (relcache == NULL)
1370 {
1371 ereport(LOG,
1372 (errmsg("memcache: error creating relcache while getting database OID")));
1373 return oid;
1374 }
1375 }
1376
1377 /*
1378 * Search relcache.
1379 */
1380 oid = (int)(intptr_t)pool_search_relcache(relcache, backend,
1381 MASTER_CONNECTION(backend)->sp->database);
1382 return oid;
1383 }
1384
1385 /*
1386 * Get oid of current database for discarding cache files
1387 * after executing DROP DATABASE
1388 */
pool_get_database_oid_from_dbname(char * dbname)1389 int pool_get_database_oid_from_dbname(char *dbname)
1390 {
1391 int dboid = 0;
1392 POOL_SELECT_RESULT *res;
1393 char query[1024];
1394
1395 POOL_CONNECTION_POOL *backend;
1396 backend = pool_get_session_context(false)->backend;
1397
1398 snprintf(query, sizeof(query), DATABASE_TO_OID_QUERY, dbname);
1399 do_query(MASTER(backend), query, &res, MAJOR(backend));
1400
1401 if (res->numrows != 1)
1402 {
1403 ereport(DEBUG1,
1404 (errmsg("memcache: getting oid of current database"),
1405 errdetail("received %d rows", res->numrows)));
1406 free_select_result(res);
1407 return 0;
1408 }
1409
1410 dboid = atol(res->data[0]);
1411 free_select_result(res);
1412
1413 return dboid;
1414 }
1415
1416 /*
1417 * Add cache id (shmem case) or hash key (memcached case) to table oid
1418 * map file. Caller must hold shmem lock before calling this function
1419 * to avoid file extension conflict among different pgpool child
1420 * process.
1421 * As of pgpool-II 3.2, pool_handle_query_cache is responsible for that.
1422 * (pool_handle_query_cache -> pool_commit_cache -> pool_add_table_oid_map)
1423 */
pool_add_table_oid_map(POOL_CACHEKEY * cachekey,int num_table_oids,int * table_oids)1424 static void pool_add_table_oid_map(POOL_CACHEKEY *cachekey, int num_table_oids, int *table_oids)
1425 {
1426 char *dir;
1427 int dboid;
1428 char path[1024];
1429 int i;
1430 int len;
1431
1432 /*
1433 * Create memqcache_oiddir
1434 */
1435 dir = pool_config->memqcache_oiddir;
1436
1437 if (mkdir(dir, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1438 {
1439 if (errno != EEXIST)
1440 {
1441 ereport(WARNING,
1442 (errmsg("memcache: adding table oid maps, failed to create directory:\"%s\". error:\"%s\"", dir, strerror(errno))));
1443 return;
1444 }
1445 }
1446
1447 /*
1448 * Create memqcache_oiddir/database_oid
1449 */
1450 dboid = pool_get_database_oid();
1451 ereport(DEBUG1,
1452 (errmsg("memcache: adding table oid maps"),
1453 errdetail("dboid %d", dboid)));
1454
1455 if (dboid <= 0)
1456 {
1457 ereport(WARNING,
1458 (errmsg("memcache: adding table oid maps, failed to get database OID")));
1459 return;
1460 }
1461
1462 snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1463 if (mkdir(path, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1464 {
1465 if (errno != EEXIST)
1466 {
1467 ereport(WARNING,
1468 (errmsg("memcache: adding table oid maps, failed to create directory:\"%s\". error:\"%s\"", path, strerror(errno))));
1469 return;
1470 }
1471 }
1472
1473 if (pool_is_shmem_cache())
1474 {
1475 len = sizeof(cachekey->cacheid);
1476 }
1477 else
1478 {
1479 len = sizeof(cachekey->hashkey);
1480 }
1481
1482 for (i=0;i<num_table_oids;i++)
1483 {
1484 int fd;
1485 int oid = table_oids[i];
1486 int sts;
1487 struct flock fl;
1488
1489 /*
1490 * Create or open each memqcache_oiddir/database_oid/table_oid
1491 */
1492 snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1493 if ((fd = open(path, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR)) == -1)
1494 {
1495 ereport(WARNING,
1496 (errmsg("memcache: adding table oid maps, failed to open file:\"%s\". error:\"%s\"", path, strerror(errno))));
1497 return;
1498 }
1499
1500 fl.l_type = F_WRLCK;
1501 fl.l_whence = SEEK_SET;
1502 fl.l_start = 0; /* Offset from l_whence */
1503 fl.l_len = 0; /* length, 0 = to EOF */
1504
1505 sts = fcntl(fd, F_SETLKW, &fl);
1506 if (sts == -1)
1507 {
1508 ereport(WARNING,
1509 (errmsg("memcache: adding table oid maps, failed to lock file:\"%s\". error:\"%s\"", path, strerror(errno))));
1510
1511 close(fd);
1512 return;
1513 }
1514
1515 /*
1516 * Below was ifdef-out because of a performance reason.
1517 * Looking for duplicate cache entries in a file needed
1518 * unacceptably high cost. So we gave up this and decided not
1519 * to care about duplicate entries in the file.
1520 */
1521 #ifdef NOT_USED
1522 for (;;)
1523 {
1524 sts = read(fd, (char *)&buf, len);
1525 if (sts == -1)
1526 {
1527 ereport(WARNING,
1528 (errmsg("memcache: adding table oid maps, failed to read file:\"%s\". error:\"%s\"", path, strerror(errno))));
1529 close(fd);
1530 return;
1531 }
1532 else if (sts == len)
1533 {
1534 if (memcmp(cachekey, &buf, len) == 0)
1535 {
1536 /* Same key found. Skip this */
1537 close(fd);
1538 return;
1539 }
1540 continue;
1541 }
1542 /*
1543 * Must be EOF
1544 */
1545 if (sts != 0)
1546 {
1547 ereport(WARNING,
1548 (errmsg("memcache: adding table oid maps, invalid data length:%d in file:\"%s\". error:\"%s\"",sts, path)));
1549 close(fd);
1550 return;
1551 }
1552 break;
1553 }
1554 #endif
1555
1556 if (lseek(fd, 0, SEEK_END) == -1)
1557 {
1558 ereport(WARNING,
1559 (errmsg("memcache: adding table oid maps, failed seek on file:\"%s\". error:\"%s\"", path, strerror(errno))));
1560 close(fd);
1561 return;
1562 }
1563
1564 /*
1565 * Write cache_id or cache key at the end of file
1566 */
1567 sts = write(fd, (char *)cachekey, len);
1568 if (sts == -1 || sts != len)
1569 {
1570 ereport(WARNING,
1571 (errmsg("memcache: adding table oid maps, failed to write file:\"%s\". error:\"%s\"", path, strerror(errno))));
1572 close(fd);
1573 return;
1574 }
1575 close(fd);
1576 }
1577 }
1578
1579 /*
1580 * Discard all oid maps at pgpool-II startup.
1581 * This is necessary for shmem case.
1582 */
pool_discard_oid_maps(void)1583 void pool_discard_oid_maps(void)
1584 {
1585 char command[1024];
1586
1587 snprintf(command, sizeof(command), "/bin/rm -fr %s/[0-9]*",
1588 pool_config->memqcache_oiddir);
1589 if(system(command) == -1)
1590 ereport(WARNING,
1591 (errmsg("unable to execute command \"%s\"",command),
1592 errdetail("system() command failed with error \"%s\"",strerror(errno))));
1593
1594
1595 }
1596
pool_discard_oid_maps_by_db(int dboid)1597 void pool_discard_oid_maps_by_db(int dboid)
1598 {
1599 char command[1024];
1600
1601 if (pool_is_shmem_cache())
1602 {
1603 snprintf(command, sizeof(command), "/bin/rm -fr %s/%d/",
1604 pool_config->memqcache_oiddir, dboid);
1605
1606 ereport(DEBUG1,
1607 (errmsg("memcache: discarding oid maps by db"),
1608 errdetail("command: '%s\'", command)));
1609
1610 if(system(command) == -1)
1611 ereport(WARNING,
1612 (errmsg("unable to execute command \"%s\"",command),
1613 errdetail("system() command failed with error \"%s\"",strerror(errno))));
1614 }
1615 }
1616
1617 /*
1618 * Read cache id (shmem case) or hash key (memcached case) from table
1619 * oid map file according to table_oids and discard cache entries. If
1620 * unlink is true, the file will be unlinked after successful cache
1621 * removal.
1622 */
pool_invalidate_query_cache(int num_table_oids,int * table_oid,bool unlinkp,int dboid)1623 static void pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlinkp, int dboid)
1624 {
1625 char *dir;
1626 char path[1024];
1627 int i;
1628 int len;
1629 POOL_CACHEKEY buf;
1630
1631 /*
1632 * Create memqcache_oiddir
1633 */
1634 dir = pool_config->memqcache_oiddir;
1635 if (mkdir(dir, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1636 {
1637 if (errno != EEXIST)
1638 {
1639 ereport(WARNING,
1640 (errmsg("memcache: invalidating query cache, failed to create directory:\"%s\". error:\"%s\"", dir, strerror(errno))));
1641 return;
1642 }
1643 }
1644
1645 /*
1646 * Create memqcache_oiddir/database_oid
1647 */
1648 if (dboid == 0)
1649 {
1650 dboid = pool_get_database_oid();
1651 ereport(DEBUG1,
1652 (errmsg("memcache invalidating query cache"),
1653 errdetail("dboid %d", dboid)));
1654
1655 if (dboid <= 0)
1656 {
1657 ereport(WARNING,
1658 (errmsg("memcache: invalidating query cache, could not get database OID")));
1659 return;
1660 }
1661 }
1662
1663 snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1664 if (mkdir(path, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1665 {
1666 if (errno != EEXIST)
1667 {
1668 ereport(WARNING,
1669 (errmsg("memcache: invalidating query cache, failed to create directory:\"%s\". error:\"%s\"", path, strerror(errno))));
1670 return;
1671 }
1672 }
1673
1674 if (pool_is_shmem_cache())
1675 {
1676 len = sizeof(buf.cacheid);
1677 }
1678 else
1679 {
1680 len = sizeof(buf.hashkey);
1681 }
1682
1683 for (i=0;i<num_table_oids;i++)
1684 {
1685 int fd;
1686 int oid = table_oid[i];
1687 int sts;
1688 struct flock fl;
1689
1690 /*
1691 * Open each memqcache_oiddir/database_oid/table_oid
1692 */
1693 snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1694 if ((fd = open(path, O_RDONLY)) == -1)
1695 {
1696 /* This may be normal. It is possible that no SELECT has
1697 * been issued since the table has been created or since
1698 * pgpool-II started up.
1699 */
1700 ereport(DEBUG1,
1701 (errmsg("memcache invalidating query cache"),
1702 errdetail("failed to open \"%s\". reason:\"%s\"",path, strerror(errno))));
1703 continue;
1704 }
1705
1706 fl.l_type = F_RDLCK;
1707 fl.l_whence = SEEK_SET;
1708 fl.l_start = 0; /* Offset from l_whence */
1709 fl.l_len = 0; /* length, 0 = to EOF */
1710
1711 sts = fcntl(fd, F_SETLKW, &fl);
1712 if (sts == -1)
1713 {
1714 ereport(WARNING,
1715 (errmsg("memcache: invalidating query cache, failed to lock file:\"%s\". error:\"%s\"", path, strerror(errno))));
1716 close(fd);
1717 return;
1718 }
1719 for (;;)
1720 {
1721 sts = read(fd, (char *)&buf, len);
1722 if (sts == -1)
1723 {
1724 ereport(WARNING,
1725 (errmsg("memcache: invalidating query cache, failed to read file:\"%s\". error:\"%s\"", path, strerror(errno))));
1726
1727 close(fd);
1728 return;
1729 }
1730 else if (sts == len)
1731 {
1732 if (pool_is_shmem_cache())
1733 {
1734 ereport(DEBUG1,
1735 (errmsg("memcache invalidating query cache"),
1736 errdetail("deleting cacheid:%d itemid:%d",
1737 buf.cacheid.blockid, buf.cacheid.itemid)));
1738 pool_delete_item_shmem_cache(&buf.cacheid);
1739 }
1740 #ifdef USE_MEMCACHED
1741 else
1742 {
1743 char delbuf[33];
1744
1745 memcpy(delbuf, buf.hashkey, 32);
1746 delbuf[32] = 0;
1747 ereport(DEBUG1,
1748 (errmsg("memcache invalidating query cache"),
1749 errdetail("deleting %s", delbuf)));
1750
1751 delete_cache_on_memcached(delbuf);
1752 }
1753 #endif
1754 continue;
1755 }
1756
1757 /*
1758 * Must be EOF
1759 */
1760 if (sts != 0)
1761 {
1762 ereport(WARNING,
1763 (errmsg("memcache: invalidating query cache, invalid data length:%d in file:\"%s\"",sts, path)));
1764 close(fd);
1765 return;
1766 }
1767 break;
1768 }
1769
1770 if (unlinkp)
1771 {
1772 unlink(path);
1773 }
1774 close(fd);
1775 }
1776 #ifdef SHMEMCACHE_DEBUG
1777 dump_shmem_cache(0);
1778 #endif
1779 }
1780
1781 /*
1782 * Reset SELECT data buffers. If reset_dml_oids is true, call
1783 * pool_discard_dml_table_oid() to reset table oids used in DML statements.
1784 */
pool_reset_memqcache_buffer(bool reset_dml_oids)1785 static void pool_reset_memqcache_buffer(bool reset_dml_oids)
1786 {
1787 POOL_SESSION_CONTEXT * session_context;
1788
1789 session_context = pool_get_session_context(true);
1790
1791 if (session_context && session_context->query_cache_array)
1792 {
1793 POOL_TEMP_QUERY_CACHE *cache;
1794
1795 ereport(DEBUG1,
1796 (errmsg("memcache reset buffer"),
1797 errdetail("discard: %p", session_context->query_cache_array)));
1798
1799 pool_discard_query_cache_array(session_context->query_cache_array);
1800 session_context->query_cache_array = pool_create_query_cache_array();
1801
1802 ereport(DEBUG1,
1803 (errmsg("memcache reset buffer"),
1804 errdetail("create: %p", session_context->query_cache_array)));
1805 /*
1806 * if the query context is still under use, we cannot discard
1807 * temporary cache.
1808 */
1809 if ((SL_MODE && pool_is_doing_extended_query_message()) ||
1810 can_query_context_destroy(session_context->query_context))
1811 {
1812 ereport(DEBUG1,
1813 (errmsg("memcache reset buffer"),
1814 errdetail("discard temp buffer of %p (%s)",
1815 session_context->query_context, session_context->query_context->original_query)));
1816
1817 cache = pool_get_current_cache();
1818 pool_discard_temp_query_cache(cache);
1819 /*
1820 * Reset temp_cache pointer in the current query context
1821 * so that we don't double free memory.
1822 */
1823 session_context->query_context->temp_cache = NULL;
1824 }
1825 }
1826
1827 if (reset_dml_oids)
1828 pool_discard_dml_table_oid();
1829
1830 pool_tmp_stats_reset_num_selects();
1831 }
1832
1833 /*
1834 * Return true if memory cache method is "shmem". The purpose of this
1835 * function is to cache the result of stcmp and to save a few cycle.
1836 */
pool_is_shmem_cache(void)1837 bool pool_is_shmem_cache(void)
1838 {
1839 return pool_config->memqcache_method == SHMEM_CACHE;
1840 }
1841
/*
 * Number of shared memory cache blocks, set by pool_set_memqcache_blocks().
 */
static int	memqcache_num_blocks;

/*
 * Store the number of shared memory cache blocks.
 */
static void
pool_set_memqcache_blocks(int num_blocks)
{
	int			blocks = num_blocks;

	memqcache_num_blocks = blocks;
}
1850
1851 /*
1852 * Return memory cache number of blocks.
1853 */
pool_get_memqcache_blocks(void)1854 static int pool_get_memqcache_blocks(void)
1855 {
1856 return memqcache_num_blocks;
1857 }
1858
1859 /*
1860 * Query cache on shared memory management modules.
1861 */
1862
1863 /*
1864 * Calculate necessary shared memory size.
1865 */
pool_shared_memory_cache_size(void)1866 size_t pool_shared_memory_cache_size(void)
1867 {
1868 int64 num_blocks;
1869 size_t size;
1870
1871 if (pool_config->memqcache_maxcache > pool_config->memqcache_cache_block_size)
1872 ereport(FATAL,
1873 (errmsg("invalid memory cache configuration"),
1874 errdetail("memqcache_cache_block_size %d should be greater or equal to memqcache_maxcache %d",
1875 pool_config->memqcache_cache_block_size,
1876 pool_config->memqcache_maxcache)));
1877
1878
1879 num_blocks = pool_config->memqcache_total_size/
1880 pool_config->memqcache_cache_block_size;
1881 if (num_blocks == 0)
1882 ereport(FATAL,
1883 (errmsg("invalid memory cache configuration"),
1884 errdetail("memqcache_total_size %ld should be greater or equal to memqcache_cache_block_size %d",
1885 pool_config->memqcache_total_size,
1886 pool_config->memqcache_cache_block_size)));
1887
1888 ereport(LOG,
1889 (errmsg("memory cache initialized"),
1890 errdetail("memcache blocks :%ld",num_blocks)));
1891 /* Remember # of blocks */
1892 pool_set_memqcache_blocks(num_blocks);
1893 size = pool_config->memqcache_cache_block_size * num_blocks;
1894 return size;
1895 }
1896
1897 /*
1898 * Acquire and initialize shared memory cache. This should be called
1899 * only once from pgpool main process at the process staring up time.
1900 */
1901 static void *shmem;
pool_init_memory_cache(size_t size)1902 int pool_init_memory_cache(size_t size)
1903 {
1904 ereport(DEBUG1,
1905 (errmsg("memory cache request size : %zd",size)));
1906
1907 shmem = pool_shared_memory_create(size);
1908 return 0;
1909 }
1910
/*
 * Clear all the shared memory cache and reset FSMM and hash table.
 *
 * Also removes the on-disk oid maps via pool_discard_oid_maps(). All of
 * this runs with the signals in BlockSig blocked and the shmem lock held.
 * If any step throws, the lock is released and the previous signal mask is
 * restored before the error is re-thrown.
 */
void
pool_clear_memory_cache(void)
{
	size_t size;
	pool_sigset_t oldmask;

	POOL_SETMASK2(&BlockSig, &oldmask);
	pool_shmem_lock();

	PG_TRY();
	{
		/* Zero the whole cache area; its size is recomputed from the config. */
		size = pool_shared_memory_cache_size();
		memset(shmem, 0, size);

		/* Mark every block as fully free again. */
		size = pool_shared_memory_fsmm_size();
		pool_reset_fsmm(size);

		/* Cached results are gone, so the table-oid -> cache maps are stale. */
		pool_discard_oid_maps();

		pool_hash_reset(pool_config->memqcache_max_num_cache);
	}
	PG_CATCH();
	{
		/* Undo the lock and signal mask before propagating the error. */
		pool_shmem_unlock();
		POOL_SETMASK(&oldmask);
		PG_RE_THROW();
	}
	PG_END_TRY();

	pool_shmem_unlock();
	POOL_SETMASK(&oldmask);
}
1946
1947 /*
1948 * Return shared memory cache address
1949 */
pool_memory_cache_address(void)1950 static void *pool_memory_cache_address(void)
1951 {
1952 return shmem;
1953 }
1954
1955 /*
1956 * Initialize new block
1957 */
1958
1959 /*
1960 * Free space management map
1961 *
 * The free space management map (FSMM) is an array of bytes, one per cache
 * block, indexed by block id. For example, with a 1GB cache and an 8KB
 * block size there are 131,072 blocks, so the FSMM takes 128KB. Each FSMM
 * entry holds a value from 0 to 255 that encodes the total free space in
 * the corresponding block. For example, a value of 2 means the block has
 * between 64 and 95 bytes of free space.
1969 *
1970 * value free space(in bytes)
1971 * 0 0-31
1972 * 1 32-63
1973 * 2 64-95
1974 * 3 96-127
1975 * :
1976 * 255 8160-8192
1977 */
1978
/*
 * Return the shared memory size required for the FSMM: one byte per cache
 * block. Must be called after pool_shared_memory_cache_size(), which sets
 * the block count.
 */
size_t
pool_shared_memory_fsmm_size(void)
{
	return (size_t) pool_get_memqcache_blocks() * sizeof(char);
}
1990
1991 /*
1992 * Acquire and initialize shared memory cache for FSMM. This should be
1993 * called after pool_shared_memory_cache_size only once from pgpool
1994 * main process at the process staring up time.
1995 */
1996 static void *fsmm;
pool_init_fsmm(size_t size)1997 int pool_init_fsmm(size_t size)
1998 {
1999 int maxblock = pool_get_memqcache_blocks();
2000 int encode_value;
2001
2002 fsmm = pool_shared_memory_create(size);
2003 encode_value = POOL_MAX_FREE_SPACE/POOL_FSMM_RATIO;
2004 memset(fsmm, encode_value, maxblock);
2005 return 0;
2006 }
2007
2008 /*
2009 * Return shared memory fsmm address
2010 */
pool_fsmm_address(void)2011 static void *pool_fsmm_address(void)
2012 {
2013 return fsmm;
2014 }
2015
2016 /*
2017 * Clock algorithm shared query cache management modules.
2018 */
2019
/*
 * Clock hand pointing at the next victim block; lives in shared memory.
 */
static int *pool_fsmm_clock_hand;

/*
 * Allocate the clock hand in shared memory and start it at block 0.
 */
void
pool_allocate_fsmm_clock_hand(void)
{
	int		   *hand = pool_shared_memory_create(sizeof(*hand));

	*hand = 0;
	pool_fsmm_clock_hand = hand;
}
2033
2034 /*
2035 * Reset FSMM.
2036 */
2037 static void
pool_reset_fsmm(size_t size)2038 pool_reset_fsmm(size_t size)
2039 {
2040 int encode_value;
2041
2042 encode_value = POOL_MAX_FREE_SPACE/POOL_FSMM_RATIO;
2043 memset(fsmm, encode_value, size);
2044
2045 *pool_fsmm_clock_hand = 0;
2046 }
2047
2048 /*
2049 * Find victim block using clock algorithm and make it free.
2050 * Returns new free block id.
2051 */
pool_reuse_block(void)2052 static POOL_CACHE_BLOCKID pool_reuse_block(void)
2053 {
2054 int maxblock = pool_get_memqcache_blocks();
2055 char *block = block_address(*pool_fsmm_clock_hand);
2056 POOL_CACHE_BLOCK_HEADER *bh = (POOL_CACHE_BLOCK_HEADER *)block;
2057 POOL_CACHE_BLOCKID reused_block;
2058 POOL_CACHE_ITEM_POINTER *cip;
2059 char *p;
2060 int i;
2061
2062 bh->flags = 0;
2063 reused_block = *pool_fsmm_clock_hand;
2064 p = block_address(reused_block);
2065
2066 for (i=0;i<bh->num_items;i++)
2067 {
2068 cip = item_pointer(p, i);
2069
2070 if (!(POOL_ITEM_DELETED & cip->flags))
2071 {
2072 pool_hash_delete(&cip->query_hash);
2073 ereport(DEBUG1,
2074 (errmsg("pool_reuse_block: blockid: %d item: %d", reused_block, i)));
2075 }
2076 }
2077
2078 pool_init_cache_block(reused_block);
2079 pool_update_fsmm(reused_block, POOL_MAX_FREE_SPACE);
2080
2081 (*pool_fsmm_clock_hand)++;
2082 if (*pool_fsmm_clock_hand >= maxblock)
2083 *pool_fsmm_clock_hand = 0;
2084
2085 ereport(LOG,
2086 (errmsg("pool_reuse_block: blockid: %d", reused_block)));
2087
2088 return reused_block;
2089 }
2090
2091 /*
2092 * Get block id which has enough space
2093 */
pool_get_block(size_t free_space)2094 static POOL_CACHE_BLOCKID pool_get_block(size_t free_space)
2095 {
2096 int encode_value;
2097 unsigned char *p = pool_fsmm_address();
2098 int i;
2099 int maxblock = pool_get_memqcache_blocks();
2100 POOL_CACHE_BLOCK_HEADER *bh;
2101
2102 if (p == NULL)
2103 {
2104 ereport(WARNING,
2105 (errmsg("memcache: getting block: FSMM is not initialized")));
2106 return -1;
2107 }
2108
2109 if (free_space > POOL_MAX_FREE_SPACE)
2110 {
2111 ereport(WARNING,
2112 (errmsg("memcache: getting block: invalid free space:%zd", free_space),
2113 errdetail("requested free space: %zd is more than maximum allowed space:%lu",free_space,POOL_MAX_FREE_SPACE)));
2114 return -1;
2115 }
2116
2117 encode_value = free_space/POOL_FSMM_RATIO;
2118
2119 for (i=0;i<maxblock;i++)
2120 {
2121 if (p[i] >= encode_value)
2122 {
2123 /*
2124 * This block *may" have enough space.
2125 * We need to make sure it actually has enough space.
2126 */
2127 bh = (POOL_CACHE_BLOCK_HEADER *)block_address(i);
2128 if (bh->free_bytes >= free_space)
2129 {
2130 return (POOL_CACHE_BLOCKID)i;
2131 }
2132 }
2133 }
2134
2135 /*
2136 * No enough space found. Reuse victim block
2137 */
2138 return pool_reuse_block();
2139 }
2140
2141 /*
2142 * Update free space info for specified block
2143 */
pool_update_fsmm(POOL_CACHE_BLOCKID blockid,size_t free_space)2144 static void pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space)
2145 {
2146 int encode_value;
2147 char *p = pool_fsmm_address();
2148
2149 if (p == NULL)
2150 {
2151 ereport(WARNING,
2152 (errmsg("memcache: updating free space in block: FSMM is not initialized")));
2153 return;
2154 }
2155
2156 if (blockid >= pool_get_memqcache_blocks())
2157 {
2158 ereport(WARNING,
2159 (errmsg("memcache: updating free space in block: block id:%d in invalid",blockid)));
2160 return;
2161 }
2162
2163 if (free_space > POOL_MAX_FREE_SPACE)
2164 {
2165 ereport(WARNING,
2166 (errmsg("memcache: updating free space in block: invalid free space:%zd", free_space),
2167 errdetail("requested free space: %zd is more than maximum allowed space:%lu",free_space,POOL_MAX_FREE_SPACE)));
2168
2169 return;
2170 }
2171
2172 encode_value = free_space/POOL_FSMM_RATIO;
2173
2174 p[blockid] = encode_value;
2175
2176 return;
2177 }
2178
2179 /*
2180 * Add item data to shared memory cache.
2181 * On successful registration, returns cache id.
2182 * The cache id is overwritten by the subsequent call to this function.
2183 * On error returns NULL.
2184 */
pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash,char * data,int size)2185 static POOL_CACHEID *pool_add_item_shmem_cache(POOL_QUERY_HASH *query_hash, char *data, int size)
2186 {
2187 static POOL_CACHEID cacheid;
2188 POOL_CACHE_BLOCKID blockid;
2189 POOL_CACHE_BLOCK_HEADER *bh;
2190 POOL_CACHE_ITEM_POINTER *cip;
2191
2192 POOL_CACHE_ITEM ci;
2193 POOL_CACHE_ITEM_POINTER cip_body;
2194 char *item;
2195
2196 int request_size;
2197 char *p;
2198 int i;
2199 char *src;
2200 char *dst;
2201 int num_deleted;
2202 char *dcip;
2203 char *dci;
2204 bool need_pack;
2205 char *work_buffer;
2206 int index;
2207
2208 if (query_hash == NULL)
2209 {
2210 ereport(LOG,
2211 (errmsg("error while adding item to shmem cache, query hash is NULL")));
2212 return NULL;
2213 }
2214
2215 if (data == NULL)
2216 {
2217 ereport(LOG,
2218 (errmsg("error while adding item to shmem cache, data is NULL")));
2219 return NULL;
2220 }
2221
2222 if (size <= 0)
2223 {
2224 ereport(LOG,
2225 (errmsg("error while adding item to shmem cache, invalid request size: %d", size)));
2226 return NULL;
2227 }
2228
2229 /* Add overhead */
2230 request_size = size + sizeof(POOL_CACHE_ITEM_POINTER) + sizeof(POOL_CACHE_ITEM_HEADER);
2231
2232 /* Get cache block which has enough space */
2233 blockid = pool_get_block(request_size);
2234
2235 if (blockid == -1)
2236 {
2237 return NULL;
2238 }
2239
2240 /*
2241 * Initialize the block if necessary. If no live items are
2242 * remained, we also initialize the block. If there's contiguous
2243 * deleted items, we turn them into free space as well.
2244 */
2245 pool_init_cache_block(blockid);
2246
2247 /*
2248 * Make sure that we have at least one free hash element.
2249 */
2250 while (!is_free_hash_element())
2251 {
2252 /* If not, reuse next victim block */
2253 blockid = pool_reuse_block();
2254 pool_init_cache_block(blockid);
2255 }
2256
2257 /* Get block address on shmem */
2258 p = block_address(blockid);
2259 bh = (POOL_CACHE_BLOCK_HEADER *)p;
2260
2261 /*
2262 * Create contiguous free space. We assume that item bodies are
2263 * ordered from bottom to top of the block, and corresponding item
2264 * pointers are ordered from the youngest to the oldest in the
2265 * beginning of the block.
2266 */
2267
2268 /*
2269 * Optimization. If there's no deleted items, we don't need to
2270 * pack it to create contiguous free space.
2271 */
2272 need_pack = false;
2273 for (i=0;i<bh->num_items;i++)
2274 {
2275 cip = item_pointer(p, i);
2276
2277 if (POOL_ITEM_DELETED & cip->flags) /* Deleted item? */
2278 {
2279 need_pack = true;
2280 ereport(DEBUG1,
2281 (errmsg("memcache adding item"),
2282 errdetail("start creating contiguous space")));
2283 break;
2284 }
2285 }
2286
2287 /*
2288 * We disable packing for now.
2289 * Revisit and remove following code fragment later.
2290 */
2291 need_pack = false;
2292
2293 if (need_pack)
2294 {
2295 /*
2296 * Pack and create contiguous free space.
2297 */
2298 dcip = calloc(1, pool_config->memqcache_cache_block_size);
2299 if (!dcip)
2300 {
2301 ereport(WARNING,
2302 (errmsg("memcache: adding item to cache: calloc failed")));
2303 return NULL;
2304 }
2305
2306 work_buffer = dcip;
2307 dci = dcip + pool_config->memqcache_cache_block_size;
2308 num_deleted = 0;
2309 index = 0;
2310
2311 for (i=0;i<bh->num_items;i++)
2312 {
2313 int total_length;
2314 POOL_CACHEID cid;
2315
2316 cip = item_pointer(p, i);
2317
2318 if (POOL_ITEM_DELETED & cip->flags) /* Deleted item? */
2319 {
2320 num_deleted++;
2321 continue;
2322 }
2323
2324 /* Copy item body */
2325 src = p + cip->offset;
2326 total_length = item_header(p, i)->total_length;
2327 dst = dci - total_length;
2328 cip->offset = dst - work_buffer;
2329 memcpy(dst, src, total_length);
2330
2331 dci -= total_length;
2332
2333 /* Copy item pointer */
2334 src = (char *)cip;
2335 dst = (char *)item_pointer(dcip, index);
2336 memcpy(dst, src, sizeof(POOL_CACHE_ITEM_POINTER));
2337
2338 /* Update hash index */
2339 cid.blockid = blockid;
2340 cid.itemid = index;
2341 pool_hash_insert(&cip->query_hash, &cid, true);
2342 ereport(DEBUG1,
2343 (errmsg("memcache adding item"),
2344 errdetail("item cid updated. old:%d %d new:%d %d",
2345 blockid, i, blockid, index)));
2346 index++;
2347 }
2348
2349 /* All items deleted? */
2350 if (num_deleted > 0 && num_deleted == bh->num_items)
2351 {
2352 ereport(DEBUG1,
2353 (errmsg("memcache adding item"),
2354 errdetail("all items deleted, total deleted:%d", num_deleted)));
2355 bh->flags = 0;
2356 pool_init_cache_block(blockid);
2357 pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
2358 }
2359 else
2360 {
2361 /* Update number of items */
2362 bh->num_items -= num_deleted;
2363
2364 /* Copy back the packed block except block header */
2365 memcpy(p+sizeof(POOL_CACHE_BLOCK_HEADER),
2366 work_buffer+sizeof(POOL_CACHE_BLOCK_HEADER),
2367 pool_config->memqcache_cache_block_size-sizeof(POOL_CACHE_BLOCK_HEADER));
2368 }
2369 free(work_buffer);
2370 }
2371
2372 /*
2373 * Make sure that we have enough free space
2374 */
2375 if (bh->free_bytes < request_size)
2376 {
2377 /* This should not happen */
2378 ereport(WARNING,
2379 (errmsg("memcache: adding item to cache: not enough space"),
2380 errdetail("free space: %d required: %d block id:%d",
2381 bh->free_bytes, request_size, blockid)));
2382 return NULL;
2383 }
2384
2385 /*
2386 * At this point, new item can be located at block_header->num_items
2387 */
2388
2389 /* Fill in cache item header */
2390 ci.header.timestamp = time(NULL);
2391 ci.header.total_length = sizeof(POOL_CACHE_ITEM_HEADER) + size;
2392
2393 /* Calculate item body address */
2394 if (bh->num_items == 0)
2395 {
2396 /* This is the #0 item. So address is block_bottom -
2397 * data_length */
2398 item = p + pool_config->memqcache_cache_block_size - ci.header.total_length;
2399
2400 /* Mark this block used */
2401 bh->flags = POOL_BLOCK_USED;
2402 }
2403 else
2404 {
2405 cip = item_pointer(p, bh->num_items-1);
2406 item = p + cip->offset - ci.header.total_length;
2407 }
2408
2409 /* Copy item header */
2410 memcpy(item, &ci, sizeof(POOL_CACHE_ITEM_HEADER));
2411 bh->free_bytes -= sizeof(POOL_CACHE_ITEM_HEADER);
2412
2413 /* Copy item body */
2414 memcpy(item + sizeof(POOL_CACHE_ITEM_HEADER), data, size);
2415 bh->free_bytes -= size;
2416
2417 /* Copy cache item pointer */
2418 memcpy(&cip_body.query_hash, query_hash, sizeof(POOL_QUERY_HASH));
2419 memset(&cip_body.next, 0, sizeof(POOL_CACHEID));
2420 cip_body.offset = item - p;
2421 cip_body.flags = POOL_ITEM_USED;
2422 memcpy(item_pointer(p, bh->num_items), &cip_body, sizeof(POOL_CACHE_ITEM_POINTER));
2423 bh->free_bytes -= sizeof(POOL_CACHE_ITEM_POINTER);
2424
2425 /* Update FSMM */
2426 pool_update_fsmm(blockid, bh->free_bytes);
2427
2428 cacheid.blockid = blockid;
2429 cacheid.itemid = bh->num_items;
2430 ereport(DEBUG1,
2431 (errmsg("memcache adding item"),
2432 errdetail("new item inserted. blockid: %d itemid:%d",
2433 cacheid.blockid, cacheid.itemid)));
2434
2435 /* Add up number of items */
2436 bh->num_items++;
2437
2438 /* Update hash table */
2439 if (pool_hash_insert(query_hash, &cacheid, false) < 0)
2440 {
2441 ereport(LOG,
2442 (errmsg("error while adding item to shmem cache, hash insert failed")));
2443
2444 /* Since we have failed to insert hash index entry, we need to
2445 * undo the addition of cache entry.
2446 */
2447 pool_delete_item_shmem_cache(&cacheid);
2448 return NULL;
2449 }
2450 ereport(DEBUG1,
2451 (errmsg("memcache adding item"),
2452 errdetail("block: %d item: %d", cacheid.blockid, cacheid.itemid)));
2453
2454 #ifdef SHMEMCACHE_DEBUG
2455 dump_shmem_cache(blockid);
2456 #endif
2457 return &cacheid;
2458 }
2459
2460 /*
2461 * Returns item data address on shared memory cache specified by query hash.
2462 * Also data length is set to *size.
2463 * On error or data not found case returns NULL.
2464 * Detail is set to *sts. (0: success, 1: not found, -1: error)
2465 */
pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash,int * size,int * sts)2466 static char *pool_get_item_shmem_cache(POOL_QUERY_HASH *query_hash, int *size, int *sts)
2467 {
2468 POOL_CACHEID *cacheid;
2469 POOL_CACHE_ITEM_HEADER *cih;
2470
2471 if (sts == NULL)
2472 {
2473 ereport(LOG,
2474 (errmsg("error while getting item from shmem cache, sts is NULL")));
2475 return NULL;
2476 }
2477
2478 *sts = -1;
2479
2480 if (query_hash == NULL)
2481 {
2482 ereport(LOG,
2483 (errmsg("error while getting item from shmem cache, query hash is NULL")));
2484 return NULL;
2485 }
2486
2487 if (size == NULL)
2488 {
2489 ereport(LOG,
2490 (errmsg("error while getting item from shmem cache, size is NULL")));
2491 return NULL;
2492 }
2493
2494 /*
2495 * Find cache header by using hash table
2496 */
2497 cacheid = pool_find_item_on_shmem_cache(query_hash);
2498 if (cacheid == NULL)
2499 {
2500 /* Not found */
2501 *sts = 1;
2502 return NULL;
2503 }
2504
2505 cih = pool_cache_item_header(cacheid);
2506
2507 *size = cih->total_length - sizeof(POOL_CACHE_ITEM_HEADER);
2508 return (char *)cih + sizeof(POOL_CACHE_ITEM_HEADER);
2509 }
2510
2511 /*
2512 * Find data on shared memory cache specified query hash.
2513 * On success returns cache id.
2514 * The cache id is overwritten by the subsequent call to this function.
2515 */
pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash)2516 static POOL_CACHEID *pool_find_item_on_shmem_cache(POOL_QUERY_HASH *query_hash)
2517 {
2518 static POOL_CACHEID cacheid;
2519 POOL_CACHEID *c;
2520 POOL_CACHE_ITEM_HEADER *cih;
2521 time_t now;
2522
2523 c = pool_hash_search(query_hash);
2524 if (!c)
2525 {
2526 return NULL;
2527 }
2528
2529 cih = item_header(block_address(c->blockid), c->itemid);
2530
2531 /* Check cache expiration */
2532 if (pool_config->memqcache_expire > 0)
2533 {
2534 now = time(NULL);
2535 if (now > (cih->timestamp + pool_config->memqcache_expire))
2536 {
2537 ereport(DEBUG1,
2538 (errmsg("memcache finding item"),
2539 errdetail("cache expired: now: %ld timestamp: %ld",
2540 now, cih->timestamp + pool_config->memqcache_expire)));
2541 pool_delete_item_shmem_cache(c);
2542 return NULL;
2543 }
2544 }
2545
2546 cacheid.blockid = c->blockid;
2547 cacheid.itemid = c->itemid;
2548 return &cacheid;
2549 }
2550
2551 /*
2552 * Delete item data specified cache id from shmem.
2553 * On successful deletion, returns 0.
2554 * Other wise return -1.
2555 * FSMM is also updated.
2556 */
pool_delete_item_shmem_cache(POOL_CACHEID * cacheid)2557 static int pool_delete_item_shmem_cache(POOL_CACHEID *cacheid)
2558 {
2559 POOL_CACHE_BLOCK_HEADER *bh;
2560 POOL_CACHE_ITEM_POINTER *cip;
2561 POOL_CACHE_ITEM_HEADER *cih;
2562 POOL_QUERY_HASH key;
2563 int size;
2564 ereport(DEBUG1,
2565 (errmsg("memcache deleting item data"),
2566 errdetail("cacheid:%d itemid:%d",cacheid->blockid, cacheid->itemid)));
2567
2568 if (cacheid->blockid >= pool_get_memqcache_blocks())
2569 {
2570 ereport(LOG,
2571 (errmsg("error while deleting item from shmem cache, invalid block: %d",
2572 cacheid->blockid)));
2573 return -1;
2574 }
2575
2576 bh = (POOL_CACHE_BLOCK_HEADER *)block_address(cacheid->blockid);
2577 if (!(bh->flags & POOL_BLOCK_USED))
2578 {
2579 ereport(LOG,
2580 (errmsg("error while deleting item from shmem cache, block: %d is not used",
2581 cacheid->blockid)));
2582 return -1;
2583 }
2584
2585 if (cacheid->itemid >= bh->num_items)
2586 {
2587 /*
2588 * This could happen if the block is reused. Since contents
2589 * of oid map file is not updated when the block is reused.
2590 */
2591 ereport(DEBUG1,
2592 (errmsg("memcache error deleting item data"),
2593 errdetail("invalid item id %d in block:%d",
2594 cacheid->itemid, cacheid->blockid)));
2595
2596 return -1;
2597 }
2598
2599 cip = item_pointer(block_address(cacheid->blockid), cacheid->itemid);
2600 if (!(cip->flags & POOL_ITEM_USED))
2601 {
2602 ereport(LOG,
2603 (errmsg("error while deleting item from shmem cache, item: %d was not used",
2604 cacheid->itemid)));
2605 return -1;
2606 }
2607
2608 if (cip->flags & POOL_ITEM_DELETED)
2609 {
2610 ereport(LOG,
2611 (errmsg("error while deleting item from shmem cache, item: %d was already deleted",
2612 cacheid->itemid)));
2613 return -1;
2614 }
2615
2616 /* Save cache key */
2617 memcpy(&key, &cip->query_hash, sizeof(POOL_QUERY_HASH));
2618
2619 cih = pool_cache_item_header(cacheid);
2620 size = cih->total_length + sizeof(POOL_CACHE_ITEM_POINTER);
2621
2622 /* Delete item pointer */
2623 cip->flags |= POOL_ITEM_DELETED;
2624
2625 /*
2626 * We do NOT count down bh->num_items here. The deleted space will be recycled
2627 * by pool_add_item_shmem_cache(). However, if this is the last item, we can
2628 * recycle whole block.
2629 *
2630 * 2012/4/1: Now we do not pack data in
2631 * pool_add_item_shmem_cache() for performance reason. Also we
2632 * count down num_items if it is the last one.
2633 */
2634 if ((bh->num_items -1) == 0)
2635 {
2636 ereport(DEBUG1,
2637 (errmsg("memcache deleting item data"),
2638 errdetail("no item remains. initialize block")));
2639 bh->flags = 0;
2640 pool_init_cache_block(cacheid->blockid);
2641 }
2642
2643 /* Remove hash index */
2644 pool_hash_delete(&key);
2645
2646 /*
2647 * If the deleted item is last one in the block, we add it to the free space.
2648 */
2649 if (cacheid->itemid == (bh->num_items -1))
2650 {
2651 bh->free_bytes += size;
2652 ereport(DEBUG1,
2653 (errmsg("memcache deleting item data"),
2654 errdetail("deleted %d bytes, freebytes is = %d",
2655 size, bh->free_bytes)));
2656
2657 bh->num_items--;
2658 }
2659
2660 /* Update FSMM */
2661 pool_update_fsmm(cacheid->blockid, bh->free_bytes);
2662
2663 return 0;
2664 }
2665
2666 /*
2667 * Returns item header specified by cache id.
2668 */
pool_cache_item_header(POOL_CACHEID * cacheid)2669 static POOL_CACHE_ITEM_HEADER *pool_cache_item_header(POOL_CACHEID *cacheid)
2670 {
2671 POOL_CACHE_BLOCK_HEADER *bh;
2672
2673 if (cacheid->blockid >= pool_get_memqcache_blocks())
2674 {
2675 ereport(WARNING,
2676 (errmsg("error while getting cache item header, invalid block id: %d", cacheid->blockid)));
2677 return NULL;
2678 }
2679
2680 bh = (POOL_CACHE_BLOCK_HEADER *)block_address(cacheid->blockid);
2681 if (cacheid->itemid >= bh->num_items)
2682 {
2683 ereport(WARNING,
2684 (errmsg("error while getting cache item header, invalid item id: %d", cacheid->itemid)));
2685 return NULL;
2686 }
2687
2688 return item_header((char *)bh, cacheid->itemid);
2689 }
2690
2691 /*
2692 * Initialize specified block.
2693 */
pool_init_cache_block(POOL_CACHE_BLOCKID blockid)2694 static int pool_init_cache_block(POOL_CACHE_BLOCKID blockid)
2695 {
2696 char *p;
2697 POOL_CACHE_BLOCK_HEADER *bh;
2698
2699 if (blockid >= pool_get_memqcache_blocks())
2700 {
2701 ereport(WARNING,
2702 (errmsg("error while initializing cache block, invalid block id: %d", blockid)));
2703 return -1;
2704 }
2705
2706 p = block_address(blockid);
2707 bh = (POOL_CACHE_BLOCK_HEADER *)p;
2708
2709 /* Is this block used? */
2710 if (!(bh->flags & POOL_BLOCK_USED))
2711 {
2712 /* Initialize empty block */
2713 memset(p, 0, pool_config->memqcache_cache_block_size);
2714 bh->free_bytes = pool_config->memqcache_cache_block_size -
2715 sizeof(POOL_CACHE_BLOCK_HEADER);
2716 }
2717 return 0;
2718 }
2719
#if NOT_USED
/*
 * Delete all items in the block.
 *
 * Currently compiled out (NOT_USED). Every item not yet flagged
 * POOL_ITEM_DELETED is removed through pool_delete_item_shmem_cache() (its
 * return value is ignored). The block is then marked unused, reinitialized
 * and reported to the FSMM with POOL_MAX_FREE_SPACE.
 */
static void pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid)
{
	char *p;
	POOL_CACHE_BLOCK_HEADER *bh;
	POOL_CACHE_ITEM_POINTER *cip;
	POOL_CACHEID cacheid;
	int i;

	/* Get block address on shmem */
	p = block_address(blockid);
	bh = (POOL_CACHE_BLOCK_HEADER *)p;
	cacheid.blockid = blockid;

	/*
	 * Note: bh->num_items is re-evaluated on every iteration, and
	 * pool_delete_item_shmem_cache() may change it. It decrements num_items
	 * when the deleted item is the last one in the block. When num_items is
	 * 1 it reinitializes the whole block, leaving num_items at 0.
	 */
	for (i=0;i<bh->num_items;i++)
	{
		cip = item_pointer(p, i);

		if ((POOL_ITEM_DELETED & cip->flags) == 0)	/* Not deleted item? */
		{
			cacheid.itemid = i;
			pool_delete_item_shmem_cache(&cacheid);
		}
	}

	/* Reset the block and advertise it as completely free */
	bh->flags = 0;
	pool_init_cache_block(blockid);
	pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
}
#endif
2753
2754 /*
2755 * Acquire lock: XXX giant lock
2756 */
pool_shmem_lock(void)2757 void pool_shmem_lock(void)
2758 {
2759 if (pool_is_shmem_cache())
2760 {
2761 pool_semaphore_lock(SHM_CACHE_SEM);
2762
2763 }
2764 }
2765
2766 /*
2767 * Release lock
2768 */
pool_shmem_unlock(void)2769 void pool_shmem_unlock(void)
2770 {
2771 if (pool_is_shmem_cache())
2772 {
2773 pool_semaphore_unlock(SHM_CACHE_SEM);
2774 }
2775 }
2776
2777 /*
2778 * Returns cache block address specified by block id
2779 */
block_address(int blockid)2780 static char *block_address(int blockid)
2781 {
2782 char *p;
2783
2784 p = pool_memory_cache_address() +
2785 blockid * pool_config->memqcache_cache_block_size;
2786 return p;
2787 }
2788
2789 /*
2790 * Returns i th item pointer in block address block
2791 */
item_pointer(char * block,int i)2792 static POOL_CACHE_ITEM_POINTER *item_pointer(char *block, int i)
2793 {
2794 return (POOL_CACHE_ITEM_POINTER *)(block + sizeof(POOL_CACHE_BLOCK_HEADER) +
2795 sizeof(POOL_CACHE_ITEM_POINTER) * i);
2796 }
2797
2798 /*
2799 * Returns i th item header in block address block
2800 */
item_header(char * block,int i)2801 static POOL_CACHE_ITEM_HEADER *item_header(char *block, int i)
2802 {
2803 POOL_CACHE_ITEM_POINTER *cip;
2804
2805 cip = item_pointer(block, i);
2806 return (POOL_CACHE_ITEM_HEADER *)(block + cip->offset);
2807 }
2808
#ifdef SHMEMCACHE_DEBUG
/*
 * Dump shmem cache block: writes the block header, then every item pointer
 * and item header in the block, to stderr. Only built with SHMEMCACHE_DEBUG.
 */
static void dump_shmem_cache(POOL_CACHE_BLOCKID blockid)
{
	POOL_CACHE_BLOCK_HEADER *bh;
	POOL_CACHE_ITEM_POINTER *cip;
	POOL_CACHE_ITEM_HEADER *cih;
	int i;

	bh = (POOL_CACHE_BLOCK_HEADER *)block_address(blockid);
	/* sizeof yields size_t, whose conversion is %zu (not %lu) */
	fprintf(stderr, "shmem: block header(%zu bytes): flags:%x num_items:%d free_bytes:%d\n",
			sizeof(*bh), bh->flags, bh->num_items, bh->free_bytes);
	for (i=0;i<bh->num_items;i++)
	{
		cip = item_pointer((char *)bh, i);
		fprintf(stderr, "shmem: block: %d %d th item pointer(%zu bytes): offset:%d flags:%x\n",
				blockid, i, sizeof(*cip), cip->offset, cip->flags);
		cih = item_header((char *)bh, i);
		/* time_t has no portable printf conversion: widen to long for %ld */
		fprintf(stderr, "shmem: block: %d %d th item header(%zu bytes): timestamp:%ld length:%d\n",
				blockid, i, sizeof(*cih), (long) cih->timestamp, cih->total_length);
	}
}
#endif
2834
2835 /*
2836 * SELECT query result array modules
2837 */
pool_create_query_cache_array(void)2838 POOL_QUERY_CACHE_ARRAY *pool_create_query_cache_array(void)
2839 {
2840 #define POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM 128
2841 #define POOL_QUERY_CACHE_ARRAY_HEADER_SIZE (sizeof(int)+sizeof(int))
2842
2843 size_t size;
2844 POOL_QUERY_CACHE_ARRAY *p;
2845 POOL_SESSION_CONTEXT *session_context =pool_get_session_context(false);
2846 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
2847
2848 size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM *
2849 sizeof(POOL_TEMP_QUERY_CACHE *);
2850 p = palloc(size);
2851 p->num_caches = 0;
2852 p->array_size = POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
2853 MemoryContextSwitchTo(old_context);
2854 return p;
2855 }
2856
2857 /*
2858 * Discard query cache array
2859 */
pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array)2860 void pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY *cache_array)
2861 {
2862 int i;
2863
2864 if (!cache_array)
2865 return;
2866
2867 ereport(DEBUG1,
2868 (errmsg("memcache discarding query cache array"),
2869 errdetail("num_caches: %d", cache_array->num_caches)));
2870
2871 for (i=0;i<cache_array->num_caches;i++)
2872 {
2873 ereport(DEBUG2,
2874 (errmsg("memcache discarding query cache array"),
2875 errdetail("cache no: %d cache: %p", i, cache_array->caches[i])));
2876 pool_discard_temp_query_cache(cache_array->caches[i]);
2877 }
2878 pfree(cache_array);
2879 }
2880
2881 /*
2882 * Add query cache array
2883 */
pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array,POOL_TEMP_QUERY_CACHE * cache)2884 static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY *cache_array, POOL_TEMP_QUERY_CACHE *cache)
2885 {
2886 size_t size;
2887 POOL_QUERY_CACHE_ARRAY *cp = cache_array;
2888
2889 if (!cache_array)
2890 return cp;
2891
2892 ereport(DEBUG2,
2893 (errmsg("memcache adding query cache array"),
2894 errdetail("num_caches: %d cache: %p", cache_array->num_caches, cache)));
2895 if (cache_array->num_caches >= cache_array->array_size)
2896 {
2897 cache_array->array_size += POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
2898 size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + cache_array->array_size *
2899 sizeof(POOL_TEMP_QUERY_CACHE *);
2900 cache_array = repalloc(cache_array, size);
2901 }
2902 cache_array->caches[cache_array->num_caches++] = cache;
2903 return cache_array;
2904 }
2905
2906 /*
2907 * SELECT query result temporary cache modules
2908 */
2909
2910 /*
2911 * Create SELECT result temporary cache
2912 */
pool_create_temp_query_cache(char * query)2913 POOL_TEMP_QUERY_CACHE *pool_create_temp_query_cache(char *query)
2914 {
2915 POOL_SESSION_CONTEXT *session_context =pool_get_session_context(false);
2916 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
2917 POOL_TEMP_QUERY_CACHE *p;
2918 p = palloc(sizeof(*p));
2919 p->query = pstrdup(query);
2920
2921 p->buffer = pool_create_buffer();
2922 p->oids = pool_create_buffer();
2923 p->num_oids = 0;
2924 p->is_exceeded = false;
2925 p->is_discarded = false;
2926
2927 MemoryContextSwitchTo(old_context);
2928
2929 ereport(DEBUG1,
2930 (errmsg("pool_create_temp_query_cache: cache created: %p", p)));
2931
2932 return p;
2933 }
2934
2935 /*
2936 * Discard temp query cache
2937 */
pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache)2938 void pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache)
2939 {
2940 if (!temp_cache)
2941 return;
2942
2943 if (temp_cache->query)
2944 pfree(temp_cache->query);
2945 if (temp_cache->buffer)
2946 pool_discard_buffer(temp_cache->buffer);
2947 if (temp_cache->oids)
2948 pool_discard_buffer(temp_cache->oids);
2949 pfree(temp_cache);
2950
2951 ereport(DEBUG1,
2952 (errmsg("pool_discard_temp_query_cache: cache discarded: %p", temp_cache)));
2953 }
2954
2955 /*
2956 * Add data to temp query cache.
2957 * Data must be FE/BE protocol packet.
2958 */
pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,char kind,char * data,int data_len)2959 static void pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, char kind, char *data, int data_len)
2960 {
2961 POOL_INTERNAL_BUFFER *buffer;
2962 size_t buflen;
2963 int send_len;
2964
2965 if (temp_cache == NULL)
2966 {
2967 /* This could happen if cache exceeded in previous query
2968 * execution in the same unnamed portal.
2969 */
2970 ereport(DEBUG1,
2971 (errmsg("memcache adding temporary query cache"),
2972 errdetail("POOL_TEMP_QUERY_CACHE is NULL")));
2973 return;
2974 }
2975
2976 if (temp_cache->is_exceeded)
2977 {
2978 ereport(DEBUG1,
2979 (errmsg("memcache adding temporary query cache"),
2980 errdetail("memqcache_maxcache exceeds")));
2981 return;
2982 }
2983
2984 /*
2985 * We only store T(Table Description), D(Data row), C(Command Complete),
2986 * 1(ParseComplete), 2(BindComplete)
2987 */
2988 if (kind != 'T' && kind != 'D' && kind != 'C' && kind != '1' && kind != '2')
2989 {
2990 return;
2991 }
2992
2993 /* Check data limit */
2994 buffer = temp_cache->buffer;
2995 buflen = pool_get_buffer_length(buffer);
2996
2997 if ((buflen+data_len+sizeof(int)+1) > pool_config->memqcache_maxcache)
2998 {
2999 ereport(DEBUG1,
3000 (errmsg("memcache adding temporary query cache"),
3001 errdetail("data size exceeds memqcache_maxcache. current:%zd requested:%zd memq_maxcache:%d",
3002 buflen, data_len+sizeof(int)+1, pool_config->memqcache_maxcache)));
3003 temp_cache->is_exceeded = true;
3004 return;
3005 }
3006
3007 pool_add_buffer(buffer, &kind, 1);
3008 send_len = htonl(data_len + sizeof(int));
3009 pool_add_buffer(buffer, (char *)&send_len, sizeof(int));
3010 pool_add_buffer(buffer, data, data_len);
3011
3012 return;
3013 }
3014
3015 /*
3016 * Add table oids used by SELECT to temp query cache.
3017 */
pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,int num_oids,int * oids)3018 static void pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, int num_oids, int *oids)
3019 {
3020 POOL_INTERNAL_BUFFER *buffer;
3021
3022 if (!temp_cache || num_oids <= 0)
3023 return;
3024
3025 buffer = temp_cache->oids;
3026 pool_add_buffer(buffer, oids, num_oids*sizeof(int));
3027 temp_cache->num_oids = num_oids;
3028 }
3029
3030 /*
3031 * Internal buffer management modules.
3032 * Usage:
3033 * 1) Create buffer using pool_create_buffer().
3034 * 2) Add data to buffer using pool_add_buffer().
 * 3) Extract a (palloc'd) copy of the data from the buffer using pool_get_buffer().
3036 * 4) Optionally you can:
3037 * Obtain buffer length by using pool_get_buffer_length().
3038 * Obtain buffer pointer by using pool_get_buffer_pointer().
3039 * 5) Discard buffer using pool_discard_buffer().
3040 */
3041
3042 /*
3043 * Create and return internal buffer
3044 */
pool_create_buffer(void)3045 static POOL_INTERNAL_BUFFER *pool_create_buffer(void)
3046 {
3047 POOL_INTERNAL_BUFFER *p;
3048 p = palloc0(sizeof(*p));
3049 return p;
3050 }
3051
3052 /*
3053 * Discard internal buffer
3054 */
pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer)3055 static void pool_discard_buffer(POOL_INTERNAL_BUFFER *buffer)
3056 {
3057 if (buffer)
3058 {
3059 if (buffer->buf)
3060 pfree(buffer->buf);
3061 pfree(buffer);
3062 }
3063 }
3064
3065 /*
3066 * Add data to internal buffer
3067 */
pool_add_buffer(POOL_INTERNAL_BUFFER * buffer,void * data,size_t len)3068 static void pool_add_buffer(POOL_INTERNAL_BUFFER *buffer, void *data, size_t len)
3069 {
3070 #define POOL_ALLOCATE_UNIT 8192
3071
3072 /* Sanity check */
3073 if (!buffer || !data || len == 0)
3074 return;
3075 POOL_SESSION_CONTEXT *session_context =pool_get_session_context(false);
3076 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
3077
3078 /* Check if we need to increase the buffer size */
3079 if ((buffer->buflen + len) > buffer->bufsize)
3080 {
3081 size_t allocate_size = ((buffer->buflen + len)/POOL_ALLOCATE_UNIT +1)*POOL_ALLOCATE_UNIT;
3082 ereport(DEBUG2,
3083 (errmsg("memcache adding data to internal buffer"),
3084 errdetail("realloc old size:%zd new size:%zd",
3085 buffer->bufsize, allocate_size)));
3086 buffer->bufsize = allocate_size;
3087 buffer->buf = (char *)repalloc(buffer->buf, buffer->bufsize);
3088 }
3089 /* Add data to buffer */
3090 memcpy(buffer->buf+buffer->buflen, data, len);
3091 buffer->buflen += len;
3092 ereport(DEBUG2,
3093 (errmsg("memcache adding data to internal buffer"),
3094 errdetail("len:%zd, total:%zd bufsize:%zd",
3095 len, buffer->buflen, buffer->bufsize)));
3096 MemoryContextSwitchTo(old_context);
3097 return;
3098 }
3099
3100 /*
3101 * Get data from internal buffer.
3102 * Data is stored in newly malloc memory.
3103 * Data length is returned to len.
3104 */
pool_get_buffer(POOL_INTERNAL_BUFFER * buffer,size_t * len)3105 static void *pool_get_buffer(POOL_INTERNAL_BUFFER *buffer, size_t *len)
3106 {
3107 void *p;
3108
3109 if (buffer->bufsize == 0 || buffer->buflen == 0 ||
3110 buffer->buf == NULL)
3111 {
3112 *len = 0;
3113 return NULL;
3114 }
3115
3116 p = palloc(buffer->buflen);
3117 memcpy(p, buffer->buf, buffer->buflen);
3118 *len = buffer->buflen;
3119 return p;
3120 }
3121
3122 /*
3123 * Get internal buffer length.
3124 */
pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer)3125 static size_t pool_get_buffer_length(POOL_INTERNAL_BUFFER *buffer)
3126 {
3127 if (buffer == NULL)
3128 return 0;
3129
3130 return buffer->buflen;
3131 }
3132
#ifdef NOT_USED
/*
 * Get internal buffer pointer: the buffer's own data area (not a copy), or
 * NULL for a NULL buffer.
 */
static char *pool_get_buffer_pointer(POOL_INTERNAL_BUFFER *buffer)
{
	return buffer != NULL ? buffer->buf : NULL;
}
#endif
3144 /*
3145 * Get query cache buffer struct of current query context
3146 */
pool_get_current_cache(void)3147 POOL_TEMP_QUERY_CACHE *pool_get_current_cache(void)
3148 {
3149 POOL_SESSION_CONTEXT *session_context;
3150 POOL_QUERY_CONTEXT *query_context;
3151 POOL_TEMP_QUERY_CACHE *p = NULL;
3152
3153 session_context = pool_get_session_context(true);
3154 if (session_context)
3155 {
3156 query_context = session_context->query_context;
3157 if (query_context)
3158 {
3159 p = query_context->temp_cache;
3160 }
3161 }
3162 return p;
3163 }
3164
3165 /*
3166 * Get query cache buffer of current query context
3167 */
pool_get_current_cache_buffer(size_t * len)3168 static char *pool_get_current_cache_buffer(size_t *len)
3169 {
3170 char *p = NULL;
3171 *len = 0;
3172 POOL_TEMP_QUERY_CACHE *cache;
3173
3174 cache = pool_get_current_cache();
3175 if (cache)
3176 {
3177 p = pool_get_buffer(cache->buffer, len);
3178 }
3179 return p;
3180 }
3181
3182 /*
3183 * Mark this temporary query cache buffer discarded if the SELECT
3184 * uses the table oid specified by oids.
3185 */
pool_check_and_discard_cache_buffer(int num_oids,int * oids)3186 static void pool_check_and_discard_cache_buffer(int num_oids, int *oids)
3187 {
3188 POOL_SESSION_CONTEXT *session_context;
3189 POOL_TEMP_QUERY_CACHE *cache;
3190 int num_caches;
3191 size_t len;
3192 int *soids;
3193 int i, j, k;
3194
3195 session_context = pool_get_session_context(true);
3196
3197 if (!session_context || !session_context->query_cache_array)
3198 return;
3199
3200 num_caches = session_context->query_cache_array->num_caches;
3201
3202 for (i=0;i<num_caches;i++)
3203 {
3204 cache = session_context->query_cache_array->caches[i];
3205 if (!cache || cache->is_discarded)
3206 continue;
3207
3208 soids = (int *)pool_get_buffer(cache->oids, &len);
3209 if (!soids || !len)
3210 continue;
3211
3212 for(j=0;j<cache->num_oids;j++)
3213 {
3214 if (cache->is_discarded)
3215 break;
3216
3217 for (k=0;k<num_oids;k++)
3218 {
3219 if (soids[j] == oids[k])
3220 {
3221 ereport(DEBUG1,
3222 (errmsg("discard cache for \"%s\"",cache->query)));
3223 cache->is_discarded = true;
3224 break;
3225 }
3226 }
3227 }
3228 pfree(soids);
3229 }
3230 }
3231
3232 /*
 * Handle the query cache at Ready for Query or Command Complete. In streaming
 * replication mode with the extended query protocol, the query cache is
 * handled at Command Complete; in all other cases it is handled at Ready for
 * Query.
3236 */
pool_handle_query_cache(POOL_CONNECTION_POOL * backend,char * query,Node * node,char state)3237 void pool_handle_query_cache(POOL_CONNECTION_POOL *backend, char *query, Node *node, char state)
3238 {
3239 POOL_SESSION_CONTEXT *session_context;
3240 pool_sigset_t oldmask;
3241 char *cache_buffer;
3242 size_t len;
3243 int num_oids;
3244 int *oids;
3245 int i;
3246
3247 session_context = pool_get_session_context(true);
3248
3249 /* Ok to cache SELECT result? */
3250 if (pool_is_cache_safe())
3251 {
3252 SelectContext ctx;
3253 MemoryContext old_context;
3254 old_context = MemoryContextSwitchTo(session_context->memory_context);
3255 num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
3256 MemoryContextSwitchTo(old_context);
3257 oids = ctx.table_oids;
3258 ereport(DEBUG2,
3259 (errmsg("query cache handler for ReadyForQuery"),
3260 errdetail("num_oids: %d oid: %d", num_oids, *oids)));
3261
3262 if (state == 'I') /* Not inside a transaction? */
3263 {
3264 /*
3265 * Make sure that temporary cache is not exceeded.
3266 */
3267 if (!pool_is_cache_exceeded())
3268 {
3269 POOL_TEMP_QUERY_CACHE *cache;
3270 /*
3271 * If we are not inside a transaction, we can
3272 * immediately register to cache storage.
3273 */
3274 /* Register to memcached or shmem */
3275 POOL_SETMASK2(&BlockSig, &oldmask);
3276 pool_shmem_lock();
3277
3278 cache_buffer = pool_get_current_cache_buffer(&len);
3279 if (cache_buffer)
3280 {
3281 if (session_context->query_context->skip_cache_commit == false)
3282 {
3283 if (pool_commit_cache(backend, query, cache_buffer, len, num_oids, oids) != 0)
3284 {
3285 ereport(WARNING,
3286 (errmsg("ReadyForQuery: pool_commit_cache failed")));
3287 }
3288 }
3289 /*
3290 * Reset temporary query cache buffer. This is
3291 * necessary if extended query protocol is used and a
3292 * bind/execute message arrives which uses a statement
3293 * created by prior parse message. In this case since
3294 * the temp_cache is not initialized by a parse
3295 * message, messages are added to pre existing temp
3296 * cache buffer. The problem was found in bug#152.
3297 * http://www.pgpool.net/mantisbt/view.php?id=152
3298 */
3299 cache = pool_get_current_cache();
3300 ereport(DEBUG1,
3301 (errmsg("pool_handle_query_cache: temp_cache: %p", cache)));
3302 pool_discard_temp_query_cache(cache);
3303
3304 if (SL_MODE && pool_is_doing_extended_query_message())
3305 session_context->query_context->temp_cache = NULL;
3306 else
3307 session_context->query_context->temp_cache = pool_create_temp_query_cache(query);
3308 pfree(cache_buffer);
3309 }
3310 pool_shmem_unlock();
3311 POOL_SETMASK(&oldmask);
3312 }
3313
3314 /* Count up SELECT stats */
3315 pool_stats_count_up_num_selects(1);
3316
3317 /* Reset temp buffer */
3318 pool_reset_memqcache_buffer(true);
3319 }
3320 else
3321 {
3322 POOL_TEMP_QUERY_CACHE *cache = pool_get_current_cache();
3323
3324 /* In transaction. Keep to temp query cache array */
3325 pool_add_oids_temp_query_cache(cache, num_oids, oids);
3326
3327 /*
3328 * If temp cache has been overflowed, just trash the half
3329 * baked temp cache.
3330 */
3331 if (pool_is_cache_exceeded())
3332 {
3333 POOL_TEMP_QUERY_CACHE *cache;
3334
3335 cache = pool_get_current_cache();
3336 pool_discard_temp_query_cache(cache);
3337 /*
3338 * Reset temp_cache pointer in the current query context
3339 * so that we don't double free memory.
3340 */
3341 session_context->query_context->temp_cache = NULL;
3342
3343 }
3344 /*
3345 * Otherwise add to the temp cache array.
3346 */
3347 else
3348 {
3349 session_context->query_cache_array =
3350 pool_add_query_cache_array(session_context->query_cache_array, cache);
3351 /*
3352 * Reset temp_cache pointer in the current query
3353 * context so that we don't add the same temp cache to
3354 * the cache array. This is necessary such that case
3355 * when next query is just a "bind message", without
3356 * "parse message". In the case the query context is
3357 * reused and same cache pointer will be added to the
3358 * query_cache_array which we do not want.
3359 */
3360 session_context->query_context->temp_cache = NULL;
3361 }
3362
3363 /* Count up temporary SELECT stats */
3364 pool_tmp_stats_count_up_num_selects();
3365 }
3366 }
3367 else if (is_rollback_query(node)) /* Rollback? */
3368 {
3369 /* Discard buffered data */
3370 pool_reset_memqcache_buffer(true);
3371 }
3372 else if (is_commit_query(node)) /* Commit? */
3373 {
3374 int num_caches;
3375
3376 POOL_SETMASK2(&BlockSig, &oldmask);
3377 pool_shmem_lock();
3378
3379 /* Invalidate query cache */
3380 if (pool_config->memqcache_auto_cache_invalidation)
3381 {
3382 num_oids = pool_get_dml_table_oid(&oids);
3383 pool_invalidate_query_cache(num_oids, oids, true, 0);
3384 }
3385
3386 /*
3387 * If we have something in the query cache buffer, that means
3388 * either:
3389 * - We only had SELECTs in the transaction
3390 * - We had only SELECTs after last DML
3391 * Thus we can register SELECT results to cache storage.
3392 */
3393 num_caches = session_context->query_cache_array->num_caches;
3394 for (i=0;i<num_caches;i++)
3395 {
3396 POOL_TEMP_QUERY_CACHE *cache;
3397
3398 cache = session_context->query_cache_array->caches[i];
3399 if (!cache || cache->is_discarded)
3400 continue;
3401
3402 num_oids = cache->num_oids;
3403 oids = pool_get_buffer(cache->oids, &len);
3404 cache_buffer = pool_get_buffer(cache->buffer, &len);
3405
3406 if (pool_commit_cache(backend, cache->query, cache_buffer, len, num_oids, oids) != 0)
3407 {
3408 ereport(WARNING,
3409 (errmsg("ReadyForQuery: pool_commit_cache failed")));
3410 }
3411 if (oids)
3412 pfree(oids);
3413 if (cache_buffer)
3414 pfree(cache_buffer);
3415 }
3416 pool_shmem_unlock();
3417 POOL_SETMASK(&oldmask);
3418
3419 /* Count up number of SELECT stats */
3420 pool_stats_count_up_num_selects(pool_tmp_stats_get_num_selects());
3421
3422 pool_reset_memqcache_buffer(true);
3423 }
3424 else /* Non cache safe queries */
3425 {
3426 /* Non cachable SELECT */
3427 if (node && IsA(node, SelectStmt))
3428 {
3429 /* Extract table oids from buffer */
3430 num_oids = pool_get_dml_table_oid(&oids);
3431
3432 if (state == 'I')
3433 {
3434 /*
3435 * If Data-modifying statements in SELECT's WITH clause,
3436 * invalidate query cache.
3437 */
3438 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3439 {
3440 POOL_SETMASK2(&BlockSig, &oldmask);
3441 pool_shmem_lock();
3442 pool_invalidate_query_cache(num_oids, oids, true, 0);
3443 pool_shmem_unlock();
3444 POOL_SETMASK(&oldmask);
3445 }
3446
3447 /* Count up SELECT stats */
3448 pool_stats_count_up_num_selects(1);
3449 pool_reset_memqcache_buffer(true);
3450 }
3451 else
3452 {
3453 /*
3454 * If we are inside a transaction, we cannot invalidate
3455 * query cache yet. However we can clear cache buffer, if
3456 * DML/DDL modifies the TABLE which SELECT uses.
3457 */
3458 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3459 {
3460 pool_check_and_discard_cache_buffer(num_oids, oids);
3461 pool_reset_memqcache_buffer(false);
3462 }
3463
3464 /* Count up temporary SELECT stats */
3465 pool_tmp_stats_count_up_num_selects();
3466 }
3467 }
3468 /*
3469 * If the query is DROP DATABASE, discard both of caches in shmem/memcached and
3470 * oidmap in memqcache_oiddir.
3471 */
3472 else if (is_drop_database(node) && session_context->query_context->dboid != 0)
3473 {
3474 int dboid = session_context->query_context->dboid;
3475 num_oids = pool_get_dropdb_table_oids(&oids, dboid);
3476
3477 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3478 {
3479 pool_shmem_lock();
3480 pool_invalidate_query_cache(num_oids, oids, true, dboid);
3481 pool_discard_oid_maps_by_db(dboid);
3482 pool_shmem_unlock();
3483 pool_reset_memqcache_buffer(true);
3484
3485 pfree(oids);
3486 ereport(DEBUG2,
3487 (errmsg("query cache handler for ReadyForQuery"),
3488 errdetail("deleted all cache files for the DROPped DB")));
3489 }
3490 }
3491 else
3492 {
3493 /*
3494 * DML/DCL/DDL case
3495 */
3496
3497 /* Extract table oids from buffer */
3498 num_oids = pool_get_dml_table_oid(&oids);
3499 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3500 {
3501 /*
3502 * If we are not inside a transaction, we can
3503 * immediately invalidate query cache.
3504 */
3505 if (state == 'I')
3506 {
3507 POOL_SETMASK2(&BlockSig, &oldmask);
3508 pool_shmem_lock();
3509 pool_invalidate_query_cache(num_oids, oids, true, 0);
3510 pool_shmem_unlock();
3511 POOL_SETMASK(&oldmask);
3512 pool_reset_memqcache_buffer(true);
3513 }
3514 else
3515 {
3516 /*
3517 * If we are inside a transaction, we
3518 * cannot invalidate query cache
3519 * yet. However we can clear cache buffer,
3520 * if DML/DDL modifies the TABLE which SELECT uses.
3521 */
3522 pool_check_and_discard_cache_buffer(num_oids, oids);
3523 pool_reset_memqcache_buffer(false);
3524 }
3525 }
3526 else if (num_oids == 0)
3527 {
3528 /*
3529 * It is also necessary to clear cache buffers in case of
3530 * no oid queries (like BEGIN, CHECKPOINT, VACUUM, etc) too.
3531 */
3532 pool_reset_memqcache_buffer(true);
3533 }
3534 }
3535 }
3536 }
3537
3538 /*
3539 * Create and initialize query cache stats
3540 */
3541 static POOL_QUERY_CACHE_STATS *stats;
pool_init_memqcache_stats(void)3542 int pool_init_memqcache_stats(void)
3543 {
3544 stats = pool_shared_memory_create(sizeof(POOL_QUERY_CACHE_STATS));
3545 pool_reset_memqcache_stats();
3546 return 0;
3547 }
3548
3549 /*
3550 * Returns copy of stats area. The copy is in static area and will be
3551 * overwritten by next call to this function.
3552 */
pool_get_memqcache_stats(void)3553 POOL_QUERY_CACHE_STATS *pool_get_memqcache_stats(void)
3554 {
3555 static POOL_QUERY_CACHE_STATS mystats;
3556 pool_sigset_t oldmask;
3557
3558 memset(&mystats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3559
3560 if (stats)
3561 {
3562 POOL_SETMASK2(&BlockSig, &oldmask);
3563 pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3564 memcpy(&mystats, stats, sizeof(POOL_QUERY_CACHE_STATS));
3565 pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3566 POOL_SETMASK(&oldmask);
3567 }
3568
3569 return &mystats;
3570 }
3571
3572 /*
3573 * Reset query cache stats. Caller must lock QUERY_CACHE_STATS_SEM if
3574 * necessary.
3575 */
pool_reset_memqcache_stats(void)3576 void pool_reset_memqcache_stats(void)
3577 {
3578 memset(stats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3579 stats->start_time = time(NULL);
3580 }
3581
3582 /*
3583 * Count up number of successful SELECTs and returns the number.
3584 * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3585 */
pool_stats_count_up_num_selects(long long int num)3586 long long int pool_stats_count_up_num_selects(long long int num)
3587 {
3588 pool_sigset_t oldmask;
3589
3590 POOL_SETMASK2(&BlockSig, &oldmask);
3591 pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3592 stats->num_selects += num;
3593 pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3594 POOL_SETMASK(&oldmask);
3595 return stats->num_selects;
3596 }
3597
3598 /*
3599 * Count up number of successful SELECTs in temporary area and returns
3600 * the number.
3601 */
pool_tmp_stats_count_up_num_selects(void)3602 long long int pool_tmp_stats_count_up_num_selects(void)
3603 {
3604 POOL_SESSION_CONTEXT *session_context;
3605
3606 session_context = pool_get_session_context(false);
3607 session_context->num_selects++;
3608 return session_context->num_selects;
3609 }
3610
3611 /*
3612 * Return number of successful SELECTs in temporary area.
3613 */
pool_tmp_stats_get_num_selects(void)3614 long long int pool_tmp_stats_get_num_selects(void)
3615 {
3616 POOL_SESSION_CONTEXT *session_context;
3617
3618 session_context = pool_get_session_context(false);
3619 return session_context->num_selects;
3620 }
3621
3622 /*
3623 * Reset number of successful SELECTs in temporary area.
3624 */
pool_tmp_stats_reset_num_selects(void)3625 void pool_tmp_stats_reset_num_selects(void)
3626 {
3627 POOL_SESSION_CONTEXT *session_context;
3628
3629 session_context = pool_get_session_context(false);
3630 session_context->num_selects = 0;
3631 }
3632
3633 /*
3634 * Count up number of SELECTs extracted from cache returns the number.
3635 * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3636 */
pool_stats_count_up_num_cache_hits(void)3637 long long int pool_stats_count_up_num_cache_hits(void)
3638 {
3639 pool_sigset_t oldmask;
3640
3641 POOL_SETMASK2(&BlockSig, &oldmask);
3642 pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3643 stats->num_cache_hits++;
3644 pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3645 POOL_SETMASK(&oldmask);
3646 return stats->num_cache_hits;
3647 }
3648
/*
 * Shared memory hash table implementation.  As the hash function we use a
 * sub part of the MD5 hash key.  Experiments have shown that PostgreSQL's
 * hash_any() distributes a little better than using part of the MD5 hash
 * value, but the extra CPU cycles needed to call hash_any() do not seem
 * worth the trouble.
 */

/* Hash table header in shared memory: bucket count, mask and bucket array. */
static volatile POOL_HASH_HEADER *hash_header;
/* Array of hash elements in shared memory, allocated by pool_hash_init(). */
static volatile POOL_HASH_ELEMENT *hash_elements;
/* Head of the free element list; hash_free->next is the first free element. */
static volatile POOL_HASH_ELEMENT *hash_free;
3660
3661 /*
3662 * Initialize hash table on shared memory "nelements" is max number of
3663 * hash keys. The actual number of hash key is rounded up to power of
3664 * 2.
3665 */
3666 #undef POOL_HASH_DEBUG
3667
pool_hash_init(int nelements)3668 int pool_hash_init(int nelements)
3669 {
3670 size_t size;
3671 int nelements2; /* number of rounded up hash keys */
3672 int shift;
3673 uint32 mask;
3674 POOL_HASH_HEADER hh;
3675 int i;
3676
3677 if (nelements <= 0)
3678 ereport(ERROR,
3679 (errmsg("initializing hash table on shared memory, invalid number of elements: %d",nelements)));
3680
3681 /* Round up to power of 2 */
3682 shift = 32;
3683 nelements2 = 1;
3684 do
3685 {
3686 nelements2 <<= 1;
3687 shift--;
3688 } while (nelements2 < nelements);
3689
3690 mask = ~0;
3691 mask >>= shift;
3692 size = (char *)&hh.elements - (char *)&hh + sizeof(POOL_HEADER_ELEMENT)*nelements2;
3693 hash_header = pool_shared_memory_create(size);
3694 hash_header->nhash = nelements2;
3695 hash_header->mask = mask;
3696
3697 #ifdef POOL_HASH_DEBUG
3698 ereport(LOG,
3699 (errmsg("initializing hash table on shared memory"),
3700 errdetail("size:%zd nelements2:%d", size, nelements2)));
3701
3702 #endif
3703
3704 size = sizeof(POOL_HASH_ELEMENT)*nelements2;
3705 hash_elements = pool_shared_memory_create(size);
3706
3707 #ifdef POOL_HASH_DEBUG
3708 ereport(LOG,
3709 (errmsg("initializing hash table on shared memory"),
3710 errdetail("size:%zd nelements2:%d", size, nelements2)));
3711 #endif
3712
3713 for (i=0;i<nelements2-1;i++)
3714 {
3715 hash_elements[i].next = (POOL_HASH_ELEMENT *)&hash_elements[i+1];
3716 }
3717 hash_elements[nelements2-1].next = NULL;
3718 hash_free = hash_elements;
3719
3720 return 0;
3721 }
3722
3723 /*
3724 * Reset hash table on shared memory "nelements" is max number of
3725 * hash keys. The actual number of hash key is rounded up to power of
3726 * 2.
3727 */
3728 static int
pool_hash_reset(int nelements)3729 pool_hash_reset(int nelements)
3730 {
3731 size_t size;
3732 int nelements2; /* number of rounded up hash keys */
3733 int shift;
3734 uint32 mask;
3735 POOL_HASH_HEADER hh;
3736 int i;
3737
3738 if (nelements <= 0)
3739 ereport(ERROR,
3740 (errmsg("clearing hash table on shared memory, invalid number of elements: %d",nelements)));
3741
3742 /* Round up to power of 2 */
3743 shift = 32;
3744 nelements2 = 1;
3745 do
3746 {
3747 nelements2 <<= 1;
3748 shift--;
3749 } while (nelements2 < nelements);
3750
3751 mask = ~0;
3752 mask >>= shift;
3753
3754 size = (char *)&hh.elements - (char *)&hh + sizeof(POOL_HEADER_ELEMENT)*nelements2;
3755 memset((void *)hash_header, 0, size);
3756
3757 hash_header->nhash = nelements2;
3758 hash_header->mask = mask;
3759
3760 size = sizeof(POOL_HASH_ELEMENT)*nelements2;
3761 memset((void *)hash_elements, 0, size);
3762
3763 for (i=0;i<nelements2-1;i++)
3764 {
3765 hash_elements[i].next = (POOL_HASH_ELEMENT *)&hash_elements[i+1];
3766 }
3767 hash_elements[nelements2-1].next = NULL;
3768 hash_free = hash_elements;
3769
3770 return 0;
3771 }
3772
3773 /*
3774 * Search cacheid by MD5 hash key string
3775 * If found, returns cache id, otherwise NULL.
3776 */
pool_hash_search(POOL_QUERY_HASH * key)3777 POOL_CACHEID *pool_hash_search(POOL_QUERY_HASH *key)
3778 {
3779 volatile POOL_HASH_ELEMENT *element;
3780
3781 uint32 hash_key = create_hash_key(key);
3782
3783 if (hash_key >= hash_header->nhash)
3784 {
3785 ereport(WARNING,
3786 (errmsg("memcache: searching cacheid from hash. invalid hash key"),
3787 errdetail("invalid hash key: %uld nhash: %ld",
3788 hash_key, hash_header->nhash)));
3789 return NULL;
3790 }
3791
3792 {
3793 char md5[POOL_MD5_HASHKEYLEN+1];
3794 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3795 md5[POOL_MD5_HASHKEYLEN] = '\0';
3796 #ifdef POOL_HASH_DEBUG
3797 ereport(LOG,
3798 (errmsg("searching hash table"),
3799 errdetail("hash_key:%d md5:%s", hash_key, md5)));
3800 #endif
3801 }
3802
3803 element = hash_header->elements[hash_key].element;
3804 while (element)
3805 {
3806 {
3807 char md5[POOL_MD5_HASHKEYLEN+1];
3808 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3809 md5[POOL_MD5_HASHKEYLEN] = '\0';
3810 #ifdef POOL_HASH_DEBUG
3811 ereport(LOG,
3812 (errmsg("searching hash table"),
3813 errdetail("element md5:%s", md5)));
3814 #endif
3815 }
3816
3817 if (memcmp((const void *)element->hashkey.query_hash,
3818 (const void *)key->query_hash, sizeof(key->query_hash)) == 0)
3819 {
3820 return (POOL_CACHEID *)&element->cacheid;
3821 }
3822 element = element->next;
3823 }
3824 return NULL;
3825 }
3826
3827 /*
3828 * Insert MD5 key and associated cache id into shmem hash table. If
3829 * "update" is true, replace cacheid associated with the MD5 key,
3830 * rather than throw an error.
3831 */
pool_hash_insert(POOL_QUERY_HASH * key,POOL_CACHEID * cacheid,bool update)3832 static int pool_hash_insert(POOL_QUERY_HASH *key, POOL_CACHEID *cacheid, bool update)
3833 {
3834 POOL_HASH_ELEMENT *element;
3835 POOL_HASH_ELEMENT *new_element;
3836
3837 uint32 hash_key = create_hash_key(key);
3838
3839 if (hash_key >= hash_header->nhash)
3840 {
3841 ereport(WARNING,
3842 (errmsg("memcache: adding cacheid to hash. invalid hash key"),
3843 errdetail("invalid hash key: %uld nhash: %ld",
3844 hash_key, hash_header->nhash)));
3845 return -1;
3846 }
3847
3848 {
3849 char md5[POOL_MD5_HASHKEYLEN+1];
3850 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3851 md5[POOL_MD5_HASHKEYLEN] = '\0';
3852 #ifdef POOL_HASH_DEBUG
3853 ereport(LOG,
3854 (errmsg("searching hash table"),
3855 errdetail("hash_key:%d md5:%s block:%d item:%d", hash_key, md5, cacheid->blockid, cacheid->itemid)));
3856 #endif
3857 }
3858
3859 /*
3860 * Look for hash key.
3861 */
3862 element = hash_header->elements[hash_key].element;
3863
3864 while (element)
3865 {
3866 if (memcmp((const void *)element->hashkey.query_hash,
3867 (const void *)key->query_hash, sizeof(key->query_hash)) == 0)
3868 {
3869 /* Hash key found. If "update" is false, just throw an error. */
3870 char md5[POOL_MD5_HASHKEYLEN+1];
3871
3872 if (!update)
3873 {
3874 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3875 md5[POOL_MD5_HASHKEYLEN] = '\0';
3876 ereport(LOG,
3877 (errmsg("memcache: adding cacheid to hash. hash key:\"%s\" already exists",md5)));
3878 return -1;
3879 }
3880 else
3881 {
3882 /* Update cache id */
3883 memcpy((void *)&element->cacheid, cacheid, sizeof(POOL_CACHEID));
3884 return 0;
3885 }
3886 }
3887 element = element->next;
3888 }
3889
3890 /*
3891 * Ok, same key did not exist. Just insert new hash key.
3892 */
3893 new_element = (POOL_HASH_ELEMENT *)get_new_hash_element();
3894 if (!new_element)
3895 {
3896 ereport(LOG,
3897 (errmsg("memcache: adding cacheid to hash. failed to get new element")));
3898 return -1;
3899 }
3900
3901 element = hash_header->elements[hash_key].element;
3902
3903 hash_header->elements[hash_key].element = new_element;
3904 new_element->next = element;
3905
3906 memcpy((void *)new_element->hashkey.query_hash, key->query_hash, POOL_MD5_HASHKEYLEN);
3907 memcpy((void *)&new_element->cacheid, cacheid, sizeof(POOL_CACHEID));
3908
3909 return 0;
3910 }
3911
3912 /*
3913 * Delete MD5 key and associated cache id from shmem hash table.
3914 */
pool_hash_delete(POOL_QUERY_HASH * key)3915 int pool_hash_delete(POOL_QUERY_HASH *key)
3916 {
3917 POOL_HASH_ELEMENT *element;
3918 POOL_HASH_ELEMENT **delete_point;
3919 bool found;
3920
3921 uint32 hash_key = create_hash_key(key);
3922
3923 if (hash_key >= hash_header->nhash)
3924 {
3925 ereport(LOG,
3926 (errmsg("memcache: deleting key from hash. invalid key"),
3927 errdetail("invalid hash key: %uld nhash: %ld",
3928 hash_key, hash_header->nhash)));
3929 return -1;
3930 }
3931
3932 /*
3933 * Look for delete location
3934 */
3935 found = false;
3936 delete_point = (POOL_HASH_ELEMENT **)&(hash_header->elements[hash_key].element);
3937 element = hash_header->elements[hash_key].element;
3938
3939 while (element)
3940 {
3941 if (memcmp(element->hashkey.query_hash, key->query_hash, sizeof(key->query_hash)) == 0)
3942 {
3943 found = true;
3944 break;
3945 }
3946 delete_point = &element->next;
3947 element = element->next;
3948 }
3949
3950 if (!found)
3951 {
3952 char md5[POOL_MD5_HASHKEYLEN+1];
3953
3954 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3955 md5[POOL_MD5_HASHKEYLEN] = '\0';
3956 ereport(LOG,
3957 (errmsg("memcache: deleting key from hash. key:\"%s\" not found",md5)));
3958 return -1;
3959 }
3960
3961 /*
3962 * Put back the element to free list
3963 */
3964 *delete_point = element->next;
3965 put_back_hash_element(element);
3966
3967 return 0;
3968 }
3969
3970 /*
3971 * Calculate 32bit binary hash key (i.e. location in hash header) from MD5
3972 * string. We use top most 8 characters of MD5 string for calculation.
3973 */
create_hash_key(POOL_QUERY_HASH * key)3974 static uint32 create_hash_key(POOL_QUERY_HASH *key)
3975 {
3976 #define POOL_HASH_NCHARS 8
3977
3978 char md5[POOL_HASH_NCHARS+1];
3979 uint32 mask;
3980
3981 memcpy(md5, key->query_hash, POOL_HASH_NCHARS);
3982 md5[POOL_HASH_NCHARS] = '\0';
3983 mask = strtoul(md5, NULL, 16);
3984 mask &= hash_header->mask;
3985 return mask;
3986 }
3987
3988 /*
3989 * Get new free hash element from free list.
3990 */
get_new_hash_element(void)3991 static volatile POOL_HASH_ELEMENT *get_new_hash_element(void)
3992 {
3993 volatile POOL_HASH_ELEMENT *elm;
3994
3995 if (!hash_free->next)
3996 {
3997 /* No free element */
3998 return NULL;
3999 }
4000
4001 #ifdef POOL_HASH_DEBUG
4002 ereport(LOG,
4003 (errmsg("getting new hash element"),
4004 errdetail("hash_free->next:%p hash_free->next->next:%p",
4005 hash_free->next, hash_free->next->next)));
4006 #endif
4007
4008 elm = hash_free->next;
4009 hash_free->next = elm->next;
4010
4011 return elm;
4012 }
4013
4014 /*
4015 * Put back hash element to free list.
4016 */
put_back_hash_element(volatile POOL_HASH_ELEMENT * element)4017 static void put_back_hash_element(volatile POOL_HASH_ELEMENT *element)
4018 {
4019 POOL_HASH_ELEMENT *elm;
4020
4021 #ifdef POOL_HASH_DEBUG
4022 ereport(LOG,
4023 (errmsg("getting new hash element"),
4024 errdetail("hash_free->next:%p hash_free->next->next:%p",
4025 hash_free->next, hash_free->next->next)));
4026 #endif
4027
4028 elm = hash_free->next;
4029 hash_free->next = (POOL_HASH_ELEMENT *)element;
4030 element->next = elm;
4031 }
4032
4033 /*
4034 * Return true if there's a free hash element.
4035 */
is_free_hash_element(void)4036 static bool is_free_hash_element(void)
4037 {
4038 return hash_free->next != NULL;
4039 }
4040
4041 /*
4042 * Returns shared memory cache stats.
4043 * Subsequent call to this function will break return value
4044 * because its in static memory.
4045 * Caller must hold shmem_lock before calling this function.
4046 * If in memory query cache is not enabled, all stats are 0.
4047 */
pool_get_shmem_storage_stats(void)4048 POOL_SHMEM_STATS *pool_get_shmem_storage_stats(void)
4049 {
4050 static POOL_SHMEM_STATS mystats;
4051 POOL_HASH_ELEMENT *element;
4052 int nblocks;
4053 int i;
4054
4055 memset(&mystats, 0, sizeof(POOL_SHMEM_STATS));
4056
4057 if (!pool_config-> memory_cache_enabled)
4058 return &mystats;
4059
4060 /*
4061 * Copy cache hit data
4062 */
4063 mystats.cache_stats.num_selects = stats->num_selects;
4064 mystats.cache_stats.num_cache_hits = stats->num_cache_hits;
4065
4066 if (pool_config->memqcache_method != SHMEM_CACHE)
4067 return &mystats;
4068
4069 /* number of total hash entries */
4070 mystats.num_hash_entries = hash_header->nhash;
4071
4072 /* number of used hash entries */
4073 for (i=0;i<hash_header->nhash;i++)
4074 {
4075 element = hash_header->elements[i].element;
4076 while (element)
4077 {
4078 mystats.used_hash_entries++;
4079 element = element->next;
4080 }
4081 }
4082
4083 nblocks = pool_get_memqcache_blocks();
4084
4085 for (i=0;i<nblocks;i++)
4086 {
4087 POOL_CACHE_BLOCK_HEADER *bh;
4088 POOL_CACHE_ITEM_POINTER *cip;
4089 char *p = block_address(i);
4090 bh = (POOL_CACHE_BLOCK_HEADER *)p;
4091 int j;
4092
4093 if (bh->flags & POOL_BLOCK_USED)
4094 {
4095 for (j=0;j<bh->num_items;j++)
4096 {
4097 cip = item_pointer(p, j);
4098 if (POOL_ITEM_DELETED & cip->flags)
4099 {
4100 mystats.fragment_cache_entries_size += item_header(p, j)->total_length;
4101 }
4102 else
4103 {
4104 /* number of used cache entries */
4105 mystats.num_cache_entries++;
4106 /* total size of used cache entries */
4107 mystats.used_cache_entries_size += (item_header(p, j)->total_length + sizeof(POOL_CACHE_ITEM_POINTER));
4108 }
4109 }
4110 mystats.used_cache_entries_size += sizeof(POOL_CACHE_BLOCK_HEADER);
4111 /* total size of free(usable) cache entries */
4112 mystats.free_cache_entries_size += bh->free_bytes;
4113 }
4114 else
4115 {
4116 mystats.free_cache_entries_size += pool_config->memqcache_cache_block_size;
4117 }
4118 }
4119
4120 /*
4121 * Copy POOL_QUERY_CACHE_STATS
4122 */
4123 memcpy(&mystats.cache_stats, stats, sizeof(mystats.cache_stats));
4124
4125 return &mystats;
4126 }
4127
4128 /*
4129 * Inject cached message to the target backend buffer to pretend as if backend
4130 * actually replies with Data row and Command Complete message.
4131 */
inject_cached_message(POOL_CONNECTION * backend,char * qcache,int qcachelen)4132 static void inject_cached_message(POOL_CONNECTION *backend, char *qcache, int qcachelen)
4133 {
4134 char kind;
4135 int len;
4136 char *buf;
4137 int timeout;
4138 int i = 0;
4139 bool is_prepared_stmt = false;
4140 POOL_SESSION_CONTEXT *session_context;
4141 POOL_QUERY_CONTEXT* query_context;
4142 POOL_PENDING_MESSAGE *msg;
4143
4144 session_context = pool_get_session_context(false);
4145 query_context = session_context->query_context;
4146 msg = pool_pending_message_find_lastest_by_query_context(query_context);
4147
4148 if (msg)
4149 {
4150 /*
4151 * If pending message found, we should extract target backend from it
4152 */
4153 int backend_id;
4154
4155 backend_id = pool_pending_message_get_target_backend_id(msg);
4156 backend = CONNECTION(session_context->backend, backend_id);
4157 timeout = -1;
4158 }
4159 else
4160 timeout = 0;
4161
4162 /* Send flush messsage to backend to retrieve response of backend */
4163 pool_write(backend, "H", 1);
4164 len = htonl(sizeof(len));
4165 pool_write_and_flush(backend, &len, sizeof(len));
4166
4167 /*
4168 * Push any response from backend
4169 */
4170 for(;;)
4171 {
4172 /* check if there's any pending data */
4173 if (!pool_ssl_pending(backend) && pool_read_buffer_is_empty(backend))
4174 {
4175 pool_set_timeout(timeout);
4176 if (pool_check_fd(backend) != 0)
4177 {
4178 ereport(DEBUG1,
4179 (errmsg("inject_cached_message: select shows no pending data")));
4180 pool_set_timeout(-1);
4181 break;
4182 }
4183 pool_set_timeout(-1);
4184 }
4185
4186 pool_read(backend, &kind, 1);
4187 ereport(DEBUG1,
4188 (errmsg("inject_cached_message: push message kind: '%c'", kind)));
4189 if (msg &&
4190 ((kind == 'T' && msg->type == POOL_DESCRIBE) ||
4191 (kind == '2' && msg->type == POOL_BIND)))
4192 {
4193 /* Pending message seen. Now it is likely to end of pending data */
4194 timeout = 0;
4195 }
4196 pool_push(backend, &kind, sizeof(kind));
4197 pool_read(backend, &len, sizeof(len));
4198 pool_push(backend, &len, sizeof(len));
4199 if ((ntohl(len)-sizeof(len)) > 0)
4200 {
4201 buf = pool_read2(backend, ntohl(len)-sizeof(len));
4202 pool_push(backend, buf, ntohl(len)-sizeof(len));
4203 }
4204 }
4205
4206 /*
4207 * Inject row data and command complete
4208 */
4209 while (i < qcachelen)
4210 {
4211 char tmpkind;
4212 int tmplen;
4213 char *p;
4214
4215 tmpkind = qcache[i];
4216 i++;
4217
4218 memcpy(&tmplen, qcache+i, sizeof(tmplen));
4219 i += sizeof(tmplen);
4220 len = ntohl(tmplen);
4221 p = qcache + i;
4222 i += len - sizeof(tmplen);
4223
4224 /* No need to cache PARSE and BIND responses */
4225 if (tmpkind == '1' || tmpkind == '2')
4226 {
4227 is_prepared_stmt = true;
4228 continue;
4229 }
4230
4231 /*
4232 * In the prepared statement execution, there is no need to send
4233 * 'T' response to the frontend.
4234 */
4235 if (is_prepared_stmt && tmpkind == 'T')
4236 {
4237 continue;
4238 }
4239
4240 /* push message */
4241 ereport(DEBUG1,
4242 (errmsg("inject_cached_message: push cached messages: '%c' len: %d", tmpkind, len)));
4243 pool_push(backend, &tmpkind, 1);
4244 pool_push(backend, &tmplen, sizeof(tmplen));
4245 if (len > 0)
4246 pool_push(backend, p, len - sizeof(tmplen));
4247 }
4248
4249 /*
4250 * Pop data.
4251 */
4252 pool_pop(backend, &len);
4253 }
4254