1 /* -*-pgsql-c-*- */
2 /*
3 * pgpool: a language independent connection pool server for PostgreSQL
4 * written by Tatsuo Ishii
5 *
6 * Copyright (c) 2003-2018 PgPool Global Development Group
7 *
8 * Permission to use, copy, modify, and distribute this software and
9 * its documentation for any purpose and without fee is hereby
10 * granted, provided that the above copyright notice appear in all
11 * copies and that both that copyright notice and this permission
12 * notice appear in supporting documentation, and that the name of the
13 * author not be used in advertising or publicity pertaining to
14 * distribution of the software without specific, written prior
15 * permission. The author makes no representations about the
16 * suitability of this software for any purpose. It is provided "as
17 * is" without express or implied warranty.
18 *
19 * pool_memqcache.c: query cache on shmem or memcached
20 *
21 */
22 #define DATABASE_TO_OID_QUERY "SELECT oid FROM pg_database WHERE datname = '%s'"
23
24 #include "pool.h"
25
26 #include <stdio.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <sys/file.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <fcntl.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <stdlib.h>
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <dirent.h>
39
40 #ifdef USE_MEMCACHED
41 #include <libmemcached/memcached.h>
42 #endif
43
44 #include "auth/md5.h"
45 #include "pool_config.h"
46 #include "protocol/pool_proto_modules.h"
47 #include "parser/parsenodes.h"
48 #include "context/pool_session_context.h"
49 #include "query_cache/pool_memqcache.h"
50 #include "utils/pool_relcache.h"
51 #include "utils/pool_select_walker.h"
52 #include "utils/pool_stream.h"
53 #include "utils/pool_stream.h"
54 #include "utils/elog.h"
55 #include "utils/palloc.h"
56 #include "utils/memutils.h"
57
58
#ifdef USE_MEMCACHED
/*
 * libmemcached handle shared by this file. Zero-initialised (NULL) until
 * memcached_connect() creates it; memcached_connect() sets it to the
 * sentinel (memcached_st *) -1 when pushing the server list fails.
 */
memcached_st *memc;
#endif

/*
 * Forward declarations of the file-local (static) functions defined later
 * in this file.
 */
static char* encode_key(const char *s, char *buf, POOL_CONNECTION_POOL *backend);
#ifdef DEBUG
static void dump_cache_data(const char *data, size_t len);
#endif
static int pool_commit_cache(POOL_CONNECTION_POOL *backend, char *query, char *data, size_t datalen, int num_oids, int *oids);
static int pool_fetch_cache(POOL_CONNECTION_POOL *backend, const char *query, char **buf, size_t *len);
static int send_cached_messages(POOL_CONNECTION *frontend, const char *qcache, int qcachelen);
static void send_message(POOL_CONNECTION *conn, char kind, int len, const char *data);
#ifdef USE_MEMCACHED
static int delete_cache_on_memcached(const char *key);
#endif
static int pool_get_dml_table_oid(int **oid);
static int pool_get_dropdb_table_oids(int **oids, int dboid);
static void pool_discard_dml_table_oid(void);
static void pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlink, int dboid);

static int pool_get_database_oid(void);
static void pool_add_table_oid_map(POOL_CACHEKEY *cachkey, int num_table_oids, int *table_oids);
static void pool_reset_memqcache_buffer(bool reset_dml_oids);
static POOL_CACHEID *pool_add_item_shmem_cache(POOL_QUERY_HASH *query_hash, char *data, int size);
static POOL_CACHEID *pool_find_item_on_shmem_cache(POOL_QUERY_HASH *query_hash);
static char *pool_get_item_shmem_cache(POOL_QUERY_HASH *query_hash, int *size, int *sts);
static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY *cache_array, POOL_TEMP_QUERY_CACHE *cache);
static void pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, char kind, char *data, int data_len);
static void pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, int num_oids, int *oids);
static POOL_INTERNAL_BUFFER *pool_create_buffer(void);
static void pool_discard_buffer(POOL_INTERNAL_BUFFER *buffer);
static void pool_add_buffer(POOL_INTERNAL_BUFFER *buffer, void *data, size_t len);
static void *pool_get_buffer(POOL_INTERNAL_BUFFER *buffer, size_t *len);
#ifdef NOT_USED
static char *pool_get_buffer_pointer(POOL_INTERNAL_BUFFER *buffer);
#endif
static char *pool_get_current_cache_buffer(size_t *len);
static size_t pool_get_buffer_length(POOL_INTERNAL_BUFFER *buffer);
static void pool_check_and_discard_cache_buffer(int num_oids, int *oids);

static void pool_set_memqcache_blocks(int num_blocks);
static int pool_get_memqcache_blocks(void);
static void *pool_memory_cache_address(void);
static void pool_reset_fsmm(size_t size);
static void *pool_fsmm_address(void);
static void pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space);
static POOL_CACHE_BLOCKID pool_get_block(size_t free_space);
static POOL_CACHE_ITEM_HEADER *pool_cache_item_header(POOL_CACHEID *cacheid);
static int pool_init_cache_block(POOL_CACHE_BLOCKID blockid);
/* NOTE(review): "#if" (not "#ifdef") — fine while NOT_USED is undefined. */
#if NOT_USED
static void pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid);
#endif
static int pool_delete_item_shmem_cache(POOL_CACHEID *cacheid);
static char *block_address(int blockid);
static POOL_CACHE_ITEM_POINTER *item_pointer(char *block, int i);
static POOL_CACHE_ITEM_HEADER *item_header(char *block, int i);
static POOL_CACHE_BLOCKID pool_reuse_block(void);
#ifdef SHMEMCACHE_DEBUG
static void dump_shmem_cache(POOL_CACHE_BLOCKID blockid);
#endif

static int pool_hash_reset(int nelements);
static int pool_hash_insert(POOL_QUERY_HASH *key, POOL_CACHEID *cacheid, bool update);
static uint32 create_hash_key(POOL_QUERY_HASH *key);
static volatile POOL_HASH_ELEMENT *get_new_hash_element(void);
static void put_back_hash_element(volatile POOL_HASH_ELEMENT *element);
static bool is_free_hash_element(void);
static void inject_cached_message(POOL_CONNECTION *backend, char *qcache, int qcachelen);
127
128 /*
129 * Connect to Memcached
130 */
memcached_connect(void)131 int memcached_connect(void)
132 {
133 char *memqcache_memcached_host;
134 int memqcache_memcached_port;
135 #ifdef USE_MEMCACHED
136 memcached_server_st *servers;
137 memcached_return rc;
138
139 /* Already connected? */
140 if (memc)
141 {
142 return 0;
143 }
144 #endif
145
146 memqcache_memcached_host = pool_config->memqcache_memcached_host;
147 memqcache_memcached_port = pool_config->memqcache_memcached_port;
148
149 ereport(DEBUG1,
150 (errmsg("connecting to memcached on Host:\"%s:%d\"", memqcache_memcached_host,memqcache_memcached_port)));
151
152 #ifdef USE_MEMCACHED
153 memc = memcached_create(NULL);
154 servers = memcached_server_list_append(NULL,
155 memqcache_memcached_host,
156 memqcache_memcached_port,
157 &rc);
158
159 rc = memcached_server_push(memc, servers);
160 if (rc != MEMCACHED_SUCCESS)
161 {
162 ereport(WARNING,
163 (errmsg("failed to connect to memcached, server push error:\"%s\"\n", memcached_strerror(memc, rc))));
164 memc = (memcached_st *)-1;
165 return -1;
166 }
167 memcached_server_list_free(servers);
168 #else
169 ereport(WARNING,
170 (errmsg("failed to connect to memcached, memcached support is not enabled")));
171 return -1;
172 #endif
173 return 0;
174 }
175
176 /*
177 * Disconnect to Memcached
178 */
memcached_disconnect(void)179 void memcached_disconnect (void)
180 {
181 #ifdef USE_MEMCACHED
182 if (!memc)
183 {
184 return;
185 }
186 memcached_free(memc);
187 #else
188 ereport(WARNING,
189 (errmsg("failed to disconnect from memcached, memcached support is not enabled")));
190 #endif
191 }
192
193 /*
194 * Register buffer data for query cache in memory cache
195 */
memqcache_register(char kind,POOL_CONNECTION * frontend,char * data,int data_len)196 void memqcache_register(char kind,
197 POOL_CONNECTION *frontend,
198 char *data,
199 int data_len)
200 {
201 POOL_TEMP_QUERY_CACHE *cache;
202 POOL_SESSION_CONTEXT *session_context;
203 POOL_QUERY_CONTEXT *query_context;
204
205 cache = pool_get_current_cache();
206
207 if (cache == NULL)
208 {
209 session_context = pool_get_session_context(true);
210
211 if (session_context && pool_is_query_in_progress())
212 {
213 char *query = pool_get_query_string();
214 query_context = session_context->query_context;
215
216 if (query)
217 query_context->temp_cache = pool_create_temp_query_cache(query);
218 }
219 }
220
221 pool_add_temp_query_cache(cache, kind, data, data_len);
222 }
223
224 /*
225 * Commit SELECT results to cache storage.
226 */
pool_commit_cache(POOL_CONNECTION_POOL * backend,char * query,char * data,size_t datalen,int num_oids,int * oids)227 static int pool_commit_cache(POOL_CONNECTION_POOL *backend, char *query, char *data, size_t datalen, int num_oids, int *oids)
228 {
229 #ifdef USE_MEMCACHED
230 memcached_return rc;
231 #endif
232 POOL_CACHEKEY cachekey;
233 char tmpkey[MAX_KEY];
234 time_t memqcache_expire;
235
236 /*
237 * get_buflen() will return -1 if query result exceeds memqcache_maxcache
238 */
239 if (datalen == -1)
240 {
241 return -1;
242 }
243
244 /* query disabled */
245 if (strlen(query) <= 0)
246 {
247 return -1;
248 }
249 ereport(DEBUG1,
250 (errmsg("commiting SELECT results to cache storage"),
251 errdetail("Query=\"%s\"", query)));
252
253 #ifdef DEBUG
254 dump_cache_data(data, datalen);
255 #endif
256
257 /* encode md5key for memcached */
258 encode_key(query, tmpkey, backend);
259 ereport(DEBUG2,
260 (errmsg("commiting SELECT results to cache storage"),
261 errdetail("search key : \"%s\"", tmpkey)));
262
263 memcpy(cachekey.hashkey, tmpkey, 32);
264
265 memqcache_expire = pool_config->memqcache_expire;
266 ereport(DEBUG1,
267 (errmsg("commiting SELECT results to cache storage"),
268 errdetail("memqcache_expire = %ld", memqcache_expire)));
269
270 if (pool_is_shmem_cache())
271 {
272 POOL_CACHEID *cacheid;
273 POOL_QUERY_HASH query_hash;
274
275 memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
276
277 cacheid = pool_hash_search(&query_hash);
278
279 if (cacheid != NULL)
280 {
281 ereport(DEBUG1,
282 (errmsg("commiting SELECT results to cache storage"),
283 errdetail("item already exists")));
284
285 return 0;
286 }
287 else
288 {
289 cacheid = pool_add_item_shmem_cache(&query_hash, data, datalen);
290 if (cacheid == NULL)
291 {
292 ereport(LOG,
293 (errmsg("failed to add item to shmem cache")));
294 return -1;
295 }
296 else
297 {
298 ereport(DEBUG2,
299 (errmsg("commiting SELECT results to cache storage"),
300 errdetail("blockid: %d itemid: %d",
301 cacheid->blockid, cacheid->itemid)));
302 }
303 cachekey.cacheid.blockid = cacheid->blockid;
304 cachekey.cacheid.itemid = cacheid->itemid;
305 }
306 }
307
308 #ifdef USE_MEMCACHED
309 else
310 {
311 rc = memcached_set(memc, tmpkey, 32,
312 data, datalen, (time_t)memqcache_expire, 0);
313 if (rc != MEMCACHED_SUCCESS)
314 {
315 ereport(WARNING,
316 (errmsg("cache commit failed with error:\"%s\"",memcached_strerror(memc, rc))));
317 return -1;
318 }
319 ereport(DEBUG1,
320 (errmsg("commiting SELECT results to cache storage"),
321 errdetail("set cache succeeded")));
322 }
323 #endif
324
325 /*
326 * Register cache id to oid map
327 */
328 pool_add_table_oid_map(&cachekey, num_oids, oids);
329
330 return 0;
331 }
332
333 /*
334 * Fetch from memory cache.
335 * Return:
336 * 0: fetch success,
337 * 1: not found
338 */
pool_fetch_cache(POOL_CONNECTION_POOL * backend,const char * query,char ** buf,size_t * len)339 static int pool_fetch_cache(POOL_CONNECTION_POOL *backend, const char *query, char **buf, size_t *len)
340 {
341 char *ptr;
342 char tmpkey[MAX_KEY];
343 int sts;
344 char *p;
345
346 if (strlen(query) <= 0)
347 ereport(ERROR,
348 (errmsg("fetching from cache storage, no query")));
349
350 /* encode md5key for memcached */
351 encode_key(query, tmpkey, backend);
352 ereport(DEBUG1,
353 (errmsg("fetching from cache storage"),
354 errdetail("search key \"%s\"", tmpkey)));
355
356
357 if (pool_is_shmem_cache())
358 {
359 POOL_QUERY_HASH query_hash;
360 int mylen;
361
362 memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
363
364 ptr = pool_get_item_shmem_cache(&query_hash, &mylen, &sts);
365 if (ptr == NULL)
366 {
367 ereport(DEBUG1,
368 (errmsg("fetching from cache storage"),
369 errdetail("cache not found on shared memory")));
370
371 return 1;
372 }
373 *len = mylen;
374 }
375 #ifdef USE_MEMCACHED
376 else
377 {
378 memcached_return rc;
379 unsigned int flags;
380
381 ptr = memcached_get(memc, tmpkey, strlen(tmpkey), len, &flags, &rc);
382
383 if (rc != MEMCACHED_SUCCESS)
384 {
385 if (rc != MEMCACHED_NOTFOUND)
386 {
387 ereport(LOG,
388 (errmsg("fetching from cache storage, memcached_get failed with error: \"%s\"", memcached_strerror(memc, rc))));
389 /*
390 * Turn off memory cache support to prevent future errors.
391 */
392 pool_config->memory_cache_enabled = 0;
393 /* Behave as if cache not found */
394 return 1;
395 }
396 else
397 {
398 /* Not found */
399 ereport(DEBUG1,
400 (errmsg("fetching from cache storage"),
401 errdetail("cache item not found for key: \"%s\" and query:\"%s\"",tmpkey,query)));
402 return 1;
403 }
404 }
405 }
406 #else
407 else
408 {
409 ereport(ERROR,
410 (errmsg("memcached support is not enabled")));
411 }
412 #endif
413
414 p = palloc(*len);
415
416 memcpy(p, ptr, *len);
417
418 if (!pool_is_shmem_cache())
419 {
420 free(ptr);
421 }
422
423 ereport(DEBUG1,
424 (errmsg("fetching from cache storage"),
425 errdetail("query=\"%s\" len:%zd", query, *len)));
426 #ifdef DEBUG
427 dump_cache_data(p, *len);
428 #endif
429
430 *buf = p;
431
432 return 0;
433 }
434
435 /*
436 * encode key.
437 * create cache key as md5(username + query string + database name)
438 */
encode_key(const char * s,char * buf,POOL_CONNECTION_POOL * backend)439 static char* encode_key(const char *s, char *buf, POOL_CONNECTION_POOL *backend)
440 {
441 char* strkey;
442 int u_length;
443 int d_length;
444 int q_length;
445 int length;
446
447 u_length = strlen(backend->info->user);
448 ereport(DEBUG1,
449 (errmsg("memcache encode key"),
450 errdetail("username: \"%s\" database_name: \"%s\"", backend->info->user,backend->info->database)));
451
452 d_length = strlen(backend->info->database);
453
454 q_length = strlen(s);
455 ereport(DEBUG1,
456 (errmsg("memcache encode key"),
457 errdetail("query: \"%s\"", s)));
458
459 length = u_length + d_length + q_length + 1;
460
461 strkey = (char*)palloc(sizeof(char) * length);
462
463 snprintf(strkey, length, "%s%s%s", backend->info->user, s, backend->info->database);
464
465 pool_md5_hash(strkey, strlen(strkey), buf);
466 ereport(DEBUG1,
467 (errmsg("memcache encode key"),
468 errdetail("`%s' -> `%s'", strkey, buf)));
469 pfree(strkey);
470 return buf;
471 }
472
#ifdef DEBUG
/*
 * dump cache data
 *
 * Debug aid: writes each message of a cached result to stderr as
 * kind byte, 4-byte big-endian length (which counts itself) and a hex dump
 * of the payload. Stops with a note instead of reading past the end of
 * "data" when a header is truncated or a length is inconsistent.
 */
static void dump_cache_data(const char *data, size_t len)
{
	fprintf(stderr,"shmem: len = %zu\n", len);

	while (len > 0)
	{
		int			plen;
		int			i;

		/* need a kind byte plus the 4-byte length word */
		if (len < 5)
		{
			fprintf(stderr, "shmem: truncated message header (%zu bytes left)\n", len);
			break;
		}

		fprintf(stderr,"shmem: kind:%c\n", *data++);
		len--;
		memmove(&plen, data, 4);
		len -= 4;
		data += 4;
		plen = ntohl(plen);
		fprintf(stderr,"shmem: len:%d\n", plen);
		plen -= 4;

		if (plen < 0 || (size_t) plen > len)
		{
			fprintf(stderr, "shmem: invalid message length\n");
			break;
		}

		fprintf(stderr, "shmem: ");
		for (i = 0; i < plen; i++)
		{
			fprintf(stderr, "%02x ", (unsigned char)(*data++));
		}
		len -= (size_t) plen;
		fprintf(stderr, "\n");
	}
}
#endif
506
507 /*
508 * send cached messages
509 */
send_cached_messages(POOL_CONNECTION * frontend,const char * qcache,int qcachelen)510 static int send_cached_messages(POOL_CONNECTION *frontend, const char *qcache, int qcachelen)
511 {
512 int msg = 0;
513 int i = 0;
514 int is_prepared_stmt = 0;
515 int len;
516 const char *p;
517
518 while (i < qcachelen)
519 {
520 char tmpkind;
521 int tmplen;
522
523 tmpkind = qcache[i];
524 i++;
525
526 memcpy(&tmplen, qcache+i, sizeof(tmplen));
527 i += sizeof(tmplen);
528 len = ntohl(tmplen);
529 p = qcache + i;
530 i += len - sizeof(tmplen);
531
532 /* No need to cache PARSE and BIND responses */
533 if (tmpkind == '1' || tmpkind == '2')
534 {
535 is_prepared_stmt = 1;
536 continue;
537 }
538
539 /*
540 * In the prepared statement execution, there is no need to send
541 * 'T' response to the frontend.
542 */
543 if (is_prepared_stmt && tmpkind == 'T')
544 {
545 continue;
546 }
547
548 /* send message to frontend */
549 ereport(DEBUG1,
550 (errmsg("memcache: sending cached messages: '%c' len: %d", tmpkind, len)));
551 send_message(frontend, tmpkind, len, p);
552
553 msg++;
554 }
555
556 return msg;
557 }
558
559 /*
560 * send message to frontend
561 */
send_message(POOL_CONNECTION * conn,char kind,int len,const char * data)562 static void send_message(POOL_CONNECTION *conn, char kind, int len, const char *data)
563 {
564 ereport(DEBUG2,
565 (errmsg("memcache: sending messages: kind '%c', len=%d, data=%p", kind, len, data)));
566
567 pool_write(conn, &kind, 1);
568
569 len = htonl(len);
570 pool_write(conn, &len, sizeof(len));
571
572 len = ntohl(len);
573 pool_write(conn, (void *)data, len-sizeof(len));
574 }
575
#ifdef USE_MEMCACHED
/*
 * Delete one query cache entry from memcached.
 *
 * key: md5 cache key; only its first 32 bytes are sent.
 * Returns 1 when memcached reports success (or a buffered delete),
 * 0 after logging the failure otherwise.
 */
static int delete_cache_on_memcached(const char *key)
{
	memcached_return result;

	ereport(DEBUG2,
			(errmsg("memcache: deleteing cache on memcached with key: \"%s\"", key)));

	result = memcached_delete(memc, key, 32, (time_t) 0);

	if (result == MEMCACHED_SUCCESS || result == MEMCACHED_BUFFERED)
		return 1;

	ereport(LOG,
			(errmsg("failed to delete cache on memcached, error:\"%s\"", memcached_strerror(memc, result))));
	return 0;
}
#endif
603
604 /*
605 * Fetch SELECT data from cache if possible.
606 */
pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,char * contents,bool * foundp)607 POOL_STATUS pool_fetch_from_memory_cache(POOL_CONNECTION *frontend,
608 POOL_CONNECTION_POOL *backend,
609 char *contents, bool *foundp)
610 {
611 char *qcache;
612 size_t qcachelen;
613 int sts;
614 pool_sigset_t oldmask;
615
616 ereport(DEBUG1,
617 (errmsg("pool_fetch_from_memory_cache called")));
618
619 *foundp = false;
620
621 POOL_SETMASK2(&BlockSig, &oldmask);
622 pool_shmem_lock();
623
624 PG_TRY();
625 {
626 sts = pool_fetch_cache(backend, contents, &qcache, &qcachelen);
627 }
628 PG_CATCH();
629 {
630 pool_shmem_unlock();
631 POOL_SETMASK(&oldmask);
632 PG_RE_THROW();
633 }
634 PG_END_TRY();
635
636 pool_shmem_unlock();
637 POOL_SETMASK(&oldmask);
638
639 if (sts != 0)
640 /* Cache not found */
641 return POOL_CONTINUE;
642
643 /*
644 * Cache found. If we are doing extended query and in streaming
645 * replication mode, we need to retrieve any responses from backend and
646 * forward them to frontend.
647 */
648 if (pool_is_doing_extended_query_message() && STREAM)
649 {
650 POOL_SESSION_CONTEXT *session_context;
651 POOL_CONNECTION *target_backend;
652
653 ereport(DEBUG1,
654 (errmsg("memcache: injecting cache data")));
655
656 session_context = pool_get_session_context(true);
657 target_backend = CONNECTION(backend, session_context->load_balance_node_id);
658 inject_cached_message(target_backend, qcache, qcachelen);
659 }
660 else
661 {
662 /*
663 * Send each messages to frontend
664 */
665 send_cached_messages(frontend, qcache, qcachelen);
666 }
667
668 pfree(qcache);
669
670 /*
671 * Send a "READY FOR QUERY" if not in extended query.
672 */
673 if (!pool_is_doing_extended_query_message() && MAJOR(backend) == PROTO_MAJOR_V3)
674 {
675 signed char state;
676
677 /*
678 * We keep previous transaction state.
679 */
680 state = MASTER(backend)->tstate;
681 send_message(frontend, 'Z', 5, (char *)&state);
682 }
683
684 if (!pool_is_doing_extended_query_message() || !STREAM)
685 {
686 if (pool_flush(frontend))
687 {
688 return POOL_END;
689 }
690 }
691
692 *foundp = true;
693
694 if (pool_config->log_per_node_statement)
695 ereport(LOG,
696 (errmsg("fetch from memory cache"),
697 errdetail("query result fetched from cache. statement: %s", contents)));
698
699 ereport(DEBUG1,
700 (errmsg("fetch from memory cache"),
701 errdetail("query result found in the query cache, %s", contents)));
702
703 return POOL_CONTINUE;
704 }
705
/*
 * Simple and rough (thus unreliable) check if the query is likely
 * SELECT. Usable before a parse tree is available, so the query may be
 * malformed (e.g. contain an unterminated comment).
 *
 * Whitespace, C-style comments and "--" comments are skipped. The function
 * returns true as soon as "SELECT" or "WITH" (case-insensitive) is found at
 * the current position. NOTE(review): after any other character the scan
 * continues one byte further, so a match later in the string also counts;
 * this is looser than "starts with", and the behavior is kept as-is.
 *
 * Returns false for NULL, for an empty or whitespace-only string, and when
 * the remaining text is only comments.
 *
 * ignore_leading_white_space is not consulted: whitespace is skipped by the
 * main loop anyway, so that setting never changed the result here.
 */
bool pool_is_likely_select(char *query)
{
	const unsigned char *p;

	if (query == NULL)
		return false;

	/* unsigned char: <ctype.h> functions require values in that range */
	p = (const unsigned char *) query;

	while (*p)
	{
		/* Ignore spaces and line breaks */
		if (isspace(*p))
		{
			p++;
			continue;
		}

		/* Ignore C-style comments; an unterminated one ends the scan */
		if (p[0] == '/' && p[1] == '*')
		{
			const char *end = strstr((const char *) p + 2, "*/");

			if (end == NULL)
				return false;
			p = (const unsigned char *) end + 2;
			continue;
		}

		/* Ignore SQL comments up to and including the newline */
		if (p[0] == '-' && p[1] == '-')
		{
			const char *newline = strchr((const char *) p + 2, '\n');

			if (newline == NULL)
				return false;
			p = (const unsigned char *) newline + 1;
			continue;
		}

		if (!strncasecmp((const char *) p, "SELECT", 6) ||
			!strncasecmp((const char *) p, "WITH", 4))
		{
			return true;
		}

		p++;
	}

	return false;
}
789
790 /*
791 * Return true if SELECT can be cached. "node" is the parse tree for
792 * the query and "query" is the query string.
793 * The query must be SELECT or WITH.
794 */
pool_is_allow_to_cache(Node * node,char * query)795 bool pool_is_allow_to_cache(Node *node, char *query)
796 {
797 int i = 0;
798 int num_oids = -1;
799 SelectContext ctx;
800
801 /*
802 * If NO QUERY CACHE comment exists, do not cache.
803 */
804 if (!strncasecmp(query, NO_QUERY_CACHE, NO_QUERY_CACHE_COMMENT_SZ))
805 return false;
806 /*
807 * Check black table list first.
808 */
809 if (pool_config->num_black_memqcache_table_list > 0)
810 {
811 /* Extract oids in from clause of SELECT, and check if SELECT to them could be cached. */
812 num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
813 if (num_oids > 0)
814 {
815 for (i = 0; i < num_oids; i++)
816 {
817 ereport(DEBUG1,
818 (errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, ctx.table_names[i])));
819 if (pool_is_table_in_black_list(ctx.table_names[i]) == true)
820 {
821 ereport(DEBUG1,
822 (errmsg("memcache: node is not allowed to cache")));
823 return false;
824 }
825 }
826 }
827 }
828
829 /* SELECT INTO or SELECT FOR SHARE or UPDATE cannot be cached */
830 if (pool_has_insertinto_or_locking_clause(node))
831 return false;
832
833 /*
834 * If SELECT uses non immutable functions, it's not allowed to
835 * cache.
836 */
837 if (pool_has_non_immutable_function_call(node))
838 return false;
839
840 /*
841 * If SELECT uses temporary tables it's not allowed to cache.
842 */
843 if (pool_config->check_temp_table && pool_has_temp_table(node))
844 return false;
845
846 /*
847 * If SELECT uses system catalogs, it's not allowed to cache.
848 */
849 if (pool_has_system_catalog(node))
850 return false;
851
852 /*
853 * TABLESAMPLE is not allowed to cache.
854 */
855 if (IsA(node, SelectStmt) && ((SelectStmt *)node)->fromClause)
856 {
857 List *tbl_list = ((SelectStmt *)node)->fromClause;
858 ListCell *tbl;
859 foreach(tbl, tbl_list)
860 {
861 if (IsA(lfirst(tbl), RangeTableSample))
862 return false;
863 }
864 }
865
866
867 /*
868 * If the table is in the while list, allow to cache even if it is
869 * VIEW or unlogged table.
870 */
871 if (pool_config->num_white_memqcache_table_list > 0)
872 {
873 if (num_oids < 0)
874 num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
875
876 if (num_oids > 0)
877 {
878 for (i = 0; i < num_oids; i++)
879 {
880 char *table = ctx.table_names[i];
881 ereport(DEBUG1,
882 (errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, table)));
883 if (is_view(table) || is_unlogged_table(table))
884 {
885 if (pool_is_table_in_white_list(table) == false)
886 {
887 ereport(DEBUG1,
888 (errmsg("memcache: node is not allowed to cache")));
889 return false;
890 }
891 }
892 }
893 }
894 }
895 else
896 {
897 /*
898 * If SELECT uses views, it's not allowed to cache.
899 */
900 if (pool_has_view(node))
901 return false;
902
903 /*
904 * If SELECT uses unlogged tables, it's not allowed to cache.
905 */
906 if (pool_has_unlogged_table(node))
907 return false;
908 }
909
910 /*
911 * If Data-modifying statements in WITH clause, it's not allowed to cache.
912 */
913 if(IsA(node, SelectStmt) && ((SelectStmt *) node)->withClause)
914 {
915 ListCell *lc;
916 WithClause *withClause = ((SelectStmt *) node)->withClause;
917
918 foreach(lc, withClause->ctes)
919 {
920 CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
921 if(IsA(cte->ctequery, InsertStmt) ||
922 IsA(cte->ctequery, DeleteStmt) ||
923 IsA(cte->ctequery, UpdateStmt))
924 {
925 return false;
926 }
927 }
928 }
929
930 return true;
931 }
932
933
934 /*
935 * Return true If the SELECTed table is in back list.
936 */
pool_is_table_in_black_list(const char * table_name)937 bool pool_is_table_in_black_list(const char *table_name)
938 {
939
940 if (pool_config->num_black_memqcache_table_list > 0 &&
941 pattern_compare((char *)table_name, BLACKLIST, "black_memqcache_table_list") == 1)
942 {
943 return true;
944 }
945
946 return false;
947 }
948
949 /*
950 * Return true If the SELECTed table is in white list.
951 */
pool_is_table_in_white_list(const char * table_name)952 bool pool_is_table_in_white_list(const char *table_name)
953 {
954 if (pool_config->num_white_memqcache_table_list > 0 &&
955 pattern_compare((char *)table_name, WHITELIST, "white_memqcache_table_list") == 1)
956 {
957 return true;
958 }
959
960 return false;
961 }
962
963 /*
964 * Extract table oid from INSERT/UPDATE/DELETE/TRUNCATE/
965 * DROP TABLE/ALTER TABLE/COPY FROM statement.
966 * For SELECT, if Data-modifying statements in its WITH clause,
967 * extract table oid from Data-modifying statements too.
968 * Returns number of oids.
969 * In case of error, returns 0 (InvalidOid).
970 * oids buffer (oidsp) will be discarded by subsequent call.
971 */
pool_extract_table_oids(Node * node,int ** oidsp)972 int pool_extract_table_oids(Node *node, int **oidsp)
973 {
974 #define POOL_MAX_DML_OIDS 128
975 char *table;
976 static int oids[POOL_MAX_DML_OIDS];
977 int num_oids;
978 int oid;
979
980 if (node == NULL)
981 {
982 ereport(LOG,
983 (errmsg("memcache: error while extracting table oids. statement is NULL")));
984 return 0;
985 }
986
987 num_oids = 0;
988 *oidsp = oids;
989
990 if (IsA(node, InsertStmt))
991 {
992 InsertStmt *stmt = (InsertStmt *) node;
993
994 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
995 table = make_table_name_from_rangevar(stmt->relation);
996 }
997 else if (IsA(node, UpdateStmt))
998 {
999 UpdateStmt *stmt = (UpdateStmt *) node;
1000
1001 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1002 table = make_table_name_from_rangevar(stmt->relation);
1003 }
1004 else if (IsA(node, DeleteStmt))
1005 {
1006 DeleteStmt *stmt = (DeleteStmt *) node;
1007
1008 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1009 table = make_table_name_from_rangevar(stmt->relation);
1010 }
1011 else if(IsA(node, SelectStmt))
1012 {
1013 SelectStmt *stmt = (SelectStmt *) node;
1014 num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1015 table = NULL;
1016 }
1017 #ifdef NOT_USED
1018 /*
1019 * We do not handle CREATE TABLE here. It is possible that
1020 * pool_extract_table_oids() is called before CREATE TABLE gets
1021 * executed.
1022 */
1023 else if (IsA(node, CreateStmt))
1024 {
1025 CreateStmt *stmt = (CreateStmt *)node;
1026 table = make_table_name_from_rangevar(stmt->relation);
1027 }
1028 #endif
1029
1030 else if (IsA(node, AlterTableStmt))
1031 {
1032 AlterTableStmt *stmt = (AlterTableStmt *)node;
1033 table = make_table_name_from_rangevar(stmt->relation);
1034 }
1035
1036 else if (IsA(node, CopyStmt))
1037 {
1038 CopyStmt *stmt = (CopyStmt *)node;
1039 if (stmt->is_from) /* COPY FROM? */
1040 {
1041 table = make_table_name_from_rangevar(stmt->relation);
1042 }
1043 else
1044 {
1045 return 0;
1046 }
1047 }
1048
1049 else if (IsA(node, DropStmt))
1050 {
1051 ListCell *cell;
1052
1053 DropStmt *stmt = (DropStmt *)node;
1054
1055 if (stmt->removeType != OBJECT_TABLE)
1056 {
1057 return 0;
1058 }
1059
1060 /* Here, stmt->objects is list of target relation info. The
1061 * first cell of target relation info is a list (possibly)
1062 * consists of database, schema and relation. We need to call
1063 * makeRangeVarFromNameList() before passing to
1064 * make_table_name_from_rangevar. Otherwise we get weird excessively
1065 * decorated relation name (''table_name'').
1066 */
1067 foreach(cell, stmt->objects)
1068 {
1069 if (num_oids >= POOL_MAX_DML_OIDS)
1070 {
1071 ereport(LOG,
1072 (errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1073 return 0;
1074 }
1075
1076 table = make_table_name_from_rangevar(makeRangeVarFromNameList(lfirst(cell)));
1077 oid = pool_table_name_to_oid(table);
1078 if (oid > 0)
1079 {
1080 oids[num_oids++] = pool_table_name_to_oid(table);
1081 ereport(DEBUG1,
1082 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids-1])));
1083 }
1084 }
1085 return num_oids;
1086 }
1087 else if (IsA(node, TruncateStmt))
1088 {
1089 ListCell *cell;
1090
1091 TruncateStmt *stmt = (TruncateStmt *)node;
1092
1093 foreach(cell, stmt->relations)
1094 {
1095 if (num_oids >= POOL_MAX_DML_OIDS)
1096 {
1097 ereport(LOG,
1098 (errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1099 return 0;
1100 }
1101
1102 table = make_table_name_from_rangevar(lfirst(cell));
1103 oid = pool_table_name_to_oid(table);
1104 if (oid > 0)
1105 {
1106 oids[num_oids++] = pool_table_name_to_oid(table);
1107 ereport(DEBUG1,
1108 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids-1])));
1109 }
1110 }
1111 return num_oids;
1112 }
1113 else if (IsA(node, ExplainStmt))
1114 {
1115 ListCell *cell;
1116 DefElem *def;
1117 ExplainStmt *stmt = (ExplainStmt *) node;
1118
1119 foreach(cell, stmt->options)
1120 {
1121 def = lfirst(cell);
1122 if (strncmp("analyze", def->defname, 7) == 0)
1123 {
1124 return pool_extract_table_oids(stmt->query, oidsp);
1125 }
1126 }
1127
1128 table = NULL;
1129 }
1130 else
1131 {
1132 ereport(DEBUG1,
1133 (errmsg("memcache: extracting table oids: statment is different from INSERT/UPDATE/DELETE/TRUNCATE/DROP TABLE/ALTER TABLE")));
1134 return 0;
1135 }
1136
1137 oid = pool_table_name_to_oid(table);
1138 if (oid > 0)
1139 {
1140 if (num_oids >= POOL_MAX_DML_OIDS)
1141 {
1142 ereport(LOG,
1143 (errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1144 return 0;
1145 }
1146
1147 oids[num_oids++] = pool_table_name_to_oid(table);
1148 ereport(DEBUG1,
1149 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oid)));
1150 }
1151 return num_oids;
1152 }
1153
1154 /*
1155 * Extract table oid from INSERT/UPDATE/DELETE
1156 * FROM statement in WITH clause.
1157 * Returns number of oids.
1158 * oids buffer (oidsp) will be discarded by subsequent call.
1159 */
1160 int
pool_extract_withclause_oids(Node * node,int * oidsp)1161 pool_extract_withclause_oids(Node *node, int *oidsp)
1162 {
1163 int num_oids = 0;
1164 int oid;
1165 char *table;
1166 ListCell *lc;
1167 WithClause *with;
1168
1169 if(oidsp == NULL)
1170 {
1171 return 0;
1172 }
1173
1174 if(!node || !IsA(node, WithClause))
1175 {
1176 return 0;
1177 }
1178
1179 with = (WithClause *) node;
1180 foreach(lc, with->ctes)
1181 {
1182 CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
1183 if(IsA(cte->ctequery, InsertStmt))
1184 {
1185 InsertStmt *stmt = (InsertStmt *) cte->ctequery;
1186 table = make_table_name_from_rangevar(stmt->relation);
1187 }
1188 else if(IsA(cte->ctequery, DeleteStmt))
1189 {
1190 DeleteStmt *stmt = (DeleteStmt *) cte->ctequery;
1191 table = make_table_name_from_rangevar(stmt->relation);
1192 }
1193 else if(IsA(cte->ctequery, UpdateStmt))
1194 {
1195 UpdateStmt *stmt = (UpdateStmt *) cte->ctequery;
1196 table = make_table_name_from_rangevar(stmt->relation);
1197 }
1198 else
1199 {
1200 /* only check INSERT/DELETE/UPDATE in WITH clause */
1201 table = NULL;
1202 }
1203
1204 oid = pool_table_name_to_oid(table);
1205 if (oid > 0)
1206 {
1207 if (num_oids >= POOL_MAX_DML_OIDS)
1208 {
1209 break;
1210 }
1211
1212 oidsp[num_oids++] = pool_table_name_to_oid(table);
1213 ereport(DEBUG1,
1214 (errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oidsp[num_oids - 1])));
1215 }
1216 }
1217
1218 return num_oids;
1219 }
1220
1221 #define POOL_OIDBUF_SIZE 1024
1222 static int* oidbuf;
1223 static int oidbufp;
1224 static int oidbuf_size;
1225
1226 /*
1227 * Add table oid to internal buffer
1228 */
pool_add_dml_table_oid(int oid)1229 void pool_add_dml_table_oid(int oid)
1230 {
1231 int i;
1232 int* tmp;
1233
1234 if (oid == 0)
1235 return;
1236
1237 if (oidbufp >= oidbuf_size)
1238 {
1239 MemoryContext oldcxt;
1240 oidbuf_size += POOL_OIDBUF_SIZE;
1241 /*
1242 * This need to live throughout the life of child so home it in
1243 * TopMemoryContext
1244 */
1245 oldcxt = MemoryContextSwitchTo(TopMemoryContext);
1246 tmp = repalloc(oidbuf, sizeof(int) * oidbuf_size);
1247 MemoryContextSwitchTo(oldcxt);
1248 if (tmp == NULL)
1249 return;
1250
1251 oidbuf = tmp;
1252 }
1253
1254 for (i=0;i<oidbufp;i++)
1255 {
1256 if (oidbuf[i] == oid)
1257 /* Already same oid exists */
1258 return;
1259 }
1260 oidbuf[oidbufp++] = oid;
1261 }
1262
1263
1264 /*
1265 * Get table oid buffer
1266 */
pool_get_dml_table_oid(int ** oid)1267 static int pool_get_dml_table_oid(int **oid)
1268 {
1269 *oid = oidbuf;
1270 return oidbufp;
1271 }
1272
pool_get_dropdb_table_oids(int ** oids,int dboid)1273 static int pool_get_dropdb_table_oids(int **oids, int dboid)
1274 {
1275 int *rtn = 0;
1276 int oids_size = 0;
1277 int *tmp;
1278
1279 int num_oids = 0;
1280 DIR *dir;
1281 struct dirent *dp;
1282 char path[1024];
1283
1284 snprintf(path, sizeof(path), "%s/%d", pool_config->memqcache_oiddir, dboid);
1285 if ((dir = opendir(path)) == NULL)
1286 {
1287 ereport(DEBUG1,
1288 (errmsg("memcache: getting drop table oids"),
1289 errdetail("Failed to open dir: %s", path)));
1290 return 0;
1291 }
1292
1293 while ((dp = readdir(dir)) != NULL)
1294 {
1295 if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0)
1296 continue;
1297
1298 if (num_oids >= oids_size)
1299 {
1300 oids_size += POOL_OIDBUF_SIZE;
1301 tmp = repalloc(rtn, sizeof(int) * oids_size);
1302 if (tmp == NULL)
1303 {
1304 closedir(dir);
1305 return 0;
1306 }
1307 rtn = tmp;
1308 }
1309
1310 rtn[num_oids] = atol(dp->d_name);
1311 num_oids++;
1312 }
1313
1314 closedir(dir);
1315 *oids = rtn;
1316
1317 return num_oids;
1318 }
1319
1320 /* Discard oid internal buffer */
pool_discard_dml_table_oid(void)1321 static void pool_discard_dml_table_oid(void)
1322 {
1323 oidbufp = 0;
1324 }
1325
/*
 * Management modules for the oid map. When caching SELECT results, we
 * record table oids in files laid out as follows:
 *
 * memqcache_oiddir -+- database_oid -+-table_oid_file1
 *                                    |
 *                                    +-table_oid_file2
 *                                    |
 *                                    +-table_oid_file3...
 *
 * Each table_oid_file is named after the oid of a table used by a cached
 * SELECT statement, and holds one or more cache ids. When a SELECT result
 * is cached, the file is created if necessary and the cache id is
 * appended to it. Later SELECTs using the same table append to the same
 * file. If a SELECT uses multiple tables, its cache id is added to one
 * table_oid_file per table. When INSERT/UPDATE/DELETE is executed, the
 * corresponding caches must be deleted (cache invalidation); the same
 * applies when DROP TABLE or ALTER TABLE is executed. When a database is
 * dropped, all caches belonging to the database must be deleted.
 */
1346
1347 /*
1348 * Get oid of current database
1349 */
pool_get_database_oid(void)1350 static int pool_get_database_oid(void)
1351 {
1352 /*
1353 * Query to convert table name to oid
1354 */
1355 int oid = 0;
1356 static POOL_RELCACHE *relcache;
1357 POOL_CONNECTION_POOL *backend;
1358
1359 backend = pool_get_session_context(false)->backend;
1360
1361 /*
1362 * If relcache does not exist, create it.
1363 */
1364 if (!relcache)
1365 {
1366 relcache = pool_create_relcache(pool_config->relcache_size, DATABASE_TO_OID_QUERY,
1367 int_register_func, int_unregister_func,
1368 false);
1369 if (relcache == NULL)
1370 {
1371 ereport(LOG,
1372 (errmsg("memcache: error creating relcache while getting database OID")));
1373 return oid;
1374 }
1375 }
1376
1377 /*
1378 * Search relcache.
1379 */
1380 oid = (int)(intptr_t)pool_search_relcache(relcache, backend,
1381 MASTER_CONNECTION(backend)->sp->database);
1382 return oid;
1383 }
1384
1385 /*
1386 * Get oid of current database for discarding cache files
1387 * after executing DROP DATABASE
1388 */
pool_get_database_oid_from_dbname(char * dbname)1389 int pool_get_database_oid_from_dbname(char *dbname)
1390 {
1391 int dboid = 0;
1392 POOL_SELECT_RESULT *res;
1393 char query[1024];
1394
1395 POOL_CONNECTION_POOL *backend;
1396 backend = pool_get_session_context(false)->backend;
1397
1398 snprintf(query, sizeof(query), DATABASE_TO_OID_QUERY, dbname);
1399 do_query(MASTER(backend), query, &res, MAJOR(backend));
1400
1401 if (res->numrows != 1)
1402 {
1403 ereport(DEBUG1,
1404 (errmsg("memcache: getting oid of current database"),
1405 errdetail("received %d rows", res->numrows)));
1406 free_select_result(res);
1407 return 0;
1408 }
1409
1410 dboid = atol(res->data[0]);
1411 free_select_result(res);
1412
1413 return dboid;
1414 }
1415
1416 /*
1417 * Add cache id (shmem case) or hash key (memcached case) to table oid
1418 * map file. Caller must hold shmem lock before calling this function
1419 * to avoid file extension conflict among different pgpool child
1420 * process.
1421 * As of pgpool-II 3.2, pool_handle_query_cache is responsible for that.
1422 * (pool_handle_query_cache -> pool_commit_cache -> pool_add_table_oid_map)
1423 */
pool_add_table_oid_map(POOL_CACHEKEY * cachekey,int num_table_oids,int * table_oids)1424 static void pool_add_table_oid_map(POOL_CACHEKEY *cachekey, int num_table_oids, int *table_oids)
1425 {
1426 char *dir;
1427 int dboid;
1428 char path[1024];
1429 int i;
1430 int len;
1431
1432 /*
1433 * Create memqcache_oiddir
1434 */
1435 dir = pool_config->memqcache_oiddir;
1436
1437 if (mkdir(dir, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1438 {
1439 if (errno != EEXIST)
1440 {
1441 ereport(WARNING,
1442 (errmsg("memcache: adding table oid maps, failed to create directory:\"%s\". error:\"%s\"", dir, strerror(errno))));
1443 return;
1444 }
1445 }
1446
1447 /*
1448 * Create memqcache_oiddir/database_oid
1449 */
1450 dboid = pool_get_database_oid();
1451 ereport(DEBUG1,
1452 (errmsg("memcache: adding table oid maps"),
1453 errdetail("dboid %d", dboid)));
1454
1455 if (dboid <= 0)
1456 {
1457 ereport(WARNING,
1458 (errmsg("memcache: adding table oid maps, failed to get database OID")));
1459 return;
1460 }
1461
1462 snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1463 if (mkdir(path, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1464 {
1465 if (errno != EEXIST)
1466 {
1467 ereport(WARNING,
1468 (errmsg("memcache: adding table oid maps, failed to create directory:\"%s\". error:\"%s\"", path, strerror(errno))));
1469 return;
1470 }
1471 }
1472
1473 if (pool_is_shmem_cache())
1474 {
1475 len = sizeof(cachekey->cacheid);
1476 }
1477 else
1478 {
1479 len = sizeof(cachekey->hashkey);
1480 }
1481
1482 for (i=0;i<num_table_oids;i++)
1483 {
1484 int fd;
1485 int oid = table_oids[i];
1486 int sts;
1487 struct flock fl;
1488
1489 /*
1490 * Create or open each memqcache_oiddir/database_oid/table_oid
1491 */
1492 snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1493 if ((fd = open(path, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR)) == -1)
1494 {
1495 ereport(WARNING,
1496 (errmsg("memcache: adding table oid maps, failed to open file:\"%s\". error:\"%s\"", path, strerror(errno))));
1497 return;
1498 }
1499
1500 fl.l_type = F_WRLCK;
1501 fl.l_whence = SEEK_SET;
1502 fl.l_start = 0; /* Offset from l_whence */
1503 fl.l_len = 0; /* length, 0 = to EOF */
1504
1505 sts = fcntl(fd, F_SETLKW, &fl);
1506 if (sts == -1)
1507 {
1508 ereport(WARNING,
1509 (errmsg("memcache: adding table oid maps, failed to lock file:\"%s\". error:\"%s\"", path, strerror(errno))));
1510
1511 close(fd);
1512 return;
1513 }
1514
1515 /*
1516 * Below was ifdef-out because of a performance reason.
1517 * Looking for duplicate cache entries in a file needed
1518 * unacceptably high cost. So we gave up this and decided not
1519 * to care about duplicate entries in the file.
1520 */
1521 #ifdef NOT_USED
1522 for (;;)
1523 {
1524 sts = read(fd, (char *)&buf, len);
1525 if (sts == -1)
1526 {
1527 ereport(WARNING,
1528 (errmsg("memcache: adding table oid maps, failed to read file:\"%s\". error:\"%s\"", path, strerror(errno))));
1529 close(fd);
1530 return;
1531 }
1532 else if (sts == len)
1533 {
1534 if (memcmp(cachekey, &buf, len) == 0)
1535 {
1536 /* Same key found. Skip this */
1537 close(fd);
1538 return;
1539 }
1540 continue;
1541 }
1542 /*
1543 * Must be EOF
1544 */
1545 if (sts != 0)
1546 {
1547 ereport(WARNING,
1548 (errmsg("memcache: adding table oid maps, invalid data length:%d in file:\"%s\". error:\"%s\"",sts, path)));
1549 close(fd);
1550 return;
1551 }
1552 break;
1553 }
1554 #endif
1555
1556 if (lseek(fd, 0, SEEK_END) == -1)
1557 {
1558 ereport(WARNING,
1559 (errmsg("memcache: adding table oid maps, failed seek on file:\"%s\". error:\"%s\"", path, strerror(errno))));
1560 close(fd);
1561 return;
1562 }
1563
1564 /*
1565 * Write cache_id or cache key at the end of file
1566 */
1567 sts = write(fd, (char *)cachekey, len);
1568 if (sts == -1 || sts != len)
1569 {
1570 ereport(WARNING,
1571 (errmsg("memcache: adding table oid maps, failed to write file:\"%s\". error:\"%s\"", path, strerror(errno))));
1572 close(fd);
1573 return;
1574 }
1575 close(fd);
1576 }
1577 }
1578
1579 /*
1580 * Discard all oid maps at pgpool-II startup.
1581 * This is necessary for shmem case.
1582 */
pool_discard_oid_maps(void)1583 void pool_discard_oid_maps(void)
1584 {
1585 char command[1024];
1586
1587 snprintf(command, sizeof(command), "/bin/rm -fr %s/[0-9]*",
1588 pool_config->memqcache_oiddir);
1589 if(system(command) == -1)
1590 ereport(WARNING,
1591 (errmsg("unable to execute command \"%s\"",command),
1592 errdetail("system() command failed with error \"%s\"",strerror(errno))));
1593
1594
1595 }
1596
pool_discard_oid_maps_by_db(int dboid)1597 void pool_discard_oid_maps_by_db(int dboid)
1598 {
1599 char command[1024];
1600
1601 if (pool_is_shmem_cache())
1602 {
1603 snprintf(command, sizeof(command), "/bin/rm -fr %s/%d/",
1604 pool_config->memqcache_oiddir, dboid);
1605
1606 ereport(DEBUG1,
1607 (errmsg("memcache: discarding oid maps by db"),
1608 errdetail("command: '%s\'", command)));
1609
1610 if(system(command) == -1)
1611 ereport(WARNING,
1612 (errmsg("unable to execute command \"%s\"",command),
1613 errdetail("system() command failed with error \"%s\"",strerror(errno))));
1614 }
1615 }
1616
1617 /*
1618 * Read cache id (shmem case) or hash key (memcached case) from table
1619 * oid map file according to table_oids and discard cache entries. If
1620 * unlink is true, the file will be unlinked after successful cache
1621 * removal.
1622 */
pool_invalidate_query_cache(int num_table_oids,int * table_oid,bool unlinkp,int dboid)1623 static void pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlinkp, int dboid)
1624 {
1625 char *dir;
1626 char path[1024];
1627 int i;
1628 int len;
1629 POOL_CACHEKEY buf;
1630
1631 /*
1632 * Create memqcache_oiddir
1633 */
1634 dir = pool_config->memqcache_oiddir;
1635 if (mkdir(dir, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1636 {
1637 if (errno != EEXIST)
1638 {
1639 ereport(WARNING,
1640 (errmsg("memcache: invalidating query cache, failed to create directory:\"%s\". error:\"%s\"", dir, strerror(errno))));
1641 return;
1642 }
1643 }
1644
1645 /*
1646 * Create memqcache_oiddir/database_oid
1647 */
1648 if (dboid == 0)
1649 {
1650 dboid = pool_get_database_oid();
1651 ereport(DEBUG1,
1652 (errmsg("memcache invalidating query cache"),
1653 errdetail("dboid %d", dboid)));
1654
1655 if (dboid <= 0)
1656 {
1657 ereport(WARNING,
1658 (errmsg("memcache: invalidating query cache, could not get database OID")));
1659 return;
1660 }
1661 }
1662
1663 snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1664 if (mkdir(path, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1665 {
1666 if (errno != EEXIST)
1667 {
1668 ereport(WARNING,
1669 (errmsg("memcache: invalidating query cache, failed to create directory:\"%s\". error:\"%s\"", path, strerror(errno))));
1670 return;
1671 }
1672 }
1673
1674 if (pool_is_shmem_cache())
1675 {
1676 len = sizeof(buf.cacheid);
1677 }
1678 else
1679 {
1680 len = sizeof(buf.hashkey);
1681 }
1682
1683 for (i=0;i<num_table_oids;i++)
1684 {
1685 int fd;
1686 int oid = table_oid[i];
1687 int sts;
1688 struct flock fl;
1689
1690 /*
1691 * Open each memqcache_oiddir/database_oid/table_oid
1692 */
1693 snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1694 if ((fd = open(path, O_RDONLY)) == -1)
1695 {
1696 /* This may be normal. It is possible that no SELECT has
1697 * been issued since the table has been created or since
1698 * pgpool-II started up.
1699 */
1700 ereport(DEBUG1,
1701 (errmsg("memcache invalidating query cache"),
1702 errdetail("failed to open \"%s\". reason:\"%s\"",path, strerror(errno))));
1703 continue;
1704 }
1705
1706 fl.l_type = F_RDLCK;
1707 fl.l_whence = SEEK_SET;
1708 fl.l_start = 0; /* Offset from l_whence */
1709 fl.l_len = 0; /* length, 0 = to EOF */
1710
1711 sts = fcntl(fd, F_SETLKW, &fl);
1712 if (sts == -1)
1713 {
1714 ereport(WARNING,
1715 (errmsg("memcache: invalidating query cache, failed to lock file:\"%s\". error:\"%s\"", path, strerror(errno))));
1716 close(fd);
1717 return;
1718 }
1719 for (;;)
1720 {
1721 sts = read(fd, (char *)&buf, len);
1722 if (sts == -1)
1723 {
1724 ereport(WARNING,
1725 (errmsg("memcache: invalidating query cache, failed to read file:\"%s\". error:\"%s\"", path, strerror(errno))));
1726
1727 close(fd);
1728 return;
1729 }
1730 else if (sts == len)
1731 {
1732 if (pool_is_shmem_cache())
1733 {
1734 ereport(DEBUG1,
1735 (errmsg("memcache invalidating query cache"),
1736 errdetail("deleting cacheid:%d itemid:%d",
1737 buf.cacheid.blockid, buf.cacheid.itemid)));
1738 pool_delete_item_shmem_cache(&buf.cacheid);
1739 }
1740 #ifdef USE_MEMCACHED
1741 else
1742 {
1743 char delbuf[33];
1744
1745 memcpy(delbuf, buf.hashkey, 32);
1746 delbuf[32] = 0;
1747 ereport(DEBUG1,
1748 (errmsg("memcache invalidating query cache"),
1749 errdetail("deleting %s", delbuf)));
1750
1751 delete_cache_on_memcached(delbuf);
1752 }
1753 #endif
1754 continue;
1755 }
1756
1757 /*
1758 * Must be EOF
1759 */
1760 if (sts != 0)
1761 {
1762 ereport(WARNING,
1763 (errmsg("memcache: invalidating query cache, invalid data length:%d in file:\"%s\"",sts, path)));
1764 close(fd);
1765 return;
1766 }
1767 break;
1768 }
1769
1770 if (unlinkp)
1771 {
1772 unlink(path);
1773 }
1774 close(fd);
1775 }
1776 #ifdef SHMEMCACHE_DEBUG
1777 dump_shmem_cache(0);
1778 #endif
1779 }
1780
1781 /*
1782 * Reset SELECT data buffers. If reset_dml_oids is true, call
1783 * pool_discard_dml_table_oid() to reset table oids used in DML statements.
1784 */
pool_reset_memqcache_buffer(bool reset_dml_oids)1785 static void pool_reset_memqcache_buffer(bool reset_dml_oids)
1786 {
1787 POOL_SESSION_CONTEXT * session_context;
1788
1789 session_context = pool_get_session_context(true);
1790
1791 if (session_context && session_context->query_cache_array)
1792 {
1793 POOL_TEMP_QUERY_CACHE *cache;
1794
1795 ereport(DEBUG1,
1796 (errmsg("memcache reset buffer"),
1797 errdetail("discard: %p", session_context->query_cache_array)));
1798
1799 pool_discard_query_cache_array(session_context->query_cache_array);
1800 session_context->query_cache_array = pool_create_query_cache_array();
1801
1802 ereport(DEBUG1,
1803 (errmsg("memcache reset buffer"),
1804 errdetail("create: %p", session_context->query_cache_array)));
1805 /*
1806 * if the query context is still under use, we cannot discard
1807 * temporary cache.
1808 */
1809 if ((SL_MODE && pool_is_doing_extended_query_message()) ||
1810 can_query_context_destroy(session_context->query_context))
1811 {
1812 ereport(DEBUG1,
1813 (errmsg("memcache reset buffer"),
1814 errdetail("discard temp buffer of %p (%s)",
1815 session_context->query_context, session_context->query_context->original_query)));
1816
1817 cache = pool_get_current_cache();
1818 pool_discard_temp_query_cache(cache);
1819 /*
1820 * Reset temp_cache pointer in the current query context
1821 * so that we don't double free memory.
1822 */
1823 session_context->query_context->temp_cache = NULL;
1824 }
1825 }
1826
1827 if (reset_dml_oids)
1828 pool_discard_dml_table_oid();
1829
1830 pool_tmp_stats_reset_num_selects();
1831 }
1832
1833 /*
1834 * Return true if memory cache method is "shmem". The purpose of this
1835 * function is to cache the result of stcmp and to save a few cycle.
1836 */
pool_is_shmem_cache(void)1837 bool pool_is_shmem_cache(void)
1838 {
1839 return (pool_config->memqcache_method == SHMEM_CACHE)?true:false;
1840 }
1841
/*
 * Number of shared memory cache blocks, as recorded by
 * pool_set_memqcache_blocks(). It is 0 until something has been recorded.
 */
static int memqcache_num_blocks;

/* Record the number of shared memory cache blocks. */
static void pool_set_memqcache_blocks(int num_blocks)
{
	int			blocks = num_blocks;

	memqcache_num_blocks = blocks;
}

/* Report the number of shared memory cache blocks last recorded. */
static int pool_get_memqcache_blocks(void)
{
	int			blocks = memqcache_num_blocks;

	return blocks;
}
1858
1859 /*
1860 * Query cache on shared memory management modules.
1861 */
1862
1863 /*
1864 * Calculate necessary shared memory size.
1865 */
pool_shared_memory_cache_size(void)1866 size_t pool_shared_memory_cache_size(void)
1867 {
1868 int64 num_blocks;
1869 size_t size;
1870
1871 if (pool_config->memqcache_maxcache > pool_config->memqcache_cache_block_size)
1872 ereport(FATAL,
1873 (errmsg("invalid memory cache configuration"),
1874 errdetail("memqcache_cache_block_size %d should be greater or equal to memqcache_maxcache %d",
1875 pool_config->memqcache_cache_block_size,
1876 pool_config->memqcache_maxcache)));
1877
1878
1879 num_blocks = pool_config->memqcache_total_size/
1880 pool_config->memqcache_cache_block_size;
1881 if (num_blocks == 0)
1882 ereport(FATAL,
1883 (errmsg("invalid memory cache configuration"),
1884 errdetail("memqcache_total_size %ld should be greater or equal to memqcache_cache_block_size %d",
1885 pool_config->memqcache_total_size,
1886 pool_config->memqcache_cache_block_size)));
1887
1888 ereport(LOG,
1889 (errmsg("memory cache initialized"),
1890 errdetail("memcache blocks :%ld",num_blocks)));
1891 /* Remember # of blocks */
1892 pool_set_memqcache_blocks(num_blocks);
1893 size = pool_config->memqcache_cache_block_size * num_blocks;
1894 return size;
1895 }
1896
1897 /*
1898 * Acquire and initialize shared memory cache. This should be called
1899 * only once from pgpool main process at the process staring up time.
1900 */
1901 static void *shmem;
pool_init_memory_cache(size_t size)1902 int pool_init_memory_cache(size_t size)
1903 {
1904 ereport(DEBUG1,
1905 (errmsg("memory cache request size : %zd",size)));
1906
1907 shmem = pool_shared_memory_create(size);
1908 return 0;
1909 }
1910
1911 /*
1912 * Clear all the shared memory cache and reset FSMM and hash table.
1913 */
1914 void
pool_clear_memory_cache(void)1915 pool_clear_memory_cache(void)
1916 {
1917 size_t size;
1918 pool_sigset_t oldmask;
1919
1920 POOL_SETMASK2(&BlockSig, &oldmask);
1921 pool_shmem_lock();
1922
1923 PG_TRY();
1924 {
1925 size = pool_shared_memory_cache_size();
1926 memset(shmem, 0, size);
1927
1928 size = pool_shared_memory_fsmm_size();
1929 pool_reset_fsmm(size);
1930
1931 pool_discard_oid_maps();
1932
1933 pool_hash_reset(pool_config->memqcache_max_num_cache);
1934 }
1935 PG_CATCH();
1936 {
1937 pool_shmem_unlock();
1938 POOL_SETMASK(&oldmask);
1939 PG_RE_THROW();
1940 }
1941 PG_END_TRY();
1942
1943 pool_shmem_unlock();
1944 POOL_SETMASK(&oldmask);
1945 }
1946
1947 /*
1948 * Return shared memory cache address
1949 */
pool_memory_cache_address(void)1950 static void *pool_memory_cache_address(void)
1951 {
1952 return shmem;
1953 }
1954
1955 /*
1956 * Initialize new block
1957 */
1958
/*
 * Free space management map
 *
 * The free space management map (FSMM) is an array of bytes, one per
 * cache block, indexed by block id. For example, with a 1GB cache and an
 * 8KB block size there are 131,072 blocks, so the FSMM is 128KB in total.
 * Each FSMM entry holds a value from 0 to 255 that describes the total
 * free space in its block. For example, if the value is 2, the free space
 * is between 64 and 95 bytes.
 *
 * value    free space (in bytes)
 * 0        0-31
 * 1        32-63
 * 2        64-95
 * 3        96-127
 * :
 * 255      8160-8191
 */
1978
/*
 * Size in bytes of the free space management map: one byte per cache
 * block. It must be called after pool_shared_memory_cache_size(), which
 * records the block count.
 */
size_t pool_shared_memory_fsmm_size(void)
{
	return pool_get_memqcache_blocks() * sizeof(char);
}
1990
1991 /*
1992 * Acquire and initialize shared memory cache for FSMM. This should be
1993 * called after pool_shared_memory_cache_size only once from pgpool
1994 * main process at the process staring up time.
1995 */
1996 static void *fsmm;
pool_init_fsmm(size_t size)1997 int pool_init_fsmm(size_t size)
1998 {
1999 int maxblock = pool_get_memqcache_blocks();
2000 int encode_value;
2001
2002 fsmm = pool_shared_memory_create(size);
2003 encode_value = POOL_MAX_FREE_SPACE/POOL_FSMM_RATIO;
2004 memset(fsmm, encode_value, maxblock);
2005 return 0;
2006 }
2007
2008 /*
2009 * Return shared memory fsmm address
2010 */
pool_fsmm_address(void)2011 static void *pool_fsmm_address(void)
2012 {
2013 return fsmm;
2014 }
2015
/*
 * Clock algorithm shared query cache management modules.
 */

/*
 * Clock hand: id of the next victim block. The int it points to is
 * allocated in shared memory by pool_allocate_fsmm_clock_hand().
 */
static int *pool_fsmm_clock_hand;

/*
 * Allocate the clock hand in shared memory and point it at block 0.
 */
void pool_allocate_fsmm_clock_hand(void)
{
	int		   *hand = pool_shared_memory_create(sizeof(*hand));

	*hand = 0;
	pool_fsmm_clock_hand = hand;
}
2033
2034 /*
2035 * Reset FSMM.
2036 */
2037 static void
pool_reset_fsmm(size_t size)2038 pool_reset_fsmm(size_t size)
2039 {
2040 int encode_value;
2041
2042 encode_value = POOL_MAX_FREE_SPACE/POOL_FSMM_RATIO;
2043 memset(fsmm, encode_value, size);
2044
2045 *pool_fsmm_clock_hand = 0;
2046 }
2047
2048 /*
2049 * Find victim block using clock algorithm and make it free.
2050 * Returns new free block id.
2051 */
pool_reuse_block(void)2052 static POOL_CACHE_BLOCKID pool_reuse_block(void)
2053 {
2054 int maxblock = pool_get_memqcache_blocks();
2055 char *block = block_address(*pool_fsmm_clock_hand);
2056 POOL_CACHE_BLOCK_HEADER *bh = (POOL_CACHE_BLOCK_HEADER *)block;
2057 POOL_CACHE_BLOCKID reused_block;
2058 POOL_CACHE_ITEM_POINTER *cip;
2059 char *p;
2060 int i;
2061
2062 bh->flags = 0;
2063 reused_block = *pool_fsmm_clock_hand;
2064 p = block_address(reused_block);
2065
2066 for (i=0;i<bh->num_items;i++)
2067 {
2068 cip = item_pointer(p, i);
2069
2070 if (!(POOL_ITEM_DELETED & cip->flags))
2071 {
2072 pool_hash_delete(&cip->query_hash);
2073 ereport(DEBUG1,
2074 (errmsg("pool_reuse_block: blockid: %d item: %d", reused_block, i)));
2075 }
2076 }
2077
2078 pool_init_cache_block(reused_block);
2079 pool_update_fsmm(reused_block, POOL_MAX_FREE_SPACE);
2080
2081 (*pool_fsmm_clock_hand)++;
2082 if (*pool_fsmm_clock_hand >= maxblock)
2083 *pool_fsmm_clock_hand = 0;
2084
2085 ereport(LOG,
2086 (errmsg("pool_reuse_block: blockid: %d", reused_block)));
2087
2088 return reused_block;
2089 }
2090
2091 /*
2092 * Get block id which has enough space
2093 */
pool_get_block(size_t free_space)2094 static POOL_CACHE_BLOCKID pool_get_block(size_t free_space)
2095 {
2096 int encode_value;
2097 unsigned char *p = pool_fsmm_address();
2098 int i;
2099 int maxblock = pool_get_memqcache_blocks();
2100 POOL_CACHE_BLOCK_HEADER *bh;
2101
2102 if (p == NULL)
2103 {
2104 ereport(WARNING,
2105 (errmsg("memcache: getting block: FSMM is not initialized")));
2106 return -1;
2107 }
2108
2109 if (free_space > POOL_MAX_FREE_SPACE)
2110 {
2111 ereport(WARNING,
2112 (errmsg("memcache: getting block: invalid free space:%zd", free_space),
2113 errdetail("requested free space: %zd is more than maximum allowed space:%lu",free_space,POOL_MAX_FREE_SPACE)));
2114 return -1;
2115 }
2116
2117 encode_value = free_space/POOL_FSMM_RATIO;
2118
2119 for (i=0;i<maxblock;i++)
2120 {
2121 if (p[i] >= encode_value)
2122 {
2123 /*
2124 * This block *may" have enough space.
2125 * We need to make sure it actually has enough space.
2126 */
2127 bh = (POOL_CACHE_BLOCK_HEADER *)block_address(i);
2128 if (bh->free_bytes >= free_space)
2129 {
2130 return (POOL_CACHE_BLOCKID)i;
2131 }
2132 }
2133 }
2134
2135 /*
2136 * No enough space found. Reuse victim block
2137 */
2138 return pool_reuse_block();
2139 }
2140
2141 /*
2142 * Update free space info for specified block
2143 */
pool_update_fsmm(POOL_CACHE_BLOCKID blockid,size_t free_space)2144 static void pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space)
2145 {
2146 int encode_value;
2147 char *p = pool_fsmm_address();
2148
2149 if (p == NULL)
2150 {
2151 ereport(WARNING,
2152 (errmsg("memcache: updating free space in block: FSMM is not initialized")));
2153 return;
2154 }
2155
2156 if (blockid >= pool_get_memqcache_blocks())
2157 {
2158 ereport(WARNING,
2159 (errmsg("memcache: updating free space in block: block id:%d in invalid",blockid)));
2160 return;
2161 }
2162
2163 if (free_space > POOL_MAX_FREE_SPACE)
2164 {
2165 ereport(WARNING,
2166 (errmsg("memcache: updating free space in block: invalid free space:%zd", free_space),
2167 errdetail("requested free space: %zd is more than maximum allowed space:%lu",free_space,POOL_MAX_FREE_SPACE)));
2168
2169 return;
2170 }
2171
2172 encode_value = free_space/POOL_FSMM_RATIO;
2173
2174 p[blockid] = encode_value;
2175
2176 return;
2177 }
2178
2179 /*
2180 * Add item data to shared memory cache.
2181 * On successful registration, returns cache id.
2182 * The cache id is overwritten by the subsequent call to this function.
2183 * On error returns NULL.
2184 */
pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash,char * data,int size)2185 static POOL_CACHEID *pool_add_item_shmem_cache(POOL_QUERY_HASH *query_hash, char *data, int size)
2186 {
2187 static POOL_CACHEID cacheid;
2188 POOL_CACHE_BLOCKID blockid;
2189 POOL_CACHE_BLOCK_HEADER *bh;
2190 POOL_CACHE_ITEM_POINTER *cip;
2191
2192 POOL_CACHE_ITEM ci;
2193 POOL_CACHE_ITEM_POINTER cip_body;
2194 char *item;
2195
2196 int request_size;
2197 char *p;
2198 int i;
2199 char *src;
2200 char *dst;
2201 int num_deleted;
2202 char *dcip;
2203 char *dci;
2204 bool need_pack;
2205 char *work_buffer;
2206 int index;
2207
2208 if (query_hash == NULL)
2209 {
2210 ereport(LOG,
2211 (errmsg("error while adding item to shmem cache, query hash is NULL")));
2212 return NULL;
2213 }
2214
2215 if (data == NULL)
2216 {
2217 ereport(LOG,
2218 (errmsg("error while adding item to shmem cache, data is NULL")));
2219 return NULL;
2220 }
2221
2222 if (size <= 0)
2223 {
2224 ereport(LOG,
2225 (errmsg("error while adding item to shmem cache, invalid request size: %d", size)));
2226 return NULL;
2227 }
2228
2229 /* Add overhead */
2230 request_size = size + sizeof(POOL_CACHE_ITEM_POINTER) + sizeof(POOL_CACHE_ITEM_HEADER);
2231
2232 /* Get cache block which has enough space */
2233 blockid = pool_get_block(request_size);
2234
2235 if (blockid == -1)
2236 {
2237 return NULL;
2238 }
2239
2240 /*
2241 * Initialize the block if necessary. If no live items are
2242 * remained, we also initialize the block. If there's contiguous
2243 * deleted items, we turn them into free space as well.
2244 */
2245 pool_init_cache_block(blockid);
2246
2247 /*
2248 * Make sure that we have at least one free hash element.
2249 */
2250 while (!is_free_hash_element())
2251 {
2252 /* If not, reuse next victim block */
2253 blockid = pool_reuse_block();
2254 pool_init_cache_block(blockid);
2255 }
2256
2257 /* Get block address on shmem */
2258 p = block_address(blockid);
2259 bh = (POOL_CACHE_BLOCK_HEADER *)p;
2260
2261 /*
2262 * Create contiguous free space. We assume that item bodies are
2263 * ordered from bottom to top of the block, and corresponding item
2264 * pointers are ordered from the youngest to the oldest in the
2265 * beginning of the block.
2266 */
2267
2268 /*
2269 * Optimization. If there's no deleted items, we don't need to
2270 * pack it to create contiguous free space.
2271 */
2272 need_pack = false;
2273 for (i=0;i<bh->num_items;i++)
2274 {
2275 cip = item_pointer(p, i);
2276
2277 if (POOL_ITEM_DELETED & cip->flags) /* Deleted item? */
2278 {
2279 need_pack = true;
2280 ereport(DEBUG1,
2281 (errmsg("memcache adding item"),
2282 errdetail("start creating contiguous space")));
2283 break;
2284 }
2285 }
2286
2287 /*
2288 * We disable packing for now.
2289 * Revisit and remove following code fragment later.
2290 */
2291 need_pack = false;
2292
2293 if (need_pack)
2294 {
2295 /*
2296 * Pack and create contiguous free space.
2297 */
2298 dcip = calloc(1, pool_config->memqcache_cache_block_size);
2299 if (!dcip)
2300 {
2301 ereport(WARNING,
2302 (errmsg("memcache: adding item to cache: calloc failed")));
2303 return NULL;
2304 }
2305
2306 work_buffer = dcip;
2307 dci = dcip + pool_config->memqcache_cache_block_size;
2308 num_deleted = 0;
2309 index = 0;
2310
2311 for (i=0;i<bh->num_items;i++)
2312 {
2313 int total_length;
2314 POOL_CACHEID cid;
2315
2316 cip = item_pointer(p, i);
2317
2318 if (POOL_ITEM_DELETED & cip->flags) /* Deleted item? */
2319 {
2320 num_deleted++;
2321 continue;
2322 }
2323
2324 /* Copy item body */
2325 src = p + cip->offset;
2326 total_length = item_header(p, i)->total_length;
2327 dst = dci - total_length;
2328 cip->offset = dst - work_buffer;
2329 memcpy(dst, src, total_length);
2330
2331 dci -= total_length;
2332
2333 /* Copy item pointer */
2334 src = (char *)cip;
2335 dst = (char *)item_pointer(dcip, index);
2336 memcpy(dst, src, sizeof(POOL_CACHE_ITEM_POINTER));
2337
2338 /* Update hash index */
2339 cid.blockid = blockid;
2340 cid.itemid = index;
2341 pool_hash_insert(&cip->query_hash, &cid, true);
2342 ereport(DEBUG1,
2343 (errmsg("memcache adding item"),
2344 errdetail("item cid updated. old:%d %d new:%d %d",
2345 blockid, i, blockid, index)));
2346 index++;
2347 }
2348
2349 /* All items deleted? */
2350 if (num_deleted > 0 && num_deleted == bh->num_items)
2351 {
2352 ereport(DEBUG1,
2353 (errmsg("memcache adding item"),
2354 errdetail("all items deleted, total deleted:%d", num_deleted)));
2355 bh->flags = 0;
2356 pool_init_cache_block(blockid);
2357 pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
2358 }
2359 else
2360 {
2361 /* Update number of items */
2362 bh->num_items -= num_deleted;
2363
2364 /* Copy back the packed block except block header */
2365 memcpy(p+sizeof(POOL_CACHE_BLOCK_HEADER),
2366 work_buffer+sizeof(POOL_CACHE_BLOCK_HEADER),
2367 pool_config->memqcache_cache_block_size-sizeof(POOL_CACHE_BLOCK_HEADER));
2368 }
2369 free(work_buffer);
2370 }
2371
2372 /*
2373 * Make sure that we have enough free space
2374 */
2375 if (bh->free_bytes < request_size)
2376 {
2377 /* This should not happen */
2378 ereport(WARNING,
2379 (errmsg("memcache: adding item to cache: not enough space"),
2380 errdetail("free space: %d required: %d block id:%d",
2381 bh->free_bytes, request_size, blockid)));
2382 return NULL;
2383 }
2384
2385 /*
2386 * At this point, new item can be located at block_header->num_items
2387 */
2388
2389 /* Fill in cache item header */
2390 ci.header.timestamp = time(NULL);
2391 ci.header.total_length = sizeof(POOL_CACHE_ITEM_HEADER) + size;
2392
2393 /* Calculate item body address */
2394 if (bh->num_items == 0)
2395 {
2396 /* This is the #0 item. So address is block_bottom -
2397 * data_length */
2398 item = p + pool_config->memqcache_cache_block_size - ci.header.total_length;
2399
2400 /* Mark this block used */
2401 bh->flags = POOL_BLOCK_USED;
2402 }
2403 else
2404 {
2405 cip = item_pointer(p, bh->num_items-1);
2406 item = p + cip->offset - ci.header.total_length;
2407 }
2408
2409 /* Copy item header */
2410 memcpy(item, &ci, sizeof(POOL_CACHE_ITEM_HEADER));
2411 bh->free_bytes -= sizeof(POOL_CACHE_ITEM_HEADER);
2412
2413 /* Copy item body */
2414 memcpy(item + sizeof(POOL_CACHE_ITEM_HEADER), data, size);
2415 bh->free_bytes -= size;
2416
2417 /* Copy cache item pointer */
2418 memcpy(&cip_body.query_hash, query_hash, sizeof(POOL_QUERY_HASH));
2419 memset(&cip_body.next, 0, sizeof(POOL_CACHEID));
2420 cip_body.offset = item - p;
2421 cip_body.flags = POOL_ITEM_USED;
2422 memcpy(item_pointer(p, bh->num_items), &cip_body, sizeof(POOL_CACHE_ITEM_POINTER));
2423 bh->free_bytes -= sizeof(POOL_CACHE_ITEM_POINTER);
2424
2425 /* Update FSMM */
2426 pool_update_fsmm(blockid, bh->free_bytes);
2427
2428 cacheid.blockid = blockid;
2429 cacheid.itemid = bh->num_items;
2430 ereport(DEBUG1,
2431 (errmsg("memcache adding item"),
2432 errdetail("new item inserted. blockid: %d itemid:%d",
2433 cacheid.blockid, cacheid.itemid)));
2434
2435 /* Add up number of items */
2436 bh->num_items++;
2437
2438 /* Update hash table */
2439 if (pool_hash_insert(query_hash, &cacheid, false) < 0)
2440 {
2441 ereport(LOG,
2442 (errmsg("error while adding item to shmem cache, hash insert failed")));
2443
2444 /* Since we have failed to insert hash index entry, we need to
2445 * undo the addition of cache entry.
2446 */
2447 pool_delete_item_shmem_cache(&cacheid);
2448 return NULL;
2449 }
2450 ereport(DEBUG1,
2451 (errmsg("memcache adding item"),
2452 errdetail("block: %d item: %d", cacheid.blockid, cacheid.itemid)));
2453
2454 #ifdef SHMEMCACHE_DEBUG
2455 dump_shmem_cache(blockid);
2456 #endif
2457 return &cacheid;
2458 }
2459
2460 /*
2461 * Returns item data address on shared memory cache specified by query hash.
2462 * Also data length is set to *size.
2463 * On error or data not found case returns NULL.
2464 * Detail is set to *sts. (0: success, 1: not found, -1: error)
2465 */
pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash,int * size,int * sts)2466 static char *pool_get_item_shmem_cache(POOL_QUERY_HASH *query_hash, int *size, int *sts)
2467 {
2468 POOL_CACHEID *cacheid;
2469 POOL_CACHE_ITEM_HEADER *cih;
2470
2471 if (sts == NULL)
2472 {
2473 ereport(LOG,
2474 (errmsg("error while getting item from shmem cache, sts is NULL")));
2475 return NULL;
2476 }
2477
2478 *sts = -1;
2479
2480 if (query_hash == NULL)
2481 {
2482 ereport(LOG,
2483 (errmsg("error while getting item from shmem cache, query hash is NULL")));
2484 return NULL;
2485 }
2486
2487 if (size == NULL)
2488 {
2489 ereport(LOG,
2490 (errmsg("error while getting item from shmem cache, size is NULL")));
2491 return NULL;
2492 }
2493
2494 /*
2495 * Find cache header by using hash table
2496 */
2497 cacheid = pool_find_item_on_shmem_cache(query_hash);
2498 if (cacheid == NULL)
2499 {
2500 /* Not found */
2501 *sts = 1;
2502 return NULL;
2503 }
2504
2505 cih = pool_cache_item_header(cacheid);
2506
2507 *size = cih->total_length - sizeof(POOL_CACHE_ITEM_HEADER);
2508 return (char *)cih + sizeof(POOL_CACHE_ITEM_HEADER);
2509 }
2510
2511 /*
2512 * Find data on shared memory cache specified query hash.
2513 * On success returns cache id.
2514 * The cache id is overwritten by the subsequent call to this function.
2515 */
pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash)2516 static POOL_CACHEID *pool_find_item_on_shmem_cache(POOL_QUERY_HASH *query_hash)
2517 {
2518 static POOL_CACHEID cacheid;
2519 POOL_CACHEID *c;
2520 POOL_CACHE_ITEM_HEADER *cih;
2521 time_t now;
2522
2523 c = pool_hash_search(query_hash);
2524 if (!c)
2525 {
2526 return NULL;
2527 }
2528
2529 cih = item_header(block_address(c->blockid), c->itemid);
2530
2531 /* Check cache expiration */
2532 if (pool_config->memqcache_expire > 0)
2533 {
2534 now = time(NULL);
2535 if (now > (cih->timestamp + pool_config->memqcache_expire))
2536 {
2537 ereport(DEBUG1,
2538 (errmsg("memcache finding item"),
2539 errdetail("cache expired: now: %ld timestamp: %ld",
2540 now, cih->timestamp + pool_config->memqcache_expire)));
2541 pool_delete_item_shmem_cache(c);
2542 return NULL;
2543 }
2544 }
2545
2546 cacheid.blockid = c->blockid;
2547 cacheid.itemid = c->itemid;
2548 return &cacheid;
2549 }
2550
2551 /*
2552 * Delete item data specified cache id from shmem.
2553 * On successful deletion, returns 0.
2554 * Other wise return -1.
2555 * FSMM is also updated.
2556 */
pool_delete_item_shmem_cache(POOL_CACHEID * cacheid)2557 static int pool_delete_item_shmem_cache(POOL_CACHEID *cacheid)
2558 {
2559 POOL_CACHE_BLOCK_HEADER *bh;
2560 POOL_CACHE_ITEM_POINTER *cip;
2561 POOL_CACHE_ITEM_HEADER *cih;
2562 POOL_QUERY_HASH key;
2563 int size;
2564 ereport(DEBUG1,
2565 (errmsg("memcache deleting item data"),
2566 errdetail("cacheid:%d itemid:%d",cacheid->blockid, cacheid->itemid)));
2567
2568 if (cacheid->blockid >= pool_get_memqcache_blocks())
2569 {
2570 ereport(LOG,
2571 (errmsg("error while deleting item from shmem cache, invalid block: %d",
2572 cacheid->blockid)));
2573 return -1;
2574 }
2575
2576 bh = (POOL_CACHE_BLOCK_HEADER *)block_address(cacheid->blockid);
2577 if (!(bh->flags & POOL_BLOCK_USED))
2578 {
2579 ereport(LOG,
2580 (errmsg("error while deleting item from shmem cache, block: %d is not used",
2581 cacheid->blockid)));
2582 return -1;
2583 }
2584
2585 if (cacheid->itemid >= bh->num_items)
2586 {
2587 /*
2588 * This could happen if the block is reused. Since contents
2589 * of oid map file is not updated when the block is reused.
2590 */
2591 ereport(DEBUG1,
2592 (errmsg("memcache error deleting item data"),
2593 errdetail("invalid item id %d in block:%d",
2594 cacheid->itemid, cacheid->blockid)));
2595
2596 return -1;
2597 }
2598
2599 cip = item_pointer(block_address(cacheid->blockid), cacheid->itemid);
2600 if (!(cip->flags & POOL_ITEM_USED))
2601 {
2602 ereport(LOG,
2603 (errmsg("error while deleting item from shmem cache, item: %d was not used",
2604 cacheid->itemid)));
2605 return -1;
2606 }
2607
2608 if (cip->flags & POOL_ITEM_DELETED)
2609 {
2610 ereport(LOG,
2611 (errmsg("error while deleting item from shmem cache, item: %d was already deleted",
2612 cacheid->itemid)));
2613 return -1;
2614 }
2615
2616 /* Save cache key */
2617 memcpy(&key, &cip->query_hash, sizeof(POOL_QUERY_HASH));
2618
2619 cih = pool_cache_item_header(cacheid);
2620 size = cih->total_length + sizeof(POOL_CACHE_ITEM_POINTER);
2621
2622 /* Delete item pointer */
2623 cip->flags |= POOL_ITEM_DELETED;
2624
2625 /*
2626 * We do NOT count down bh->num_items here. The deleted space will be recycled
2627 * by pool_add_item_shmem_cache(). However, if this is the last item, we can
2628 * recycle whole block.
2629 *
2630 * 2012/4/1: Now we do not pack data in
2631 * pool_add_item_shmem_cache() for performance reason. Also we
2632 * count down num_items if it is the last one.
2633 */
2634 if ((bh->num_items -1) == 0)
2635 {
2636 ereport(DEBUG1,
2637 (errmsg("memcache deleting item data"),
2638 errdetail("no item remains. initialize block")));
2639 bh->flags = 0;
2640 pool_init_cache_block(cacheid->blockid);
2641 }
2642
2643 /* Remove hash index */
2644 pool_hash_delete(&key);
2645
2646 /*
2647 * If the deleted item is last one in the block, we add it to the free space.
2648 */
2649 if (cacheid->itemid == (bh->num_items -1))
2650 {
2651 bh->free_bytes += size;
2652 ereport(DEBUG1,
2653 (errmsg("memcache deleting item data"),
2654 errdetail("deleted %d bytes, freebytes is = %d",
2655 size, bh->free_bytes)));
2656
2657 bh->num_items--;
2658 }
2659
2660 /* Update FSMM */
2661 pool_update_fsmm(cacheid->blockid, bh->free_bytes);
2662
2663 return 0;
2664 }
2665
2666 /*
2667 * Returns item header specified by cache id.
2668 */
pool_cache_item_header(POOL_CACHEID * cacheid)2669 static POOL_CACHE_ITEM_HEADER *pool_cache_item_header(POOL_CACHEID *cacheid)
2670 {
2671 POOL_CACHE_BLOCK_HEADER *bh;
2672
2673 if (cacheid->blockid >= pool_get_memqcache_blocks())
2674 {
2675 ereport(WARNING,
2676 (errmsg("error while getting cache item header, invalid block id: %d", cacheid->blockid)));
2677 return NULL;
2678 }
2679
2680 bh = (POOL_CACHE_BLOCK_HEADER *)block_address(cacheid->blockid);
2681 if (cacheid->itemid >= bh->num_items)
2682 {
2683 ereport(WARNING,
2684 (errmsg("error while getting cache item header, invalid item id: %d", cacheid->itemid)));
2685 return NULL;
2686 }
2687
2688 return item_header((char *)bh, cacheid->itemid);
2689 }
2690
2691 /*
2692 * Initialize specified block.
2693 */
pool_init_cache_block(POOL_CACHE_BLOCKID blockid)2694 static int pool_init_cache_block(POOL_CACHE_BLOCKID blockid)
2695 {
2696 char *p;
2697 POOL_CACHE_BLOCK_HEADER *bh;
2698
2699 if (blockid >= pool_get_memqcache_blocks())
2700 {
2701 ereport(WARNING,
2702 (errmsg("error while initializing cache block, invalid block id: %d", blockid)));
2703 return -1;
2704 }
2705
2706 p = block_address(blockid);
2707 bh = (POOL_CACHE_BLOCK_HEADER *)p;
2708
2709 /* Is this block used? */
2710 if (!(bh->flags & POOL_BLOCK_USED))
2711 {
2712 /* Initialize empty block */
2713 memset(p, 0, pool_config->memqcache_cache_block_size);
2714 bh->free_bytes = pool_config->memqcache_cache_block_size -
2715 sizeof(POOL_CACHE_BLOCK_HEADER);
2716 }
2717 return 0;
2718 }
2719
#if NOT_USED
/*
 * Delete every live (not yet deleted) item in the block. Then reset the
 * block to empty and record it in FSMM as entirely free.
 */
static void pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid)
{
	char	   *block = block_address(blockid);
	POOL_CACHE_BLOCK_HEADER *header = (POOL_CACHE_BLOCK_HEADER *) block;
	POOL_CACHEID target;
	int			n;

	target.blockid = blockid;

	for (n = 0; n < header->num_items; n++)
	{
		POOL_CACHE_ITEM_POINTER *ptr = item_pointer(block, n);

		/* Already deleted items need no further work */
		if (ptr->flags & POOL_ITEM_DELETED)
			continue;

		target.itemid = n;
		pool_delete_item_shmem_cache(&target);
	}

	header->flags = 0;
	pool_init_cache_block(blockid);
	pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
}
#endif
2753
2754 /*
2755 * Acquire lock: XXX giant lock
2756 */
pool_shmem_lock(void)2757 void pool_shmem_lock(void)
2758 {
2759 if (pool_is_shmem_cache())
2760 {
2761 pool_semaphore_lock(SHM_CACHE_SEM);
2762
2763 }
2764 }
2765
2766 /*
2767 * Release lock
2768 */
pool_shmem_unlock(void)2769 void pool_shmem_unlock(void)
2770 {
2771 if (pool_is_shmem_cache())
2772 {
2773 pool_semaphore_unlock(SHM_CACHE_SEM);
2774 }
2775 }
2776
2777 /*
2778 * Returns cache block address specified by block id
2779 */
block_address(int blockid)2780 static char *block_address(int blockid)
2781 {
2782 char *p;
2783
2784 p = pool_memory_cache_address() +
2785 blockid * pool_config->memqcache_cache_block_size;
2786 return p;
2787 }
2788
2789 /*
2790 * Returns i th item pointer in block address block
2791 */
item_pointer(char * block,int i)2792 static POOL_CACHE_ITEM_POINTER *item_pointer(char *block, int i)
2793 {
2794 return (POOL_CACHE_ITEM_POINTER *)(block + sizeof(POOL_CACHE_BLOCK_HEADER) +
2795 sizeof(POOL_CACHE_ITEM_POINTER) * i);
2796 }
2797
2798 /*
2799 * Returns i th item header in block address block
2800 */
item_header(char * block,int i)2801 static POOL_CACHE_ITEM_HEADER *item_header(char *block, int i)
2802 {
2803 POOL_CACHE_ITEM_POINTER *cip;
2804
2805 cip = item_pointer(block, i);
2806 return (POOL_CACHE_ITEM_HEADER *)(block + cip->offset);
2807 }
2808
#ifdef SHMEMCACHE_DEBUG
/*
 * Dump a shmem cache block to stderr: the block header, then each item's
 * pointer and header. Compiled only with SHMEMCACHE_DEBUG.
 *
 * sizeof yields size_t, so it is printed with %zu; %lu mismatches wherever
 * size_t is not unsigned long. The item timestamp is cast to long for %ld
 * because the width of time_t is implementation-defined.
 */
static void dump_shmem_cache(POOL_CACHE_BLOCKID blockid)
{
	POOL_CACHE_BLOCK_HEADER *bh;
	POOL_CACHE_ITEM_POINTER *cip;
	POOL_CACHE_ITEM_HEADER *cih;
	int i;

	bh = (POOL_CACHE_BLOCK_HEADER *)block_address(blockid);
	fprintf(stderr, "shmem: block header(%zu bytes): flags:%x num_items:%d free_bytes:%d\n",
			sizeof(*bh), bh->flags, bh->num_items, bh->free_bytes);
	for (i=0;i<bh->num_items;i++)
	{
		cip = item_pointer((char *)bh, i);
		fprintf(stderr, "shmem: block: %d %d th item pointer(%zu bytes): offset:%d flags:%x\n",
				blockid, i, sizeof(*cip), cip->offset, cip->flags);
		cih = item_header((char *)bh, i);
		fprintf(stderr, "shmem: block: %d %d th item header(%zu bytes): timestamp:%ld length:%d\n",
				blockid, i, sizeof(*cih), (long) cih->timestamp, cih->total_length);
	}
}
#endif
2834
2835 /*
2836 * SELECT query result array modules
2837 */
pool_create_query_cache_array(void)2838 POOL_QUERY_CACHE_ARRAY *pool_create_query_cache_array(void)
2839 {
2840 #define POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM 128
2841 #define POOL_QUERY_CACHE_ARRAY_HEADER_SIZE (sizeof(int)+sizeof(int))
2842
2843 size_t size;
2844 POOL_QUERY_CACHE_ARRAY *p;
2845 POOL_SESSION_CONTEXT *session_context =pool_get_session_context(false);
2846 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
2847
2848 size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM *
2849 sizeof(POOL_TEMP_QUERY_CACHE *);
2850 p = palloc(size);
2851 p->num_caches = 0;
2852 p->array_size = POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
2853 MemoryContextSwitchTo(old_context);
2854 return p;
2855 }
2856
2857 /*
2858 * Discard query cache array
2859 */
pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array)2860 void pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY *cache_array)
2861 {
2862 int i;
2863
2864 if (!cache_array)
2865 return;
2866
2867 ereport(DEBUG1,
2868 (errmsg("memcache discarding query cache array"),
2869 errdetail("num_caches: %d", cache_array->num_caches)));
2870
2871 for (i=0;i<cache_array->num_caches;i++)
2872 {
2873 ereport(DEBUG2,
2874 (errmsg("memcache discarding query cache array"),
2875 errdetail("cache no: %d cache: %p", i, cache_array->caches[i])));
2876 pool_discard_temp_query_cache(cache_array->caches[i]);
2877 }
2878 pfree(cache_array);
2879 }
2880
2881 /*
2882 * Add query cache array
2883 */
pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array,POOL_TEMP_QUERY_CACHE * cache)2884 static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY *cache_array, POOL_TEMP_QUERY_CACHE *cache)
2885 {
2886 size_t size;
2887 POOL_QUERY_CACHE_ARRAY *cp = cache_array;
2888
2889 if (!cache_array)
2890 return cp;
2891
2892 ereport(DEBUG2,
2893 (errmsg("memcache adding query cache array"),
2894 errdetail("num_caches: %d cache: %p", cache_array->num_caches, cache)));
2895 if (cache_array->num_caches >= cache_array->array_size)
2896 {
2897 cache_array->array_size += POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
2898 size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + cache_array->array_size *
2899 sizeof(POOL_TEMP_QUERY_CACHE *);
2900 cache_array = repalloc(cache_array, size);
2901 }
2902 cache_array->caches[cache_array->num_caches++] = cache;
2903 return cache_array;
2904 }
2905
2906 /*
2907 * SELECT query result temporary cache modules
2908 */
2909
2910 /*
2911 * Create SELECT result temporary cache
2912 */
pool_create_temp_query_cache(char * query)2913 POOL_TEMP_QUERY_CACHE *pool_create_temp_query_cache(char *query)
2914 {
2915 POOL_SESSION_CONTEXT *session_context =pool_get_session_context(false);
2916 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
2917 POOL_TEMP_QUERY_CACHE *p;
2918 p = palloc(sizeof(*p));
2919 p->query = pstrdup(query);
2920
2921 p->buffer = pool_create_buffer();
2922 p->oids = pool_create_buffer();
2923 p->num_oids = 0;
2924 p->is_exceeded = false;
2925 p->is_discarded = false;
2926
2927 MemoryContextSwitchTo(old_context);
2928
2929 ereport(DEBUG1,
2930 (errmsg("pool_create_temp_query_cache: cache created: %p", p)));
2931
2932 return p;
2933 }
2934
2935 /*
2936 * Discard temp query cache
2937 */
pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache)2938 void pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache)
2939 {
2940 if (!temp_cache)
2941 return;
2942
2943 if (temp_cache->query)
2944 pfree(temp_cache->query);
2945 if (temp_cache->buffer)
2946 pool_discard_buffer(temp_cache->buffer);
2947 if (temp_cache->oids)
2948 pool_discard_buffer(temp_cache->oids);
2949
2950 ereport(DEBUG1,
2951 (errmsg("pool_discard_temp_query_cache: cache discarded: %p", temp_cache)));
2952
2953 pfree(temp_cache);
2954 }
2955
2956 /*
2957 * Add data to temp query cache.
2958 * Data must be FE/BE protocol packet.
2959 */
pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,char kind,char * data,int data_len)2960 static void pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, char kind, char *data, int data_len)
2961 {
2962 POOL_INTERNAL_BUFFER *buffer;
2963 size_t buflen;
2964 int send_len;
2965
2966 if (temp_cache == NULL)
2967 {
2968 /* This could happen if cache exceeded in previous query
2969 * execution in the same unnamed portal.
2970 */
2971 ereport(DEBUG1,
2972 (errmsg("memcache adding temporary query cache"),
2973 errdetail("POOL_TEMP_QUERY_CACHE is NULL")));
2974 return;
2975 }
2976
2977 if (temp_cache->is_exceeded)
2978 {
2979 ereport(DEBUG1,
2980 (errmsg("memcache adding temporary query cache"),
2981 errdetail("memqcache_maxcache exceeds")));
2982 return;
2983 }
2984
2985 /*
2986 * We only store T(Table Description), D(Data row), C(Command Complete),
2987 * 1(ParseComplete), 2(BindComplete)
2988 */
2989 if (kind != 'T' && kind != 'D' && kind != 'C' && kind != '1' && kind != '2')
2990 {
2991 return;
2992 }
2993
2994 /* Check data limit */
2995 buffer = temp_cache->buffer;
2996 buflen = pool_get_buffer_length(buffer);
2997
2998 if ((buflen+data_len+sizeof(int)+1) > pool_config->memqcache_maxcache)
2999 {
3000 ereport(DEBUG1,
3001 (errmsg("memcache adding temporary query cache"),
3002 errdetail("data size exceeds memqcache_maxcache. current:%zd requested:%zd memq_maxcache:%d",
3003 buflen, data_len+sizeof(int)+1, pool_config->memqcache_maxcache)));
3004 temp_cache->is_exceeded = true;
3005 return;
3006 }
3007
3008 pool_add_buffer(buffer, &kind, 1);
3009 send_len = htonl(data_len + sizeof(int));
3010 pool_add_buffer(buffer, (char *)&send_len, sizeof(int));
3011 pool_add_buffer(buffer, data, data_len);
3012
3013 return;
3014 }
3015
3016 /*
3017 * Add table oids used by SELECT to temp query cache.
3018 */
pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,int num_oids,int * oids)3019 static void pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, int num_oids, int *oids)
3020 {
3021 POOL_INTERNAL_BUFFER *buffer;
3022
3023 if (!temp_cache || num_oids <= 0)
3024 return;
3025
3026 buffer = temp_cache->oids;
3027 pool_add_buffer(buffer, oids, num_oids*sizeof(int));
3028 temp_cache->num_oids = num_oids;
3029 }
3030
/*
 * Internal buffer management modules.
 * Usage:
 * 1) Create a buffer using pool_create_buffer().
 * 2) Add data to the buffer using pool_add_buffer().
 * 3) Extract a copy of the data from the buffer using pool_get_buffer().
 * 4) Optionally you can:
 *    Obtain the buffer length by using pool_get_buffer_length().
 *    Obtain the buffer pointer by using pool_get_buffer_pointer()
 *    (currently compiled out under NOT_USED).
 * 5) Discard the buffer using pool_discard_buffer().
 */
3042
3043 /*
3044 * Create and return internal buffer
3045 */
pool_create_buffer(void)3046 static POOL_INTERNAL_BUFFER *pool_create_buffer(void)
3047 {
3048 POOL_INTERNAL_BUFFER *p;
3049 p = palloc0(sizeof(*p));
3050 return p;
3051 }
3052
3053 /*
3054 * Discard internal buffer
3055 */
pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer)3056 static void pool_discard_buffer(POOL_INTERNAL_BUFFER *buffer)
3057 {
3058 if (buffer)
3059 {
3060 if (buffer->buf)
3061 pfree(buffer->buf);
3062 pfree(buffer);
3063 }
3064 }
3065
3066 /*
3067 * Add data to internal buffer
3068 */
pool_add_buffer(POOL_INTERNAL_BUFFER * buffer,void * data,size_t len)3069 static void pool_add_buffer(POOL_INTERNAL_BUFFER *buffer, void *data, size_t len)
3070 {
3071 #define POOL_ALLOCATE_UNIT 8192
3072
3073 /* Sanity check */
3074 if (!buffer || !data || len == 0)
3075 return;
3076 POOL_SESSION_CONTEXT *session_context =pool_get_session_context(false);
3077 MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
3078
3079 /* Check if we need to increase the buffer size */
3080 if ((buffer->buflen + len) > buffer->bufsize)
3081 {
3082 size_t allocate_size = ((buffer->buflen + len)/POOL_ALLOCATE_UNIT +1)*POOL_ALLOCATE_UNIT;
3083 ereport(DEBUG2,
3084 (errmsg("memcache adding data to internal buffer"),
3085 errdetail("realloc old size:%zd new size:%zd",
3086 buffer->bufsize, allocate_size)));
3087 buffer->bufsize = allocate_size;
3088 buffer->buf = (char *)repalloc(buffer->buf, buffer->bufsize);
3089 }
3090 /* Add data to buffer */
3091 memcpy(buffer->buf+buffer->buflen, data, len);
3092 buffer->buflen += len;
3093 ereport(DEBUG2,
3094 (errmsg("memcache adding data to internal buffer"),
3095 errdetail("len:%zd, total:%zd bufsize:%zd",
3096 len, buffer->buflen, buffer->bufsize)));
3097 MemoryContextSwitchTo(old_context);
3098 return;
3099 }
3100
3101 /*
3102 * Get data from internal buffer.
3103 * Data is stored in newly malloc memory.
3104 * Data length is returned to len.
3105 */
pool_get_buffer(POOL_INTERNAL_BUFFER * buffer,size_t * len)3106 static void *pool_get_buffer(POOL_INTERNAL_BUFFER *buffer, size_t *len)
3107 {
3108 void *p;
3109
3110 if (buffer->bufsize == 0 || buffer->buflen == 0 ||
3111 buffer->buf == NULL)
3112 {
3113 *len = 0;
3114 return NULL;
3115 }
3116
3117 p = palloc(buffer->buflen);
3118 memcpy(p, buffer->buf, buffer->buflen);
3119 *len = buffer->buflen;
3120 return p;
3121 }
3122
3123 /*
3124 * Get internal buffer length.
3125 */
pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer)3126 static size_t pool_get_buffer_length(POOL_INTERNAL_BUFFER *buffer)
3127 {
3128 if (buffer == NULL)
3129 return 0;
3130
3131 return buffer->buflen;
3132 }
3133
#ifdef NOT_USED
/*
 * Returns the internal buffer's own data pointer (not a copy), or NULL for
 * a NULL buffer.
 */
static char *pool_get_buffer_pointer(POOL_INTERNAL_BUFFER *buffer)
{
	return (buffer != NULL) ? buffer->buf : NULL;
}
#endif
3145 /*
3146 * Get query cache buffer struct of current query context
3147 */
pool_get_current_cache(void)3148 POOL_TEMP_QUERY_CACHE *pool_get_current_cache(void)
3149 {
3150 POOL_SESSION_CONTEXT *session_context;
3151 POOL_QUERY_CONTEXT *query_context;
3152 POOL_TEMP_QUERY_CACHE *p = NULL;
3153
3154 session_context = pool_get_session_context(true);
3155 if (session_context)
3156 {
3157 query_context = session_context->query_context;
3158 if (query_context)
3159 {
3160 p = query_context->temp_cache;
3161 }
3162 }
3163 return p;
3164 }
3165
3166 /*
3167 * Get query cache buffer of current query context
3168 */
pool_get_current_cache_buffer(size_t * len)3169 static char *pool_get_current_cache_buffer(size_t *len)
3170 {
3171 char *p = NULL;
3172 *len = 0;
3173 POOL_TEMP_QUERY_CACHE *cache;
3174
3175 cache = pool_get_current_cache();
3176 if (cache)
3177 {
3178 p = pool_get_buffer(cache->buffer, len);
3179 }
3180 return p;
3181 }
3182
3183 /*
3184 * Mark this temporary query cache buffer discarded if the SELECT
3185 * uses the table oid specified by oids.
3186 */
pool_check_and_discard_cache_buffer(int num_oids,int * oids)3187 static void pool_check_and_discard_cache_buffer(int num_oids, int *oids)
3188 {
3189 POOL_SESSION_CONTEXT *session_context;
3190 POOL_TEMP_QUERY_CACHE *cache;
3191 int num_caches;
3192 size_t len;
3193 int *soids;
3194 int i, j, k;
3195
3196 session_context = pool_get_session_context(true);
3197
3198 if (!session_context || !session_context->query_cache_array)
3199 return;
3200
3201 num_caches = session_context->query_cache_array->num_caches;
3202
3203 for (i=0;i<num_caches;i++)
3204 {
3205 cache = session_context->query_cache_array->caches[i];
3206 if (!cache || cache->is_discarded)
3207 continue;
3208
3209 soids = (int *)pool_get_buffer(cache->oids, &len);
3210 if (!soids || !len)
3211 continue;
3212
3213 for(j=0;j<cache->num_oids;j++)
3214 {
3215 if (cache->is_discarded)
3216 break;
3217
3218 for (k=0;k<num_oids;k++)
3219 {
3220 if (soids[j] == oids[k])
3221 {
3222 ereport(DEBUG1,
3223 (errmsg("discard cache for \"%s\"",cache->query)));
3224 cache->is_discarded = true;
3225 break;
3226 }
3227 }
3228 }
3229 pfree(soids);
3230 }
3231 }
3232
/*
 * Handle the query cache at Ready for Query or Command Complete. In
 * streaming replication mode with the extended query protocol, the query
 * cache is handled at Command Complete. In all other cases it is handled at
 * Ready for Query.
 */
pool_handle_query_cache(POOL_CONNECTION_POOL * backend,char * query,Node * node,char state)3238 void pool_handle_query_cache(POOL_CONNECTION_POOL *backend, char *query, Node *node, char state)
3239 {
3240 POOL_SESSION_CONTEXT *session_context;
3241 pool_sigset_t oldmask;
3242 char *cache_buffer;
3243 size_t len;
3244 int num_oids;
3245 int *oids;
3246 int i;
3247
3248 session_context = pool_get_session_context(true);
3249
3250 /* Ok to cache SELECT result? */
3251 if (pool_is_cache_safe())
3252 {
3253 SelectContext ctx;
3254 MemoryContext old_context;
3255 old_context = MemoryContextSwitchTo(session_context->memory_context);
3256 num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
3257 MemoryContextSwitchTo(old_context);
3258 oids = ctx.table_oids;
3259 ereport(DEBUG2,
3260 (errmsg("query cache handler for ReadyForQuery"),
3261 errdetail("num_oids: %d oid: %d", num_oids, *oids)));
3262
3263 if (state == 'I') /* Not inside a transaction? */
3264 {
3265 /*
3266 * Make sure that temporary cache is not exceeded.
3267 */
3268 if (!pool_is_cache_exceeded())
3269 {
3270 POOL_TEMP_QUERY_CACHE *cache;
3271 /*
3272 * If we are not inside a transaction, we can
3273 * immediately register to cache storage.
3274 */
3275 /* Register to memcached or shmem */
3276 POOL_SETMASK2(&BlockSig, &oldmask);
3277 pool_shmem_lock();
3278
3279 cache_buffer = pool_get_current_cache_buffer(&len);
3280 if (cache_buffer)
3281 {
3282 if (session_context->query_context->skip_cache_commit == false)
3283 {
3284 if (pool_commit_cache(backend, query, cache_buffer, len, num_oids, oids) != 0)
3285 {
3286 ereport(WARNING,
3287 (errmsg("ReadyForQuery: pool_commit_cache failed")));
3288 }
3289 }
3290 /*
3291 * Reset temporary query cache buffer. This is
3292 * necessary if extended query protocol is used and a
3293 * bind/execute message arrives which uses a statement
3294 * created by prior parse message. In this case since
3295 * the temp_cache is not initialized by a parse
3296 * message, messages are added to pre existing temp
3297 * cache buffer. The problem was found in bug#152.
3298 * http://www.pgpool.net/mantisbt/view.php?id=152
3299 */
3300 cache = pool_get_current_cache();
3301 ereport(DEBUG1,
3302 (errmsg("pool_handle_query_cache: temp_cache: %p", cache)));
3303 pool_discard_temp_query_cache(cache);
3304
3305 if (STREAM && pool_is_doing_extended_query_message())
3306 session_context->query_context->temp_cache = NULL;
3307 else
3308 session_context->query_context->temp_cache = pool_create_temp_query_cache(query);
3309 pfree(cache_buffer);
3310 }
3311 pool_shmem_unlock();
3312 POOL_SETMASK(&oldmask);
3313 }
3314
3315 /* Count up SELECT stats */
3316 pool_stats_count_up_num_selects(1);
3317
3318 /* Reset temp buffer */
3319 pool_reset_memqcache_buffer(true);
3320 }
3321 else
3322 {
3323 POOL_TEMP_QUERY_CACHE *cache = pool_get_current_cache();
3324
3325 /* In transaction. Keep to temp query cache array */
3326 pool_add_oids_temp_query_cache(cache, num_oids, oids);
3327
3328 /*
3329 * If temp cache has been overflowed, just trash the half
3330 * baked temp cache.
3331 */
3332 if (pool_is_cache_exceeded())
3333 {
3334 POOL_TEMP_QUERY_CACHE *cache;
3335
3336 cache = pool_get_current_cache();
3337 pool_discard_temp_query_cache(cache);
3338 /*
3339 * Reset temp_cache pointer in the current query context
3340 * so that we don't double free memory.
3341 */
3342 session_context->query_context->temp_cache = NULL;
3343
3344 }
3345 /*
3346 * Otherwise add to the temp cache array.
3347 */
3348 else
3349 {
3350 session_context->query_cache_array =
3351 pool_add_query_cache_array(session_context->query_cache_array, cache);
3352 /*
3353 * Reset temp_cache pointer in the current query
3354 * context so that we don't add the same temp cache to
3355 * the cache array. This is necessary such that case
3356 * when next query is just a "bind message", without
3357 * "parse message". In the case the query context is
3358 * reused and same cache pointer will be added to the
3359 * query_cache_array which we do not want.
3360 */
3361 session_context->query_context->temp_cache = NULL;
3362 }
3363
3364 /* Count up temporary SELECT stats */
3365 pool_tmp_stats_count_up_num_selects();
3366 }
3367 }
3368 else if (is_rollback_query(node)) /* Rollback? */
3369 {
3370 /* Discard buffered data */
3371 pool_reset_memqcache_buffer(true);
3372 }
3373 else if (is_commit_query(node)) /* Commit? */
3374 {
3375 int num_caches;
3376
3377 POOL_SETMASK2(&BlockSig, &oldmask);
3378 pool_shmem_lock();
3379
3380 /* Invalidate query cache */
3381 if (pool_config->memqcache_auto_cache_invalidation)
3382 {
3383 num_oids = pool_get_dml_table_oid(&oids);
3384 pool_invalidate_query_cache(num_oids, oids, true, 0);
3385 }
3386
3387 /*
3388 * If we have something in the query cache buffer, that means
3389 * either:
3390 * - We only had SELECTs in the transaction
3391 * - We had only SELECTs after last DML
3392 * Thus we can register SELECT results to cache storage.
3393 */
3394 num_caches = session_context->query_cache_array->num_caches;
3395 for (i=0;i<num_caches;i++)
3396 {
3397 POOL_TEMP_QUERY_CACHE *cache;
3398
3399 cache = session_context->query_cache_array->caches[i];
3400 if (!cache || cache->is_discarded)
3401 continue;
3402
3403 num_oids = cache->num_oids;
3404 oids = pool_get_buffer(cache->oids, &len);
3405 cache_buffer = pool_get_buffer(cache->buffer, &len);
3406
3407 if (pool_commit_cache(backend, cache->query, cache_buffer, len, num_oids, oids) != 0)
3408 {
3409 ereport(WARNING,
3410 (errmsg("ReadyForQuery: pool_commit_cache failed")));
3411 }
3412 if (oids)
3413 pfree(oids);
3414 if (cache_buffer)
3415 pfree(cache_buffer);
3416 }
3417 pool_shmem_unlock();
3418 POOL_SETMASK(&oldmask);
3419
3420 /* Count up number of SELECT stats */
3421 pool_stats_count_up_num_selects(pool_tmp_stats_get_num_selects());
3422
3423 pool_reset_memqcache_buffer(true);
3424 }
3425 else /* Non cache safe queries */
3426 {
3427 /* Non cachable SELECT */
3428 if (node && IsA(node, SelectStmt))
3429 {
3430 /* Extract table oids from buffer */
3431 num_oids = pool_get_dml_table_oid(&oids);
3432
3433 if (state == 'I')
3434 {
3435 /*
3436 * If Data-modifying statements in SELECT's WITH clause,
3437 * invalidate query cache.
3438 */
3439 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3440 {
3441 POOL_SETMASK2(&BlockSig, &oldmask);
3442 pool_shmem_lock();
3443 pool_invalidate_query_cache(num_oids, oids, true, 0);
3444 pool_shmem_unlock();
3445 POOL_SETMASK(&oldmask);
3446 }
3447
3448 /* Count up SELECT stats */
3449 pool_stats_count_up_num_selects(1);
3450 pool_reset_memqcache_buffer(true);
3451 }
3452 else
3453 {
3454 /*
3455 * If we are inside a transaction, we cannot invalidate
3456 * query cache yet. However we can clear cache buffer, if
3457 * DML/DDL modifies the TABLE which SELECT uses.
3458 */
3459 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3460 {
3461 pool_check_and_discard_cache_buffer(num_oids, oids);
3462 pool_reset_memqcache_buffer(false);
3463 }
3464
3465 /* Count up temporary SELECT stats */
3466 pool_tmp_stats_count_up_num_selects();
3467 }
3468 }
3469 /*
3470 * If the query is DROP DATABASE, discard both of caches in shmem/memcached and
3471 * oidmap in memqcache_oiddir.
3472 */
3473 else if (is_drop_database(node) && session_context->query_context->dboid != 0)
3474 {
3475 int dboid = session_context->query_context->dboid;
3476 num_oids = pool_get_dropdb_table_oids(&oids, dboid);
3477
3478 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3479 {
3480 pool_shmem_lock();
3481 pool_invalidate_query_cache(num_oids, oids, true, dboid);
3482 pool_discard_oid_maps_by_db(dboid);
3483 pool_shmem_unlock();
3484 pool_reset_memqcache_buffer(true);
3485
3486 pfree(oids);
3487 ereport(DEBUG2,
3488 (errmsg("query cache handler for ReadyForQuery"),
3489 errdetail("deleted all cache files for the DROPped DB")));
3490 }
3491 }
3492 else
3493 {
3494 /*
3495 * DML/DCL/DDL case
3496 */
3497
3498 /* Extract table oids from buffer */
3499 num_oids = pool_get_dml_table_oid(&oids);
3500 if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3501 {
3502 /*
3503 * If we are not inside a transaction, we can
3504 * immediately invalidate query cache.
3505 */
3506 if (state == 'I')
3507 {
3508 POOL_SETMASK2(&BlockSig, &oldmask);
3509 pool_shmem_lock();
3510 pool_invalidate_query_cache(num_oids, oids, true, 0);
3511 pool_shmem_unlock();
3512 POOL_SETMASK(&oldmask);
3513 pool_reset_memqcache_buffer(true);
3514 }
3515 else
3516 {
3517 /*
3518 * If we are inside a transaction, we
3519 * cannot invalidate query cache
3520 * yet. However we can clear cache buffer,
3521 * if DML/DDL modifies the TABLE which SELECT uses.
3522 */
3523 pool_check_and_discard_cache_buffer(num_oids, oids);
3524 pool_reset_memqcache_buffer(false);
3525 }
3526 }
3527 else if (num_oids == 0)
3528 {
3529 /*
3530 * It is also necessary to clear cache buffers in case of
3531 * no oid queries (like BEGIN, CHECKPOINT, VACUUM, etc) too.
3532 */
3533 pool_reset_memqcache_buffer(true);
3534 }
3535 }
3536 }
3537 }
3538
3539 /*
3540 * Create and initialize query cache stats
3541 */
3542 static POOL_QUERY_CACHE_STATS *stats;
pool_init_memqcache_stats(void)3543 int pool_init_memqcache_stats(void)
3544 {
3545 stats = pool_shared_memory_create(sizeof(POOL_QUERY_CACHE_STATS));
3546 pool_reset_memqcache_stats();
3547 return 0;
3548 }
3549
3550 /*
3551 * Returns copy of stats area. The copy is in static area and will be
3552 * overwritten by next call to this function.
3553 */
pool_get_memqcache_stats(void)3554 POOL_QUERY_CACHE_STATS *pool_get_memqcache_stats(void)
3555 {
3556 static POOL_QUERY_CACHE_STATS mystats;
3557 pool_sigset_t oldmask;
3558
3559 memset(&mystats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3560
3561 if (stats)
3562 {
3563 POOL_SETMASK2(&BlockSig, &oldmask);
3564 pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3565 memcpy(&mystats, stats, sizeof(POOL_QUERY_CACHE_STATS));
3566 pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3567 POOL_SETMASK(&oldmask);
3568 }
3569
3570 return &mystats;
3571 }
3572
3573 /*
3574 * Reset query cache stats. Caller must lock QUERY_CACHE_STATS_SEM if
3575 * necessary.
3576 */
pool_reset_memqcache_stats(void)3577 void pool_reset_memqcache_stats(void)
3578 {
3579 memset(stats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3580 stats->start_time = time(NULL);
3581 }
3582
3583 /*
3584 * Count up number of successful SELECTs and returns the number.
3585 * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3586 */
pool_stats_count_up_num_selects(long long int num)3587 long long int pool_stats_count_up_num_selects(long long int num)
3588 {
3589 pool_sigset_t oldmask;
3590
3591 POOL_SETMASK2(&BlockSig, &oldmask);
3592 pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3593 stats->num_selects += num;
3594 pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3595 POOL_SETMASK(&oldmask);
3596 return stats->num_selects;
3597 }
3598
3599 /*
3600 * Count up number of successful SELECTs in temporary area and returns
3601 * the number.
3602 */
pool_tmp_stats_count_up_num_selects(void)3603 long long int pool_tmp_stats_count_up_num_selects(void)
3604 {
3605 POOL_SESSION_CONTEXT *session_context;
3606
3607 session_context = pool_get_session_context(false);
3608 session_context->num_selects++;
3609 return session_context->num_selects;
3610 }
3611
3612 /*
3613 * Return number of successful SELECTs in temporary area.
3614 */
pool_tmp_stats_get_num_selects(void)3615 long long int pool_tmp_stats_get_num_selects(void)
3616 {
3617 POOL_SESSION_CONTEXT *session_context;
3618
3619 session_context = pool_get_session_context(false);
3620 return session_context->num_selects;
3621 }
3622
3623 /*
3624 * Reset number of successful SELECTs in temporary area.
3625 */
pool_tmp_stats_reset_num_selects(void)3626 void pool_tmp_stats_reset_num_selects(void)
3627 {
3628 POOL_SESSION_CONTEXT *session_context;
3629
3630 session_context = pool_get_session_context(false);
3631 session_context->num_selects = 0;
3632 }
3633
3634 /*
3635 * Count up number of SELECTs extracted from cache returns the number.
3636 * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3637 */
pool_stats_count_up_num_cache_hits(void)3638 long long int pool_stats_count_up_num_cache_hits(void)
3639 {
3640 pool_sigset_t oldmask;
3641
3642 POOL_SETMASK2(&BlockSig, &oldmask);
3643 pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3644 stats->num_cache_hits++;
3645 pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3646 POOL_SETMASK(&oldmask);
3647 return stats->num_cache_hits;
3648 }
3649
3650 /*
3651 * On shared memory hash table implementation. We use sub part of md5
3652 * hash key as hash function. The experiment has shown that has_any()
3653 * of PostgreSQL is a little bit better than the method using part of
3654 * md5 hash value, but it seems adding some cpu cycles to call
3655 * hash_any() is not worth the trouble.
3656 */
3657
3658 static volatile POOL_HASH_HEADER *hash_header;
3659 static volatile POOL_HASH_ELEMENT *hash_elements;
3660 static volatile POOL_HASH_ELEMENT *hash_free;
3661
3662 /*
3663 * Initialize hash table on shared memory "nelements" is max number of
3664 * hash keys. The actual number of hash key is rounded up to power of
3665 * 2.
3666 */
3667 #undef POOL_HASH_DEBUG
3668
pool_hash_init(int nelements)3669 int pool_hash_init(int nelements)
3670 {
3671 size_t size;
3672 int nelements2; /* number of rounded up hash keys */
3673 int shift;
3674 uint32 mask;
3675 POOL_HASH_HEADER hh;
3676 int i;
3677
3678 if (nelements <= 0)
3679 ereport(ERROR,
3680 (errmsg("initializing hash table on shared memory, invalid number of elements: %d",nelements)));
3681
3682 /* Round up to power of 2 */
3683 shift = 32;
3684 nelements2 = 1;
3685 do
3686 {
3687 nelements2 <<= 1;
3688 shift--;
3689 } while (nelements2 < nelements);
3690
3691 mask = ~0;
3692 mask >>= shift;
3693 size = (char *)&hh.elements - (char *)&hh + sizeof(POOL_HEADER_ELEMENT)*nelements2;
3694 hash_header = pool_shared_memory_create(size);
3695 hash_header->nhash = nelements2;
3696 hash_header->mask = mask;
3697
3698 #ifdef POOL_HASH_DEBUG
3699 ereport(LOG,
3700 (errmsg("initializing hash table on shared memory"),
3701 errdetail("size:%zd nelements2:%d", size, nelements2)));
3702
3703 #endif
3704
3705 size = sizeof(POOL_HASH_ELEMENT)*nelements2;
3706 hash_elements = pool_shared_memory_create(size);
3707
3708 #ifdef POOL_HASH_DEBUG
3709 ereport(LOG,
3710 (errmsg("initializing hash table on shared memory"),
3711 errdetail("size:%zd nelements2:%d", size, nelements2)));
3712 #endif
3713
3714 for (i=0;i<nelements2-1;i++)
3715 {
3716 hash_elements[i].next = (POOL_HASH_ELEMENT *)&hash_elements[i+1];
3717 }
3718 hash_elements[nelements2-1].next = NULL;
3719 hash_free = hash_elements;
3720
3721 return 0;
3722 }
3723
3724 /*
3725 * Reset hash table on shared memory "nelements" is max number of
3726 * hash keys. The actual number of hash key is rounded up to power of
3727 * 2.
3728 */
3729 static int
pool_hash_reset(int nelements)3730 pool_hash_reset(int nelements)
3731 {
3732 size_t size;
3733 int nelements2; /* number of rounded up hash keys */
3734 int shift;
3735 uint32 mask;
3736 POOL_HASH_HEADER hh;
3737 int i;
3738
3739 if (nelements <= 0)
3740 ereport(ERROR,
3741 (errmsg("clearing hash table on shared memory, invalid number of elements: %d",nelements)));
3742
3743 /* Round up to power of 2 */
3744 shift = 32;
3745 nelements2 = 1;
3746 do
3747 {
3748 nelements2 <<= 1;
3749 shift--;
3750 } while (nelements2 < nelements);
3751
3752 mask = ~0;
3753 mask >>= shift;
3754
3755 size = (char *)&hh.elements - (char *)&hh + sizeof(POOL_HEADER_ELEMENT)*nelements2;
3756 memset((void *)hash_header, 0, size);
3757
3758 hash_header->nhash = nelements2;
3759 hash_header->mask = mask;
3760
3761 size = sizeof(POOL_HASH_ELEMENT)*nelements2;
3762 memset((void *)hash_elements, 0, size);
3763
3764 for (i=0;i<nelements2-1;i++)
3765 {
3766 hash_elements[i].next = (POOL_HASH_ELEMENT *)&hash_elements[i+1];
3767 }
3768 hash_elements[nelements2-1].next = NULL;
3769 hash_free = hash_elements;
3770
3771 return 0;
3772 }
3773
3774 /*
3775 * Search cacheid by MD5 hash key string
3776 * If found, returns cache id, otherwise NULL.
3777 */
pool_hash_search(POOL_QUERY_HASH * key)3778 POOL_CACHEID *pool_hash_search(POOL_QUERY_HASH *key)
3779 {
3780 volatile POOL_HASH_ELEMENT *element;
3781
3782 uint32 hash_key = create_hash_key(key);
3783
3784 if (hash_key >= hash_header->nhash)
3785 {
3786 ereport(WARNING,
3787 (errmsg("memcache: searching cacheid from hash. invalid hash key"),
3788 errdetail("invalid hash key: %uld nhash: %ld",
3789 hash_key, hash_header->nhash)));
3790 return NULL;
3791 }
3792
3793 {
3794 char md5[POOL_MD5_HASHKEYLEN+1];
3795 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3796 md5[POOL_MD5_HASHKEYLEN] = '\0';
3797 #ifdef POOL_HASH_DEBUG
3798 ereport(LOG,
3799 (errmsg("searching hash table"),
3800 errdetail("hash_key:%d md5:%s", hash_key, md5)));
3801 #endif
3802 }
3803
3804 element = hash_header->elements[hash_key].element;
3805 while (element)
3806 {
3807 {
3808 char md5[POOL_MD5_HASHKEYLEN+1];
3809 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3810 md5[POOL_MD5_HASHKEYLEN] = '\0';
3811 #ifdef POOL_HASH_DEBUG
3812 ereport(LOG,
3813 (errmsg("searching hash table"),
3814 errdetail("element md5:%s", md5)));
3815 #endif
3816 }
3817
3818 if (memcmp((const void *)element->hashkey.query_hash,
3819 (const void *)key->query_hash, sizeof(key->query_hash)) == 0)
3820 {
3821 return (POOL_CACHEID *)&element->cacheid;
3822 }
3823 element = element->next;
3824 }
3825 return NULL;
3826 }
3827
3828 /*
3829 * Insert MD5 key and associated cache id into shmem hash table. If
3830 * "update" is true, replace cacheid associated with the MD5 key,
3831 * rather than throw an error.
3832 */
pool_hash_insert(POOL_QUERY_HASH * key,POOL_CACHEID * cacheid,bool update)3833 static int pool_hash_insert(POOL_QUERY_HASH *key, POOL_CACHEID *cacheid, bool update)
3834 {
3835 POOL_HASH_ELEMENT *element;
3836 POOL_HASH_ELEMENT *new_element;
3837
3838 uint32 hash_key = create_hash_key(key);
3839
3840 if (hash_key >= hash_header->nhash)
3841 {
3842 ereport(WARNING,
3843 (errmsg("memcache: adding cacheid to hash. invalid hash key"),
3844 errdetail("invalid hash key: %uld nhash: %ld",
3845 hash_key, hash_header->nhash)));
3846 return -1;
3847 }
3848
3849 {
3850 char md5[POOL_MD5_HASHKEYLEN+1];
3851 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3852 md5[POOL_MD5_HASHKEYLEN] = '\0';
3853 #ifdef POOL_HASH_DEBUG
3854 ereport(LOG,
3855 (errmsg("searching hash table"),
3856 errdetail("hash_key:%d md5:%s block:%d item:%d", hash_key, md5, cacheid->blockid, cacheid->itemid)));
3857 #endif
3858 }
3859
3860 /*
3861 * Look for hash key.
3862 */
3863 element = hash_header->elements[hash_key].element;
3864
3865 while (element)
3866 {
3867 if (memcmp((const void *)element->hashkey.query_hash,
3868 (const void *)key->query_hash, sizeof(key->query_hash)) == 0)
3869 {
3870 /* Hash key found. If "update" is false, just throw an error. */
3871 char md5[POOL_MD5_HASHKEYLEN+1];
3872
3873 if (!update)
3874 {
3875 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3876 md5[POOL_MD5_HASHKEYLEN] = '\0';
3877 ereport(LOG,
3878 (errmsg("memcache: adding cacheid to hash. hash key:\"%s\" already exists",md5)));
3879 return -1;
3880 }
3881 else
3882 {
3883 /* Update cache id */
3884 memcpy((void *)&element->cacheid, cacheid, sizeof(POOL_CACHEID));
3885 return 0;
3886 }
3887 }
3888 element = element->next;
3889 }
3890
3891 /*
3892 * Ok, same key did not exist. Just insert new hash key.
3893 */
3894 new_element = (POOL_HASH_ELEMENT *)get_new_hash_element();
3895 if (!new_element)
3896 {
3897 ereport(LOG,
3898 (errmsg("memcache: adding cacheid to hash. failed to get new element")));
3899 return -1;
3900 }
3901
3902 element = hash_header->elements[hash_key].element;
3903
3904 hash_header->elements[hash_key].element = new_element;
3905 new_element->next = element;
3906
3907 memcpy((void *)new_element->hashkey.query_hash, key->query_hash, POOL_MD5_HASHKEYLEN);
3908 memcpy((void *)&new_element->cacheid, cacheid, sizeof(POOL_CACHEID));
3909
3910 return 0;
3911 }
3912
3913 /*
3914 * Delete MD5 key and associated cache id from shmem hash table.
3915 */
pool_hash_delete(POOL_QUERY_HASH * key)3916 int pool_hash_delete(POOL_QUERY_HASH *key)
3917 {
3918 POOL_HASH_ELEMENT *element;
3919 POOL_HASH_ELEMENT **delete_point;
3920 bool found;
3921
3922 uint32 hash_key = create_hash_key(key);
3923
3924 if (hash_key >= hash_header->nhash)
3925 {
3926 ereport(LOG,
3927 (errmsg("memcache: deleting key from hash. invalid key"),
3928 errdetail("invalid hash key: %uld nhash: %ld",
3929 hash_key, hash_header->nhash)));
3930 return -1;
3931 }
3932
3933 /*
3934 * Look for delete location
3935 */
3936 found = false;
3937 delete_point = (POOL_HASH_ELEMENT **)&(hash_header->elements[hash_key].element);
3938 element = hash_header->elements[hash_key].element;
3939
3940 while (element)
3941 {
3942 if (memcmp(element->hashkey.query_hash, key->query_hash, sizeof(key->query_hash)) == 0)
3943 {
3944 found = true;
3945 break;
3946 }
3947 delete_point = &element->next;
3948 element = element->next;
3949 }
3950
3951 if (!found)
3952 {
3953 char md5[POOL_MD5_HASHKEYLEN+1];
3954
3955 memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3956 md5[POOL_MD5_HASHKEYLEN] = '\0';
3957 ereport(LOG,
3958 (errmsg("memcache: deleting key from hash. key:\"%s\" not found",md5)));
3959 return -1;
3960 }
3961
3962 /*
3963 * Put back the element to free list
3964 */
3965 *delete_point = element->next;
3966 put_back_hash_element(element);
3967
3968 return 0;
3969 }
3970
3971 /*
3972 * Calculate 32bit binary hash key (i.e. location in hash header) from MD5
3973 * string. We use top most 8 characters of MD5 string for calculation.
3974 */
create_hash_key(POOL_QUERY_HASH * key)3975 static uint32 create_hash_key(POOL_QUERY_HASH *key)
3976 {
3977 #define POOL_HASH_NCHARS 8
3978
3979 char md5[POOL_HASH_NCHARS+1];
3980 uint32 mask;
3981
3982 memcpy(md5, key->query_hash, POOL_HASH_NCHARS);
3983 md5[POOL_HASH_NCHARS] = '\0';
3984 mask = strtoul(md5, NULL, 16);
3985 mask &= hash_header->mask;
3986 return mask;
3987 }
3988
3989 /*
3990 * Get new free hash element from free list.
3991 */
get_new_hash_element(void)3992 static volatile POOL_HASH_ELEMENT *get_new_hash_element(void)
3993 {
3994 volatile POOL_HASH_ELEMENT *elm;
3995
3996 if (!hash_free->next)
3997 {
3998 /* No free element */
3999 return NULL;
4000 }
4001
4002 #ifdef POOL_HASH_DEBUG
4003 ereport(LOG,
4004 (errmsg("getting new hash element"),
4005 errdetail("hash_free->next:%p hash_free->next->next:%p",
4006 hash_free->next, hash_free->next->next)));
4007 #endif
4008
4009 elm = hash_free->next;
4010 hash_free->next = elm->next;
4011
4012 return elm;
4013 }
4014
4015 /*
4016 * Put back hash element to free list.
4017 */
put_back_hash_element(volatile POOL_HASH_ELEMENT * element)4018 static void put_back_hash_element(volatile POOL_HASH_ELEMENT *element)
4019 {
4020 POOL_HASH_ELEMENT *elm;
4021
4022 #ifdef POOL_HASH_DEBUG
4023 ereport(LOG,
4024 (errmsg("getting new hash element"),
4025 errdetail("hash_free->next:%p hash_free->next->next:%p",
4026 hash_free->next, hash_free->next->next)));
4027 #endif
4028
4029 elm = hash_free->next;
4030 hash_free->next = (POOL_HASH_ELEMENT *)element;
4031 element->next = elm;
4032 }
4033
4034 /*
4035 * Return true if there's a free hash element.
4036 */
is_free_hash_element(void)4037 static bool is_free_hash_element(void)
4038 {
4039 return hash_free->next != NULL;
4040 }
4041
4042 /*
4043 * Returns shared memory cache stats.
4044 * Subsequent call to this function will break return value
4045 * because its in static memory.
4046 * Caller must hold shmem_lock before calling this function.
4047 * If in memory query cache is not enabled, all stats are 0.
4048 */
pool_get_shmem_storage_stats(void)4049 POOL_SHMEM_STATS *pool_get_shmem_storage_stats(void)
4050 {
4051 static POOL_SHMEM_STATS mystats;
4052 POOL_HASH_ELEMENT *element;
4053 int nblocks;
4054 int i;
4055
4056 memset(&mystats, 0, sizeof(POOL_SHMEM_STATS));
4057
4058 if (!pool_config-> memory_cache_enabled)
4059 return &mystats;
4060
4061 /*
4062 * Copy cache hit data
4063 */
4064 mystats.cache_stats.num_selects = stats->num_selects;
4065 mystats.cache_stats.num_cache_hits = stats->num_cache_hits;
4066
4067 if (pool_config->memqcache_method != SHMEM_CACHE)
4068 return &mystats;
4069
4070 /* number of total hash entries */
4071 mystats.num_hash_entries = hash_header->nhash;
4072
4073 /* number of used hash entries */
4074 for (i=0;i<hash_header->nhash;i++)
4075 {
4076 element = hash_header->elements[i].element;
4077 while (element)
4078 {
4079 mystats.used_hash_entries++;
4080 element = element->next;
4081 }
4082 }
4083
4084 nblocks = pool_get_memqcache_blocks();
4085
4086 for (i=0;i<nblocks;i++)
4087 {
4088 POOL_CACHE_BLOCK_HEADER *bh;
4089 POOL_CACHE_ITEM_POINTER *cip;
4090 char *p = block_address(i);
4091 bh = (POOL_CACHE_BLOCK_HEADER *)p;
4092 int j;
4093
4094 if (bh->flags & POOL_BLOCK_USED)
4095 {
4096 for (j=0;j<bh->num_items;j++)
4097 {
4098 cip = item_pointer(p, j);
4099 if (POOL_ITEM_DELETED & cip->flags)
4100 {
4101 mystats.fragment_cache_entries_size += item_header(p, j)->total_length;
4102 }
4103 else
4104 {
4105 /* number of used cache entries */
4106 mystats.num_cache_entries++;
4107 /* total size of used cache entries */
4108 mystats.used_cache_entries_size += (item_header(p, j)->total_length + sizeof(POOL_CACHE_ITEM_POINTER));
4109 }
4110 }
4111 mystats.used_cache_entries_size += sizeof(POOL_CACHE_BLOCK_HEADER);
4112 /* total size of free(usable) cache entries */
4113 mystats.free_cache_entries_size += bh->free_bytes;
4114 }
4115 else
4116 {
4117 mystats.free_cache_entries_size += pool_config->memqcache_cache_block_size;
4118 }
4119 }
4120
4121 /*
4122 * Copy POOL_QUERY_CACHE_STATS
4123 */
4124 memcpy(&mystats.cache_stats, stats, sizeof(mystats.cache_stats));
4125
4126 return &mystats;
4127 }
4128
4129 /*
4130 * Inject cached message to the target backend buffer to pretend as if backend
4131 * actually replies with Data row and Command Complete message.
4132 */
inject_cached_message(POOL_CONNECTION * backend,char * qcache,int qcachelen)4133 static void inject_cached_message(POOL_CONNECTION *backend, char *qcache, int qcachelen)
4134 {
4135 char kind;
4136 int len;
4137 char *buf;
4138 int timeout;
4139 int i = 0;
4140 bool is_prepared_stmt = false;
4141 POOL_SESSION_CONTEXT *session_context;
4142 POOL_QUERY_CONTEXT* query_context;
4143 POOL_PENDING_MESSAGE *msg;
4144
4145 session_context = pool_get_session_context(false);
4146 query_context = session_context->query_context;
4147 msg = pool_pending_message_find_lastest_by_query_context(query_context);
4148
4149 if (msg)
4150 {
4151 /*
4152 * If pending message found, we should extract target backend from it
4153 */
4154 int backend_id;
4155
4156 backend_id = pool_pending_message_get_target_backend_id(msg);
4157 backend = CONNECTION(session_context->backend, backend_id);
4158 timeout = -1;
4159 }
4160 else
4161 timeout = 0;
4162
4163 /* Send flush messsage to backend to retrieve response of backend */
4164 pool_write(backend, "H", 1);
4165 len = htonl(sizeof(len));
4166 pool_write_and_flush(backend, &len, sizeof(len));
4167
4168 /*
4169 * Push any response from backend
4170 */
4171 for(;;)
4172 {
4173 pool_read(backend, &kind, 1);
4174 ereport(DEBUG1,
4175 (errmsg("inject_cached_message: push message kind: '%c'", kind)));
4176 if (msg &&
4177 ((kind == 'T' && msg->type == POOL_DESCRIBE) ||
4178 (kind == '2' && msg->type == POOL_BIND)))
4179 {
4180 /* Pending message seen. Now it is likely to end of pending data */
4181 timeout = 0;
4182 }
4183 pool_push(backend, &kind, sizeof(kind));
4184 pool_read(backend, &len, sizeof(len));
4185 pool_push(backend, &len, sizeof(len));
4186 if ((ntohl(len)-sizeof(len)) > 0)
4187 {
4188 buf = pool_read2(backend, ntohl(len)-sizeof(len));
4189 pool_push(backend, buf, ntohl(len)-sizeof(len));
4190 }
4191
4192 /* check if there's any pending data */
4193 if (!pool_ssl_pending(backend) && pool_read_buffer_is_empty(backend))
4194 {
4195 pool_set_timeout(timeout);
4196 if (pool_check_fd(backend) != 0)
4197 {
4198 ereport(DEBUG1,
4199 (errmsg("inject_cached_message: select shows no pending data")));
4200 pool_set_timeout(-1);
4201 break;
4202 }
4203 pool_set_timeout(-1);
4204 }
4205 }
4206
4207 /*
4208 * Inject row data and command complete
4209 */
4210 while (i < qcachelen)
4211 {
4212 char tmpkind;
4213 int tmplen;
4214 char *p;
4215
4216 tmpkind = qcache[i];
4217 i++;
4218
4219 memcpy(&tmplen, qcache+i, sizeof(tmplen));
4220 i += sizeof(tmplen);
4221 len = ntohl(tmplen);
4222 p = qcache + i;
4223 i += len - sizeof(tmplen);
4224
4225 /* No need to cache PARSE and BIND responses */
4226 if (tmpkind == '1' || tmpkind == '2')
4227 {
4228 is_prepared_stmt = true;
4229 continue;
4230 }
4231
4232 /*
4233 * In the prepared statement execution, there is no need to send
4234 * 'T' response to the frontend.
4235 */
4236 if (is_prepared_stmt && tmpkind == 'T')
4237 {
4238 continue;
4239 }
4240
4241 /* push message */
4242 ereport(DEBUG1,
4243 (errmsg("inject_cached_message: push cached messages: '%c' len: %d", tmpkind, len)));
4244 pool_push(backend, &tmpkind, 1);
4245 pool_push(backend, &tmplen, sizeof(tmplen));
4246 if (len > 0)
4247 pool_push(backend, p, len - sizeof(tmplen));
4248 }
4249
4250 /*
4251 * Pop data.
4252 */
4253 pool_pop(backend, &len);
4254 }
4255