1 /* -*-pgsql-c-*- */
2 /*
3  * pgpool: a language independent connection pool server for PostgreSQL
4  * written by Tatsuo Ishii
5  *
6  * Copyright (c) 2003-2020	PgPool Global Development Group
7  *
8  * Permission to use, copy, modify, and distribute this software and
9  * its documentation for any purpose and without fee is hereby
10  * granted, provided that the above copyright notice appear in all
11  * copies and that both that copyright notice and this permission
12  * notice appear in supporting documentation, and that the name of the
13  * author not be used in advertising or publicity pertaining to
14  * distribution of the software without specific, written prior
15  * permission. The author makes no representations about the
16  * suitability of this software for any purpose.  It is provided "as
17  * is" without express or implied warranty.
18  *
19  * pool_memqcache.c: query cache on shmem or memcached
20  *
21  */
22 #define DATABASE_TO_OID_QUERY "SELECT oid FROM pg_database WHERE datname = '%s'"
23 
24 #include "pool.h"
25 
26 #include <stdio.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <sys/file.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <fcntl.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <stdlib.h>
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <dirent.h>
39 
40 #ifdef USE_MEMCACHED
41 #include <libmemcached/memcached.h>
42 #endif
43 
44 #include "auth/md5.h"
45 #include "pool_config.h"
46 #include "protocol/pool_proto_modules.h"
47 #include "parser/parsenodes.h"
48 #include "context/pool_session_context.h"
49 #include "query_cache/pool_memqcache.h"
50 #include "utils/pool_relcache.h"
51 #include "utils/pool_select_walker.h"
52 #include "utils/pool_stream.h"
53 #include "utils/pool_stream.h"
54 #include "utils/elog.h"
55 #include "utils/palloc.h"
56 #include "utils/memutils.h"
57 
58 
59 #ifdef USE_MEMCACHED
60 memcached_st *memc;
61 #endif
62 
63 static char* encode_key(const char *s, char *buf, POOL_CONNECTION_POOL *backend);
64 #ifdef DEBUG
65 static void dump_cache_data(const char *data, size_t len);
66 #endif
67 static int pool_commit_cache(POOL_CONNECTION_POOL *backend, char *query, char *data, size_t datalen, int num_oids, int *oids);
68 static int pool_fetch_cache(POOL_CONNECTION_POOL *backend, const char *query, char **buf, size_t *len);
69 static int send_cached_messages(POOL_CONNECTION *frontend, const char *qcache, int qcachelen);
70 static void send_message(POOL_CONNECTION *conn, char kind, int len, const char *data);
71 #ifdef USE_MEMCACHED
72 static int delete_cache_on_memcached(const char *key);
73 #endif
74 static int pool_get_dml_table_oid(int **oid);
75 static int pool_get_dropdb_table_oids(int **oids, int dboid);
76 static void pool_discard_dml_table_oid(void);
77 static void pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlink, int dboid);
78 
79 static int pool_get_database_oid(void);
80 static void pool_add_table_oid_map(POOL_CACHEKEY *cachkey, int num_table_oids, int *table_oids);
81 static void pool_reset_memqcache_buffer(bool reset_dml_oids);
82 static POOL_CACHEID *pool_add_item_shmem_cache(POOL_QUERY_HASH *query_hash, char *data, int size);
83 static POOL_CACHEID *pool_find_item_on_shmem_cache(POOL_QUERY_HASH *query_hash);
84 static char *pool_get_item_shmem_cache(POOL_QUERY_HASH *query_hash, int *size, int *sts);
85 static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY *cache_array, POOL_TEMP_QUERY_CACHE *cache);
86 static void pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, char kind, char *data, int data_len);
87 static void pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, int num_oids, int *oids);
88 static POOL_INTERNAL_BUFFER *pool_create_buffer(void);
89 static void pool_discard_buffer(POOL_INTERNAL_BUFFER *buffer);
90 static void pool_add_buffer(POOL_INTERNAL_BUFFER *buffer, void *data, size_t len);
91 static void *pool_get_buffer(POOL_INTERNAL_BUFFER *buffer, size_t *len);
92 #ifdef NOT_USED
93 static char *pool_get_buffer_pointer(POOL_INTERNAL_BUFFER *buffer);
94 #endif
95 static char *pool_get_current_cache_buffer(size_t *len);
96 static size_t pool_get_buffer_length(POOL_INTERNAL_BUFFER *buffer);
97 static void pool_check_and_discard_cache_buffer(int num_oids, int *oids);
98 
99 static void pool_set_memqcache_blocks(int num_blocks);
100 static int pool_get_memqcache_blocks(void);
101 static void *pool_memory_cache_address(void);
102 static void pool_reset_fsmm(size_t size);
103 static void *pool_fsmm_address(void);
104 static void pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space);
105 static POOL_CACHE_BLOCKID pool_get_block(size_t free_space);
106 static POOL_CACHE_ITEM_HEADER *pool_cache_item_header(POOL_CACHEID *cacheid);
107 static int pool_init_cache_block(POOL_CACHE_BLOCKID blockid);
108 #if NOT_USED
109 static void pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid);
110 #endif
111 static int pool_delete_item_shmem_cache(POOL_CACHEID *cacheid);
112 static char *block_address(int blockid);
113 static POOL_CACHE_ITEM_POINTER *item_pointer(char *block, int i);
114 static POOL_CACHE_ITEM_HEADER *item_header(char *block, int i);
115 static POOL_CACHE_BLOCKID pool_reuse_block(void);
116 #ifdef SHMEMCACHE_DEBUG
117 static void dump_shmem_cache(POOL_CACHE_BLOCKID blockid);
118 #endif
119 
120 static int pool_hash_reset(int nelements);
121 static int pool_hash_insert(POOL_QUERY_HASH *key, POOL_CACHEID *cacheid, bool update);
122 static uint32 create_hash_key(POOL_QUERY_HASH *key);
123 static volatile POOL_HASH_ELEMENT *get_new_hash_element(void);
124 static void put_back_hash_element(volatile POOL_HASH_ELEMENT *element);
125 static bool is_free_hash_element(void);
126 static void inject_cached_message(POOL_CONNECTION *backend, char *qcache, int qcachelen);
127 
128 /*
129  * Connect to Memcached
130  */
memcached_connect(void)131 int memcached_connect(void)
132 {
133 	char *memqcache_memcached_host;
134 	int memqcache_memcached_port;
135 #ifdef USE_MEMCACHED
136 	memcached_server_st *servers;
137 	memcached_return rc;
138 
139 	/* Already connected? */
140 	if (memc)
141 	{
142 		return 0;
143 	}
144 #endif
145 
146 	memqcache_memcached_host = pool_config->memqcache_memcached_host;
147 	memqcache_memcached_port = pool_config->memqcache_memcached_port;
148 
149 	ereport(DEBUG1,
150             (errmsg("connecting to memcached on Host:\"%s:%d\"", memqcache_memcached_host,memqcache_memcached_port)));
151 
152 #ifdef USE_MEMCACHED
153 	memc = memcached_create(NULL);
154 	servers = memcached_server_list_append(NULL,
155 										   memqcache_memcached_host,
156 										   memqcache_memcached_port,
157 										   &rc);
158 
159 	rc = memcached_server_push(memc, servers);
160 	if (rc != MEMCACHED_SUCCESS)
161 	{
162 		ereport(WARNING,
163 			(errmsg("failed to connect to memcached, server push error:\"%s\"\n", memcached_strerror(memc, rc))));
164 		memc = (memcached_st *)-1;
165 		return -1;
166 	}
167 	memcached_server_list_free(servers);
168 #else
169 	ereport(WARNING,
170 			(errmsg("failed to connect to memcached, memcached support is not enabled")));
171 	return -1;
172 #endif
173 	return 0;
174 }
175 
176 /*
177  * Disconnect to Memcached
178  */
memcached_disconnect(void)179 void memcached_disconnect (void)
180 {
181 #ifdef USE_MEMCACHED
182 	if (!memc)
183 	{
184 		return;
185 	}
186 	memcached_free(memc);
187 #else
188 	ereport(WARNING,
189 			(errmsg("failed to disconnect from memcached, memcached support is not enabled")));
190 #endif
191 }
192 
193 /*
194  * Register buffer data for query cache in memory cache
195  */
memqcache_register(char kind,POOL_CONNECTION * frontend,char * data,int data_len)196 void memqcache_register(char kind,
197                         POOL_CONNECTION *frontend,
198                         char *data,
199                         int data_len)
200 {
201 	POOL_TEMP_QUERY_CACHE *cache;
202 	POOL_SESSION_CONTEXT *session_context;
203 	POOL_QUERY_CONTEXT *query_context;
204 
205 	cache = pool_get_current_cache();
206 
207 	if (cache == NULL)
208 	{
209 		session_context = pool_get_session_context(true);
210 
211 		if (session_context && pool_is_query_in_progress())
212 		{
213 			char *query = pool_get_query_string();
214 			query_context = session_context->query_context;
215 
216 			if (query)
217 				query_context->temp_cache = pool_create_temp_query_cache(query);
218 		}
219 	}
220 
221 	pool_add_temp_query_cache(cache, kind, data, data_len);
222 }
223 
224 /*
225  * Commit SELECT results to cache storage.
226  */
pool_commit_cache(POOL_CONNECTION_POOL * backend,char * query,char * data,size_t datalen,int num_oids,int * oids)227 static int pool_commit_cache(POOL_CONNECTION_POOL *backend, char *query, char *data, size_t datalen, int num_oids, int *oids)
228 {
229 #ifdef USE_MEMCACHED
230 	memcached_return rc;
231 #endif
232 	POOL_CACHEKEY cachekey;
233 	char tmpkey[MAX_KEY];
234 	time_t memqcache_expire;
235 
236 	/*
237 	 * get_buflen() will return -1 if query result exceeds memqcache_maxcache
238 	 */
239 	if (datalen == -1)
240 	{
241 		return -1;
242 	}
243 
244 	/* query disabled */
245 	if (strlen(query) <= 0)
246 	{
247 		return -1;
248 	}
249 	ereport(DEBUG1,
250 		(errmsg("commiting SELECT results to cache storage"),
251 			 errdetail("Query=\"%s\"", query)));
252 
253 #ifdef DEBUG
254 	dump_cache_data(data, datalen);
255 #endif
256 
257 	/* encode md5key for memcached */
258 	encode_key(query, tmpkey, backend);
259 	ereport(DEBUG2,
260 		(errmsg("commiting SELECT results to cache storage"),
261 			 errdetail("search key : \"%s\"", tmpkey)));
262 
263 	memcpy(cachekey.hashkey, tmpkey, 32);
264 
265 	memqcache_expire = pool_config->memqcache_expire;
266 	ereport(DEBUG1,
267 		(errmsg("commiting SELECT results to cache storage"),
268 			 errdetail("memqcache_expire = %ld", memqcache_expire)));
269 
270 	if (pool_is_shmem_cache())
271 	{
272 		POOL_CACHEID *cacheid;
273 		POOL_QUERY_HASH query_hash;
274 
275 		memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
276 
277 		cacheid = pool_hash_search(&query_hash);
278 
279 		if (cacheid != NULL)
280 		{
281 			ereport(DEBUG1,
282 				(errmsg("commiting SELECT results to cache storage"),
283 					 errdetail("item already exists")));
284 
285 			return 0;
286 		}
287 		else
288 		{
289 			cacheid = pool_add_item_shmem_cache(&query_hash, data, datalen);
290 			if (cacheid == NULL)
291 			{
292 				ereport(LOG,
293 						(errmsg("failed to add item to shmem cache")));
294 				return -1;
295 			}
296 			else
297 			{
298 				ereport(DEBUG2,
299 					(errmsg("commiting SELECT results to cache storage"),
300 						errdetail("blockid: %d itemid: %d",
301 							   cacheid->blockid, cacheid->itemid)));
302 			}
303 			cachekey.cacheid.blockid = cacheid->blockid;
304 			cachekey.cacheid.itemid = cacheid->itemid;
305 		}
306 	}
307 
308 #ifdef USE_MEMCACHED
309 	else
310 	{
311 		rc = memcached_set(memc, tmpkey, 32,
312 						   data, datalen, (time_t)memqcache_expire, 0);
313 		if (rc != MEMCACHED_SUCCESS)
314 		{
315 			ereport(WARNING,
316 					(errmsg("cache commit failed with error:\"%s\"",memcached_strerror(memc, rc))));
317 			return -1;
318 		}
319 		ereport(DEBUG1,
320 			(errmsg("commiting SELECT results to cache storage"),
321 				 errdetail("set cache succeeded")));
322 	}
323 #endif
324 
325 	/*
326 	 * Register cache id to oid map
327 	 */
328 	pool_add_table_oid_map(&cachekey, num_oids, oids);
329 
330 	return 0;
331 }
332 
333 /*
334  * Fetch from memory cache.
335  * Return:
336  * 0: fetch success,
337  * 1: not found
338  */
pool_fetch_cache(POOL_CONNECTION_POOL * backend,const char * query,char ** buf,size_t * len)339 static int pool_fetch_cache(POOL_CONNECTION_POOL *backend, const char *query, char **buf, size_t *len)
340 {
341 	char *ptr;
342 	char tmpkey[MAX_KEY];
343 	int sts;
344 	char *p;
345 
346 	if (strlen(query) <= 0)
347 		ereport(ERROR,
348 			(errmsg("fetching from cache storage, no query")));
349 
350 	/* encode md5key for memcached */
351 	encode_key(query, tmpkey, backend);
352 	ereport(DEBUG1,
353 		(errmsg("fetching from cache storage"),
354 			 errdetail("search key \"%s\"", tmpkey)));
355 
356 
357 	if (pool_is_shmem_cache())
358 	{
359 		POOL_QUERY_HASH query_hash;
360 		int mylen;
361 
362 		memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
363 
364 		ptr = pool_get_item_shmem_cache(&query_hash, &mylen, &sts);
365 		if (ptr == NULL)
366 		{
367 			ereport(DEBUG1,
368 				(errmsg("fetching from cache storage"),
369 					 errdetail("cache not found on shared memory")));
370 
371 			return 1;
372 		}
373 		*len = mylen;
374 	}
375 #ifdef USE_MEMCACHED
376 	else
377 	{
378 		memcached_return rc;
379 		unsigned int flags;
380 
381 		ptr = memcached_get(memc, tmpkey, strlen(tmpkey), len, &flags, &rc);
382 
383 		if (rc != MEMCACHED_SUCCESS)
384 		{
385 			if (rc != MEMCACHED_NOTFOUND)
386 			{
387 				ereport(LOG,
388 					(errmsg("fetching from cache storage, memcached_get failed with error: \"%s\"", memcached_strerror(memc, rc))));
389 				/*
390 				 * Turn off memory cache support to prevent future errors.
391 				 */
392 				pool_config->memory_cache_enabled = 0;
393 				/* Behave as if cache not found */
394 				return 1;
395 			}
396 			else
397 			{
398 				/* Not found */
399 				ereport(DEBUG1,
400 					(errmsg("fetching from cache storage"),
401 						 errdetail("cache item not found for key: \"%s\" and query:\"%s\"",tmpkey,query)));
402 				return 1;
403 			}
404 		}
405 	}
406 #else
407 	else
408 	{
409 		ereport(ERROR,
410 			(errmsg("memcached support is not enabled")));
411 	}
412 #endif
413 
414 	p = palloc(*len);
415 
416 	memcpy(p, ptr, *len);
417 
418 	if (!pool_is_shmem_cache())
419 	{
420 		free(ptr);
421 	}
422 
423 	ereport(DEBUG1,
424 		(errmsg("fetching from cache storage"),
425 			 errdetail("query=\"%s\" len:%zd", query, *len)));
426 #ifdef DEBUG
427 	dump_cache_data(p, *len);
428 #endif
429 
430 	*buf = p;
431 
432 	return 0;
433 }
434 
435 /*
436  * encode key.
437  * create cache key as md5(username + query string + database name)
438  */
encode_key(const char * s,char * buf,POOL_CONNECTION_POOL * backend)439 static char* encode_key(const char *s, char *buf, POOL_CONNECTION_POOL *backend)
440 {
441 	char* strkey;
442 	int u_length;
443 	int d_length;
444 	int q_length;
445 	int length;
446 
447 	u_length = strlen(backend->info->user);
448 	ereport(DEBUG1,
449 		(errmsg("memcache encode key"),
450 			 errdetail("username: \"%s\" database_name: \"%s\"", backend->info->user,backend->info->database)));
451 
452 	d_length = strlen(backend->info->database);
453 
454 	q_length = strlen(s);
455 	ereport(DEBUG1,
456 		(errmsg("memcache encode key"),
457 			 errdetail("query: \"%s\"", s)));
458 
459 	length = u_length + d_length + q_length + 1;
460 
461 	strkey = (char*)palloc(sizeof(char) * length);
462 
463 	snprintf(strkey, length, "%s%s%s", backend->info->user, s, backend->info->database);
464 
465 	pool_md5_hash(strkey, strlen(strkey), buf);
466 	ereport(DEBUG1,
467 		(errmsg("memcache encode key"),
468 			 errdetail("`%s' -> `%s'", strkey, buf)));
469 	pfree(strkey);
470 	return buf;
471 }
472 
473 #ifdef DEBUG
/*
 * Dump cached protocol messages to stderr (debug builds only).
 *
 * Each record is a kind byte followed by a 4-byte network-order length that
 * includes itself, then (length - 4) payload bytes.  A truncated or
 * inconsistent buffer stops the dump with a note.  Previously it made the
 * unsigned remaining-length counter wrap around and read past the buffer.
 */
static void dump_cache_data(const char *data, size_t len)
{
	fprintf(stderr, "shmem: len = %zu\n", len);

	while (len > 0)
	{
		uint32_t netlen;
		int plen;
		int i;

		fprintf(stderr, "shmem: kind:%c\n", *data++);
		len--;

		if (len < sizeof(netlen))
		{
			fprintf(stderr, "shmem: truncated message length\n");
			return;
		}
		memcpy(&netlen, data, sizeof(netlen));
		data += sizeof(netlen);
		len -= sizeof(netlen);

		plen = (int) ntohl(netlen);
		fprintf(stderr, "shmem: len:%d\n", plen);
		plen -= 4;

		/* Reject lengths smaller than the length word or beyond the buffer. */
		if (plen < 0 || (size_t) plen > len)
		{
			fprintf(stderr, "shmem: invalid message length\n");
			return;
		}

		fprintf(stderr, "shmem: ");
		for (i = 0; i < plen; i++)
			fprintf(stderr, "%02x ", (unsigned char) data[i]);
		fprintf(stderr, "\n");

		data += plen;
		len -= (size_t) plen;
	}
}
505 #endif
506 
507 /*
508  * send cached messages
509  */
send_cached_messages(POOL_CONNECTION * frontend,const char * qcache,int qcachelen)510 static int send_cached_messages(POOL_CONNECTION *frontend, const char *qcache, int qcachelen)
511 {
512 	int msg = 0;
513 	int i = 0;
514 	int is_prepared_stmt = 0;
515 	int len;
516 	const char *p;
517 
518 	while (i < qcachelen)
519 	{
520 		char tmpkind;
521 		int tmplen;
522 
523 		tmpkind = qcache[i];
524 		i++;
525 
526 		memcpy(&tmplen, qcache+i, sizeof(tmplen));
527 		i += sizeof(tmplen);
528 		len = ntohl(tmplen);
529 		p = qcache + i;
530 		i += len - sizeof(tmplen);
531 
532 		/* No need to cache PARSE and BIND responses */
533 		if (tmpkind == '1' || tmpkind == '2')
534 		{
535 			is_prepared_stmt = 1;
536 			continue;
537 		}
538 
539 		/*
540 		 * In the prepared statement execution, there is no need to send
541 		 * 'T' response to the frontend.
542 		 */
543 		if (is_prepared_stmt && tmpkind == 'T')
544 		{
545 			continue;
546 		}
547 
548 		/* send message to frontend */
549 		ereport(DEBUG1,
550 			(errmsg("memcache: sending cached messages: '%c' len: %d", tmpkind, len)));
551 		send_message(frontend, tmpkind, len, p);
552 
553 		msg++;
554 	}
555 
556 	return msg;
557 }
558 
559 /*
560  * send message to frontend
561  */
send_message(POOL_CONNECTION * conn,char kind,int len,const char * data)562 static void send_message(POOL_CONNECTION *conn, char kind, int len, const char *data)
563 {
564 	ereport(DEBUG2,
565 			(errmsg("memcache: sending messages: kind '%c', len=%d, data=%p", kind, len, data)));
566 
567 	pool_write(conn, &kind, 1);
568 
569 	len = htonl(len);
570 	pool_write(conn, &len, sizeof(len));
571 
572 	len = ntohl(len);
573 	pool_write(conn, (void *)data, len-sizeof(len));
574 }
575 
576 #ifdef USE_MEMCACHED
577 /*
578  * delete query cache on memcached
579  */
delete_cache_on_memcached(const char * key)580 static int delete_cache_on_memcached(const char *key)
581 {
582 
583 	memcached_return rc;
584 
585 	ereport(DEBUG2,
586 			(errmsg("memcache: deleteing cache on memcached with key: \"%s\"", key)));
587 
588 
589 	/* delete cache data on memcached. key is md5 hash query */
590     rc= memcached_delete(memc, key, 32, (time_t)0);
591 
592 	/* delete cache data on memcached is failed */
593     if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_BUFFERED)
594     {
595 		ereport(LOG,
596 				(errmsg("failed to delete cache on memcached, error:\"%s\"", memcached_strerror(memc, rc))));
597         return 0;
598     }
599     return 1;
600 
601 }
602 #endif
603 
604 /*
605  * Fetch SELECT data from cache if possible.
606  */
pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,char * contents,bool * foundp)607 POOL_STATUS pool_fetch_from_memory_cache(POOL_CONNECTION *frontend,
608 										 POOL_CONNECTION_POOL *backend,
609 										 char *contents, bool *foundp)
610 {
611 	char *qcache;
612 	size_t qcachelen;
613 	int sts;
614 	pool_sigset_t oldmask;
615 
616 	ereport(DEBUG1,
617 			(errmsg("pool_fetch_from_memory_cache called")));
618 
619 	*foundp = false;
620 
621     POOL_SETMASK2(&BlockSig, &oldmask);
622 	pool_shmem_lock();
623 
624     PG_TRY();
625     {
626         sts = pool_fetch_cache(backend, contents, &qcache, &qcachelen);
627     }
628     PG_CATCH();
629     {
630         pool_shmem_unlock();
631         POOL_SETMASK(&oldmask);
632         PG_RE_THROW();
633     }
634     PG_END_TRY();
635 
636 	pool_shmem_unlock();
637 	POOL_SETMASK(&oldmask);
638 
639 	if (sts != 0)
640 		/* Cache not found */
641 		return POOL_CONTINUE;
642 
643 	/*
644 	 * Cache found. If we are doing extended query and in streaming
645 	 * replication mode, we need to retrieve any responses from backend and
646 	 * forward them to frontend.
647 	 */
648 	if (pool_is_doing_extended_query_message() && SL_MODE)
649 	{
650 		POOL_SESSION_CONTEXT *session_context;
651 		POOL_CONNECTION *target_backend;
652 
653 		ereport(DEBUG1,
654 				(errmsg("memcache: injecting cache data")));
655 
656 		session_context = pool_get_session_context(true);
657 		target_backend = CONNECTION(backend, session_context->load_balance_node_id);
658 		inject_cached_message(target_backend, qcache, qcachelen);
659 	}
660 	else
661 	{
662 		/*
663 		 * Send each messages to frontend
664 		 */
665 		send_cached_messages(frontend, qcache, qcachelen);
666 	}
667 
668 	pfree(qcache);
669 
670 	/*
671 	 * Send a "READY FOR QUERY" if not in extended query.
672 	 */
673 	if (!pool_is_doing_extended_query_message() && MAJOR(backend) == PROTO_MAJOR_V3)
674 	{
675 		signed char state;
676 
677 		/*
678 		 * We keep previous transaction state.
679 		 */
680 		state = MASTER(backend)->tstate;
681 		send_message(frontend, 'Z', 5, (char *)&state);
682 	}
683 
684 	if (!pool_is_doing_extended_query_message() || !SL_MODE)
685 	{
686 		if (pool_flush(frontend))
687 		{
688 			return POOL_END;
689 		}
690 	}
691 
692 	*foundp = true;
693 
694 	if (pool_config->log_per_node_statement)
695 		ereport(LOG,
696 				(errmsg("fetch from memory cache"),
697 				 errdetail("query result fetched from cache. statement: %s", contents)));
698 
699 	ereport(DEBUG1,
700 			(errmsg("fetch from memory cache"),
701 			 errdetail("query result found in the query cache, %s", contents)));
702 
703 	return POOL_CONTINUE;
704 }
705 
706 /*
707  * Simple and rough (thus unreliable) check if the query is likely
708  * SELECT. Just check if the query starts with SELECT or WITH. This
709  * can be used before parse tree is available.
710  */
pool_is_likely_select(char * query)711 bool pool_is_likely_select(char *query)
712 {
713 	bool do_continue = false;
714 
715 	if (query == NULL)
716 		return false;
717 
718 	if (pool_config->ignore_leading_white_space)
719 	{
720 		/* Ignore leading white spaces */
721 		while (*query && isspace(*query))
722 			query++;
723 	}
724 	if (! *query)
725 	{
726 		return false;
727 	}
728 
729 	/*
730 	 * Get rid of head comment.
731 	 * It is sure that the query is in correct format, because the parser
732 	 * has rejected bad queries such as the one with not-ended comment.
733 	 */
734 	while (*query)
735 	{
736 		/* Ignore spaces and return marks */
737 		do_continue = false;
738 		while (*query && isspace(*query))
739 		{
740 			query++;
741 			do_continue = true;
742 		}
743 		if (do_continue)
744 		{
745 			continue;
746 		}
747 
748 		while (*query && !strncmp(query, "\n", 2))
749 		{
750 			query++;
751 			do_continue = true;
752 		}
753 		if (do_continue)
754 		{
755 			query += 2;
756 			continue;
757 		}
758 
759 		/* Ignore comments like C */
760 		if (!strncmp(query, "/*", 2))
761 		{
762 			while (*query && strncmp(query, "*/", 2))
763 				query++;
764 
765 			query += 2;
766 			continue;
767 		}
768 
769 		/* Ignore SQL comments */
770 		if (!strncmp(query, "--", 2))
771 		{
772 			while (*query && strncmp(query, "\n", 2))
773 				query++;
774 
775 			query += 2;
776 			continue;
777 		}
778 
779 		if (!strncasecmp(query, "SELECT", 6) || !strncasecmp(query, "WITH", 4))
780 		{
781 			return true;
782 		}
783 
784 		query++;
785 	}
786 
787 	return false;
788 }
789 
790 /*
791  * Return true if SELECT can be cached.  "node" is the parse tree for
792  * the query and "query" is the query string.
793  * The query must be SELECT or WITH.
794  */
pool_is_allow_to_cache(Node * node,char * query)795 bool pool_is_allow_to_cache(Node *node, char *query)
796 {
797 	int i = 0;
798 	int num_oids = -1;
799 	SelectContext ctx;
800 
801 	/*
802 	 * If NO QUERY CACHE comment exists, do not cache.
803 	 */
804 	if (!strncasecmp(query, NO_QUERY_CACHE, NO_QUERY_CACHE_COMMENT_SZ))
805 		return false;
806 	/*
807 	 * Check black table list first.
808 	 */
809 	if (pool_config->num_black_memqcache_table_list > 0)
810 	{
811 		/* Extract oids in from clause of SELECT, and check if SELECT to them could be cached. */
812 		num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
813 		if (num_oids > 0)
814 		{
815 			for (i = 0; i < num_oids; i++)
816 			{
817 				ereport(DEBUG1,
818 						(errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, ctx.table_names[i])));
819 				if (pool_is_table_in_black_list(ctx.table_names[i]) == true)
820 				{
821 					ereport(DEBUG1,
822 							(errmsg("memcache: node is not allowed to cache")));
823 					return false;
824 				}
825 			}
826 		}
827 	}
828 
829 	/* SELECT INTO or SELECT FOR SHARE or UPDATE cannot be cached */
830 	if (pool_has_insertinto_or_locking_clause(node))
831 		return false;
832 
833 	/*
834 	 * If SELECT uses non immutable functions, it's not allowed to
835 	 * cache.
836 	 */
837 	if (pool_has_non_immutable_function_call(node))
838 		return false;
839 
840 	/*
841 	 * If SELECT uses temporary tables it's not allowed to cache.
842 	 */
843 	if (pool_config->check_temp_table && pool_has_temp_table(node))
844 		return false;
845 
846 	/*
847 	 * If SELECT uses system catalogs, it's not allowed to cache.
848 	 */
849 	if (pool_has_system_catalog(node))
850 		return false;
851 
852 	/*
853 	 * TABLESAMPLE is not allowed to cache.
854 	 */
855 	if (IsA(node, SelectStmt) && ((SelectStmt *)node)->fromClause)
856 	{
857 		List *tbl_list = ((SelectStmt *)node)->fromClause;
858 		ListCell   *tbl;
859 		foreach(tbl, tbl_list)
860 		{
861 			if (IsA(lfirst(tbl), RangeTableSample))
862 				return false;
863 		}
864 	}
865 
866 
867 	/*
868 	 * If the table is in the while list, allow to cache even if it is
869 	 * VIEW or unlogged table.
870 	 */
871 	if (pool_config->num_white_memqcache_table_list > 0)
872 	{
873 		if (num_oids < 0)
874 			num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
875 
876 		if (num_oids > 0)
877 		{
878 			for (i = 0; i < num_oids; i++)
879 			{
880 				char *table = ctx.table_names[i];
881 				ereport(DEBUG1,
882 						(errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, table)));
883 				if (is_view(table) || is_unlogged_table(table))
884 				{
885 					if (pool_is_table_in_white_list(table) == false)
886 					{
887 						ereport(DEBUG1,
888 								(errmsg("memcache: node is not allowed to cache")));
889 						return false;
890 					}
891 				}
892 			}
893 		}
894 	}
895 	else
896 	{
897 		/*
898 		 * If SELECT uses views, it's not allowed to cache.
899 		 */
900 		if (pool_has_view(node))
901 			return false;
902 
903 		/*
904 		 * If SELECT uses unlogged tables, it's not allowed to cache.
905 		 */
906 		if (pool_has_unlogged_table(node))
907 			return false;
908 	}
909 
910 	/*
911 	 * If Data-modifying statements in WITH clause, it's not allowed to cache.
912 	 */
913 	if(IsA(node, SelectStmt) && ((SelectStmt *) node)->withClause)
914 	{
915 		ListCell	*lc;
916 		WithClause	*withClause = ((SelectStmt *) node)->withClause;
917 
918 		foreach(lc, withClause->ctes)
919 		{
920 			CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
921 			if(IsA(cte->ctequery, InsertStmt) ||
922 			   IsA(cte->ctequery, DeleteStmt) ||
923 			   IsA(cte->ctequery, UpdateStmt))
924 			{
925 				return false;
926 			}
927 		}
928 	}
929 
930 	return true;
931 }
932 
933 
934 /*
935  * Return true If the SELECTed table is in back list.
936  */
pool_is_table_in_black_list(const char * table_name)937 bool pool_is_table_in_black_list(const char *table_name)
938 {
939 
940 	if (pool_config->num_black_memqcache_table_list > 0 &&
941 	    pattern_compare((char *)table_name, BLACKLIST, "black_memqcache_table_list") == 1)
942 	{
943 		return true;
944 	}
945 
946 	return false;
947 }
948 
949 /*
950  * Return true If the SELECTed table is in white list.
951  */
pool_is_table_in_white_list(const char * table_name)952 bool pool_is_table_in_white_list(const char *table_name)
953 {
954 	if (pool_config->num_white_memqcache_table_list > 0 &&
955 		pattern_compare((char *)table_name, WHITELIST, "white_memqcache_table_list") == 1)
956 	{
957 		return true;
958 	}
959 
960 	return false;
961 }
962 
963 /*
964  * Extract table oid from INSERT/UPDATE/DELETE/TRUNCATE/
965  * DROP TABLE/ALTER TABLE/COPY FROM statement.
966  * For SELECT, if Data-modifying statements in its WITH clause,
967  * extract table oid from Data-modifying statements too.
968  * Returns number of oids.
969  * In case of error, returns 0 (InvalidOid).
970  * oids buffer (oidsp) will be discarded by subsequent call.
971  */
pool_extract_table_oids(Node * node,int ** oidsp)972 int pool_extract_table_oids(Node *node, int **oidsp)
973 {
974 #define POOL_MAX_DML_OIDS 128
975 	char *table;
976 	static int oids[POOL_MAX_DML_OIDS];
977 	int num_oids;
978 	int oid;
979 
980 	if (node == NULL)
981 	{
982 		ereport(LOG,
983 				(errmsg("memcache: error while extracting table oids. statement is NULL")));
984 		return 0;
985 	}
986 
987 	num_oids = 0;
988 	*oidsp = oids;
989 
990 	if (IsA(node, InsertStmt))
991 	{
992 		InsertStmt *stmt = (InsertStmt *) node;
993 
994 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
995 		table = make_table_name_from_rangevar(stmt->relation);
996 	}
997 	else if (IsA(node, UpdateStmt))
998 	{
999 		UpdateStmt *stmt = (UpdateStmt *) node;
1000 
1001 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1002 		table = make_table_name_from_rangevar(stmt->relation);
1003 	}
1004 	else if (IsA(node, DeleteStmt))
1005 	{
1006 		DeleteStmt *stmt = (DeleteStmt *) node;
1007 
1008 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1009 		table = make_table_name_from_rangevar(stmt->relation);
1010 	}
1011 	else if(IsA(node, SelectStmt))
1012 	{
1013 		SelectStmt *stmt = (SelectStmt *) node;
1014 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1015 		table = NULL;
1016 	}
1017 #ifdef NOT_USED
1018 	/*
1019 	 * We do not handle CREATE TABLE here.  It is possible that
1020 	 * pool_extract_table_oids() is called before CREATE TABLE gets
1021 	 * executed.
1022 	 */
1023 	else if (IsA(node, CreateStmt))
1024 	{
1025 		CreateStmt *stmt = (CreateStmt *)node;
1026 		table = make_table_name_from_rangevar(stmt->relation);
1027 	}
1028 #endif
1029 
1030 	else if (IsA(node, AlterTableStmt))
1031 	{
1032 		AlterTableStmt *stmt = (AlterTableStmt *)node;
1033 		table = make_table_name_from_rangevar(stmt->relation);
1034 	}
1035 
1036 	else if (IsA(node, CopyStmt))
1037 	{
1038 		CopyStmt *stmt = (CopyStmt *)node;
1039 		if (stmt->is_from)		/* COPY FROM? */
1040 		{
1041 			table = make_table_name_from_rangevar(stmt->relation);
1042 		}
1043 		else
1044 		{
1045 			return 0;
1046 		}
1047 	}
1048 
1049 	else if (IsA(node, DropStmt))
1050 	{
1051 		ListCell *cell;
1052 
1053 		DropStmt *stmt = (DropStmt *)node;
1054 
1055 		if (stmt->removeType != OBJECT_TABLE)
1056 		{
1057 			return 0;
1058 		}
1059 
1060 		/* Here, stmt->objects is list of target relation info.  The
1061 		 * first cell of target relation info is a list (possibly)
1062 		 * consists of database, schema and relation.  We need to call
1063 		 * makeRangeVarFromNameList() before passing to
1064 		 * make_table_name_from_rangevar. Otherwise we get weird excessively
1065 		 * decorated relation name (''table_name'').
1066 		 */
1067 		foreach(cell, stmt->objects)
1068 		{
1069 			if (num_oids >= POOL_MAX_DML_OIDS)
1070 			{
1071 				ereport(LOG,
1072 						(errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1073 				return 0;
1074 			}
1075 
1076 			table = make_table_name_from_rangevar(makeRangeVarFromNameList(lfirst(cell)));
1077 			oid = pool_table_name_to_oid(table);
1078 			if (oid > 0)
1079 			{
1080 				oids[num_oids++] = pool_table_name_to_oid(table);
1081 				ereport(DEBUG1,
1082 						(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids-1])));
1083 			}
1084 		}
1085 		return num_oids;
1086 	}
1087 	else if (IsA(node, TruncateStmt))
1088 	{
1089 		ListCell *cell;
1090 
1091 		TruncateStmt *stmt = (TruncateStmt *)node;
1092 
1093 		foreach(cell, stmt->relations)
1094 		{
1095 			if (num_oids >= POOL_MAX_DML_OIDS)
1096 			{
1097 				ereport(LOG,
1098 						(errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1099 				return 0;
1100 			}
1101 
1102 			table = make_table_name_from_rangevar(lfirst(cell));
1103 			oid = pool_table_name_to_oid(table);
1104 			if (oid > 0)
1105 			{
1106 				oids[num_oids++] = pool_table_name_to_oid(table);
1107 				ereport(DEBUG1,
1108 						(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids-1])));
1109 			}
1110 		}
1111 		return num_oids;
1112 	}
1113 	else if (IsA(node, ExplainStmt))
1114 	{
1115 		ListCell	*cell;
1116 		DefElem		*def;
1117 		ExplainStmt *stmt = (ExplainStmt *) node;
1118 
1119 		foreach(cell, stmt->options)
1120 		{
1121 			def = lfirst(cell);
1122 			if (strncmp("analyze", def->defname, 7) == 0)
1123 			{
1124 				return pool_extract_table_oids(stmt->query, oidsp);
1125 			}
1126 		}
1127 
1128 		table = NULL;
1129 	}
1130 	else
1131 	{
1132 		ereport(DEBUG1,
1133 				(errmsg("memcache: extracting table oids: statment is different from INSERT/UPDATE/DELETE/TRUNCATE/DROP TABLE/ALTER TABLE")));
1134 		return 0;
1135 	}
1136 
1137 	oid = pool_table_name_to_oid(table);
1138 	if (oid > 0)
1139 	{
1140 		if (num_oids >= POOL_MAX_DML_OIDS)
1141 		{
1142 			ereport(LOG,
1143 					(errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1144 			return 0;
1145 		}
1146 
1147 		oids[num_oids++] = pool_table_name_to_oid(table);
1148 		ereport(DEBUG1,
1149 				(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oid)));
1150 	}
1151 	return num_oids;
1152 }
1153 
1154 /*
1155  * Extract table oid from INSERT/UPDATE/DELETE
1156  * FROM statement in WITH clause.
1157  * Returns number of oids.
1158  * oids buffer (oidsp) will be discarded by subsequent call.
1159  */
1160 int
pool_extract_withclause_oids(Node * node,int * oidsp)1161 pool_extract_withclause_oids(Node *node, int *oidsp)
1162 {
1163 	int			num_oids = 0;
1164 	int			oid;
1165 	char	   *table;
1166 	ListCell   *lc;
1167 	WithClause *with;
1168 
1169 	if(oidsp == NULL)
1170 	{
1171 		return 0;
1172 	}
1173 
1174 	if(!node || !IsA(node, WithClause))
1175 	{
1176 		return 0;
1177 	}
1178 
1179 	with = (WithClause *) node;
1180 	foreach(lc, with->ctes)
1181 	{
1182 		CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
1183 		if(IsA(cte->ctequery, InsertStmt))
1184 		{
1185 			InsertStmt *stmt = (InsertStmt *) cte->ctequery;
1186 			table = make_table_name_from_rangevar(stmt->relation);
1187 		}
1188 		else if(IsA(cte->ctequery, DeleteStmt))
1189 		{
1190 			DeleteStmt *stmt = (DeleteStmt *) cte->ctequery;
1191 			table = make_table_name_from_rangevar(stmt->relation);
1192 		}
1193 		else if(IsA(cte->ctequery, UpdateStmt))
1194 		{
1195 			UpdateStmt *stmt = (UpdateStmt *) cte->ctequery;
1196 			table = make_table_name_from_rangevar(stmt->relation);
1197 		}
1198 		else
1199 		{
1200 			/* only check INSERT/DELETE/UPDATE in WITH clause */
1201 			table = NULL;
1202 		}
1203 
1204 		oid = pool_table_name_to_oid(table);
1205 		if (oid > 0)
1206 		{
1207 			if (num_oids >= POOL_MAX_DML_OIDS)
1208 			{
1209 				break;
1210 			}
1211 
1212 			oidsp[num_oids++] = pool_table_name_to_oid(table);
1213 			ereport(DEBUG1,
1214 					(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oidsp[num_oids - 1])));
1215 		}
1216 	}
1217 
1218 	return num_oids;
1219 }
1220 
/* Number of entries by which the table oid buffers are grown at a time */
#define POOL_OIDBUF_SIZE 1024

/*
 * Table oids collected by pool_add_dml_table_oid():
 *   oidbuf      - the entries (allocated in TopMemoryContext when grown)
 *   oidbufp     - number of entries currently in use
 *   oidbuf_size - allocated capacity, in entries
 */
static int *oidbuf;
static int	oidbufp;
static int	oidbuf_size;
1225 
1226 /*
1227  * Add table oid to internal buffer
1228  */
pool_add_dml_table_oid(int oid)1229 void pool_add_dml_table_oid(int oid)
1230 {
1231 	int i;
1232 	int* tmp;
1233 
1234 	if (oid == 0)
1235 		return;
1236 
1237 	if (oidbufp >= oidbuf_size)
1238 	{
1239 		MemoryContext oldcxt;
1240 		oidbuf_size += POOL_OIDBUF_SIZE;
1241 		/*
1242 		 * This need to live throughout the life of child so home it in
1243 		 * TopMemoryContext
1244 		 */
1245 		oldcxt = MemoryContextSwitchTo(TopMemoryContext);
1246 		tmp = repalloc(oidbuf, sizeof(int) * oidbuf_size);
1247 		MemoryContextSwitchTo(oldcxt);
1248 		if (tmp == NULL)
1249 			return;
1250 
1251 		oidbuf = tmp;
1252 	}
1253 
1254 	for (i=0;i<oidbufp;i++)
1255 	{
1256 		if (oidbuf[i] == oid)
1257 		/* Already same oid exists */
1258 			return;
1259 	}
1260 	oidbuf[oidbufp++] = oid;
1261 }
1262 
1263 
1264 /*
1265  * Get table oid buffer
1266  */
pool_get_dml_table_oid(int ** oid)1267 static int pool_get_dml_table_oid(int **oid)
1268 {
1269 	*oid = oidbuf;
1270 	return oidbufp;
1271 }
1272 
pool_get_dropdb_table_oids(int ** oids,int dboid)1273 static int pool_get_dropdb_table_oids(int **oids, int dboid)
1274 {
1275 	int *rtn = 0;
1276 	int oids_size = 0;
1277 	int *tmp;
1278 
1279 	int num_oids = 0;
1280 	DIR *dir;
1281 	struct dirent *dp;
1282 	char path[1024];
1283 
1284 	snprintf(path, sizeof(path), "%s/%d", pool_config->memqcache_oiddir, dboid);
1285 	if ((dir = opendir(path)) == NULL)
1286 	{
1287 		ereport(DEBUG1,
1288 			(errmsg("memcache: getting drop table oids"),
1289 				 errdetail("Failed to open dir: %s", path)));
1290 		return 0;
1291 	}
1292 
1293 	while ((dp = readdir(dir)) != NULL)
1294 	{
1295 		if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0)
1296 			continue;
1297 
1298 		if (num_oids >= oids_size)
1299 		{
1300 			oids_size += POOL_OIDBUF_SIZE;
1301 			tmp = repalloc(rtn, sizeof(int) * oids_size);
1302 			if (tmp == NULL)
1303 			{
1304 				closedir(dir);
1305 				return 0;
1306 			}
1307 			rtn = tmp;
1308 		}
1309 
1310 		rtn[num_oids] = atol(dp->d_name);
1311 		num_oids++;
1312 	}
1313 
1314 	closedir(dir);
1315 	*oids = rtn;
1316 
1317 	return num_oids;
1318 }
1319 
1320 /* Discard oid internal buffer */
pool_discard_dml_table_oid(void)1321 static void pool_discard_dml_table_oid(void)
1322 {
1323 	oidbufp = 0;
1324 }
1325 
/*
 * Management modules for the oid map.  When caching SELECT results, we
 * record table oids in files with the following structure.
 *
 * memqcache_oiddir -+- database_oid -+-table_oid_file1
 *                                    |
 *                                    +-table_oid_file2
 *                                    |
 *                                    +-table_oid_file3...
 *
 * Each table_oid_file is named after the oid of a table used by a SELECT
 * statement, and holds one or more cache ids (hash keys in the memcached
 * case).  When a SELECT result is cached, the file is created if needed
 * and the cache id is appended.  Later SELECTs using the same table oid
 * append to the same file.  If a SELECT uses multiple tables, one
 * table_oid_file per table is written.  When INSERT/UPDATE/DELETE is
 * executed, the corresponding caches must be deleted (cache
 * invalidation); the same applies when DROP TABLE or ALTER TABLE is
 * executed.  When a database is dropped, all caches belonging to that
 * database must be deleted.
 */
1346 
1347 /*
1348  * Get oid of current database
1349  */
pool_get_database_oid(void)1350 static int pool_get_database_oid(void)
1351 {
1352 /*
1353  * Query to convert table name to oid
1354  */
1355 	int oid = 0;
1356 	static POOL_RELCACHE *relcache;
1357 	POOL_CONNECTION_POOL *backend;
1358 
1359 	backend = pool_get_session_context(false)->backend;
1360 
1361 	/*
1362 	 * If relcache does not exist, create it.
1363 	 */
1364 	if (!relcache)
1365 	{
1366 		relcache = pool_create_relcache(pool_config->relcache_size, DATABASE_TO_OID_QUERY,
1367 										int_register_func, int_unregister_func,
1368 										false);
1369 		if (relcache == NULL)
1370 		{
1371 			ereport(LOG,
1372 				(errmsg("memcache: error creating relcache while getting database OID")));
1373 			return oid;
1374 		}
1375 	}
1376 
1377 	/*
1378 	 * Search relcache.
1379 	 */
1380 	oid = (int)(intptr_t)pool_search_relcache(relcache, backend,
1381 											  MASTER_CONNECTION(backend)->sp->database);
1382 	return oid;
1383 }
1384 
1385 /*
1386  * Get oid of current database for discarding cache files
1387  * after executing DROP DATABASE
1388  */
pool_get_database_oid_from_dbname(char * dbname)1389 int pool_get_database_oid_from_dbname(char *dbname)
1390 {
1391 	int dboid = 0;
1392 	POOL_SELECT_RESULT *res;
1393 	char query[1024];
1394 
1395 	POOL_CONNECTION_POOL *backend;
1396 	backend = pool_get_session_context(false)->backend;
1397 
1398 	snprintf(query, sizeof(query), DATABASE_TO_OID_QUERY, dbname);
1399 	do_query(MASTER(backend), query, &res, MAJOR(backend));
1400 
1401 	if (res->numrows != 1)
1402 	{
1403 		ereport(DEBUG1,
1404 			(errmsg("memcache: getting oid of current database"),
1405 				 errdetail("received %d rows", res->numrows)));
1406 		free_select_result(res);
1407 		return 0;
1408 	}
1409 
1410 	dboid = atol(res->data[0]);
1411 	free_select_result(res);
1412 
1413 	return dboid;
1414 }
1415 
1416 /*
1417  * Add cache id (shmem case) or hash key (memcached case) to table oid
1418  * map file.  Caller must hold shmem lock before calling this function
1419  * to avoid file extension conflict among different pgpool child
1420  * process.
1421  * As of pgpool-II 3.2, pool_handle_query_cache is responsible for that.
1422  * (pool_handle_query_cache -> pool_commit_cache -> pool_add_table_oid_map)
1423  */
pool_add_table_oid_map(POOL_CACHEKEY * cachekey,int num_table_oids,int * table_oids)1424 static void pool_add_table_oid_map(POOL_CACHEKEY *cachekey, int num_table_oids, int *table_oids)
1425 {
1426 	char *dir;
1427 	int dboid;
1428 	char path[1024];
1429 	int i;
1430 	int len;
1431 
1432 	/*
1433 	 * Create memqcache_oiddir
1434 	 */
1435 	dir = pool_config->memqcache_oiddir;
1436 
1437 	if (mkdir(dir, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1438 	{
1439 		if (errno != EEXIST)
1440 		{
1441 			ereport(WARNING,
1442 				(errmsg("memcache: adding table oid maps, failed to create directory:\"%s\". error:\"%s\"", dir, strerror(errno))));
1443 			return;
1444 		}
1445 	}
1446 
1447 	/*
1448 	 * Create memqcache_oiddir/database_oid
1449 	 */
1450 	dboid = pool_get_database_oid();
1451 	ereport(DEBUG1,
1452 		(errmsg("memcache: adding table oid maps"),
1453 			 errdetail("dboid %d", dboid)));
1454 
1455 	if (dboid <= 0)
1456 	{
1457 		ereport(WARNING,
1458 				(errmsg("memcache: adding table oid maps, failed to get database OID")));
1459 		return;
1460 	}
1461 
1462 	snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1463 	if (mkdir(path, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1464 	{
1465 		if (errno != EEXIST)
1466 		{
1467 			ereport(WARNING,
1468 					(errmsg("memcache: adding table oid maps, failed to create directory:\"%s\". error:\"%s\"", path, strerror(errno))));
1469 			return;
1470 		}
1471 	}
1472 
1473 	if (pool_is_shmem_cache())
1474 	{
1475 		len = sizeof(cachekey->cacheid);
1476 	}
1477 	else
1478 	{
1479 		len = sizeof(cachekey->hashkey);
1480 	}
1481 
1482 	for (i=0;i<num_table_oids;i++)
1483 	{
1484 		int fd;
1485 		int oid = table_oids[i];
1486 		int sts;
1487 		struct flock fl;
1488 
1489 		/*
1490 		 * Create or open each memqcache_oiddir/database_oid/table_oid
1491 		 */
1492 		snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1493 		if ((fd = open(path, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR)) == -1)
1494 		{
1495 			ereport(WARNING,
1496 					(errmsg("memcache: adding table oid maps, failed to open file:\"%s\". error:\"%s\"", path, strerror(errno))));
1497 			return;
1498 		}
1499 
1500 		fl.l_type   = F_WRLCK;
1501 		fl.l_whence = SEEK_SET;
1502 		fl.l_start  = 0;        	/* Offset from l_whence         */
1503 		fl.l_len    = 0;        	/* length, 0 = to EOF           */
1504 
1505 		sts = fcntl(fd, F_SETLKW, &fl);
1506 		if (sts == -1)
1507 		{
1508 			ereport(WARNING,
1509 					(errmsg("memcache: adding table oid maps, failed to lock file:\"%s\". error:\"%s\"", path, strerror(errno))));
1510 
1511 			close(fd);
1512 			return;
1513 		}
1514 
1515 		/*
1516 		 * Below was ifdef-out because of a performance reason.
1517 		 * Looking for duplicate cache entries in a file needed
1518 		 * unacceptably high cost. So we gave up this and decided not
1519 		 * to care about duplicate entries in the file.
1520 		 */
1521 #ifdef NOT_USED
1522 		for (;;)
1523 		{
1524 			sts = read(fd, (char *)&buf, len);
1525 			if (sts == -1)
1526 			{
1527 				ereport(WARNING,
1528 						(errmsg("memcache: adding table oid maps, failed to read file:\"%s\". error:\"%s\"", path, strerror(errno))));
1529 				close(fd);
1530 				return;
1531 			}
1532 			else if (sts == len)
1533 			{
1534 				if (memcmp(cachekey, &buf, len) == 0)
1535 				{
1536 					/* Same key found. Skip this */
1537 					close(fd);
1538 					return;
1539 				}
1540 				continue;
1541 			}
1542 			/*
1543 			 * Must be EOF
1544 			 */
1545 			if (sts != 0)
1546 			{
1547 				ereport(WARNING,
1548 						(errmsg("memcache: adding table oid maps, invalid data length:%d in file:\"%s\". error:\"%s\"",sts, path)));
1549 				close(fd);
1550 				return;
1551 			}
1552 			break;
1553 		}
1554 #endif
1555 
1556 		if (lseek(fd, 0, SEEK_END) == -1)
1557 		{
1558 			ereport(WARNING,
1559 					(errmsg("memcache: adding table oid maps, failed seek on file:\"%s\". error:\"%s\"", path, strerror(errno))));
1560 			close(fd);
1561 			return;
1562 		}
1563 
1564 		/*
1565 		 * Write cache_id or cache key at the end of file
1566 		 */
1567 		sts = write(fd, (char *)cachekey, len);
1568 		if (sts == -1 || sts != len)
1569 		{
1570 			ereport(WARNING,
1571 					(errmsg("memcache: adding table oid maps, failed to write file:\"%s\". error:\"%s\"", path, strerror(errno))));
1572 			close(fd);
1573 			return;
1574 		}
1575 		close(fd);
1576 	}
1577 }
1578 
1579 /*
1580  * Discard all oid maps at pgpool-II startup.
1581  * This is necessary for shmem case.
1582  */
pool_discard_oid_maps(void)1583 void pool_discard_oid_maps(void)
1584 {
1585 	char command[1024];
1586 
1587 	snprintf(command, sizeof(command), "/bin/rm -fr %s/[0-9]*",
1588 			 pool_config->memqcache_oiddir);
1589 	if(system(command) == -1)
1590         ereport(WARNING,
1591             (errmsg("unable to execute command \"%s\"",command),
1592              errdetail("system() command failed with error \"%s\"",strerror(errno))));
1593 
1594 
1595 }
1596 
pool_discard_oid_maps_by_db(int dboid)1597 void pool_discard_oid_maps_by_db(int dboid)
1598 {
1599 	char command[1024];
1600 
1601 	if (pool_is_shmem_cache())
1602 	{
1603 		snprintf(command, sizeof(command), "/bin/rm -fr %s/%d/",
1604 				 pool_config->memqcache_oiddir, dboid);
1605 
1606 		ereport(DEBUG1,
1607 				(errmsg("memcache: discarding oid maps by db"),
1608 				 errdetail("command: '%s\'", command)));
1609 
1610 		if(system(command) == -1)
1611             ereport(WARNING,
1612 				(errmsg("unable to execute command \"%s\"",command),
1613                      errdetail("system() command failed with error \"%s\"",strerror(errno))));
1614 	}
1615 }
1616 
1617 /*
1618  * Read cache id (shmem case) or hash key (memcached case) from table
1619  * oid map file according to table_oids and discard cache entries.  If
1620  * unlink is true, the file will be unlinked after successful cache
1621  * removal.
1622  */
pool_invalidate_query_cache(int num_table_oids,int * table_oid,bool unlinkp,int dboid)1623 static void pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlinkp, int dboid)
1624 {
1625 	char *dir;
1626 	char path[1024];
1627 	int i;
1628 	int len;
1629 	POOL_CACHEKEY buf;
1630 
1631 	/*
1632 	 * Create memqcache_oiddir
1633 	 */
1634 	dir = pool_config->memqcache_oiddir;
1635 	if (mkdir(dir, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1636 	{
1637 		if (errno != EEXIST)
1638 		{
1639 			ereport(WARNING,
1640 					(errmsg("memcache: invalidating query cache, failed to create directory:\"%s\". error:\"%s\"", dir, strerror(errno))));
1641 			return;
1642 		}
1643 	}
1644 
1645 	/*
1646 	 * Create memqcache_oiddir/database_oid
1647 	 */
1648 	if (dboid == 0)
1649 	{
1650 		dboid = pool_get_database_oid();
1651 		ereport(DEBUG1,
1652 			(errmsg("memcache invalidating query cache"),
1653 				 errdetail("dboid %d", dboid)));
1654 
1655 		if (dboid <= 0)
1656 		{
1657 			ereport(WARNING,
1658 					(errmsg("memcache: invalidating query cache, could not get database OID")));
1659 			return;
1660 		}
1661 	}
1662 
1663 	snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1664 	if (mkdir(path, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1665 	{
1666 		if (errno != EEXIST)
1667 		{
1668 			ereport(WARNING,
1669 					(errmsg("memcache: invalidating query cache, failed to create directory:\"%s\". error:\"%s\"", path, strerror(errno))));
1670 			return;
1671 		}
1672 	}
1673 
1674 	if (pool_is_shmem_cache())
1675 	{
1676 		len = sizeof(buf.cacheid);
1677 	}
1678 	else
1679 	{
1680 		len = sizeof(buf.hashkey);
1681 	}
1682 
1683 	for (i=0;i<num_table_oids;i++)
1684 	{
1685 		int fd;
1686 		int oid = table_oid[i];
1687 		int sts;
1688 		struct flock fl;
1689 
1690 		/*
1691 		 * Open each memqcache_oiddir/database_oid/table_oid
1692 		 */
1693 		snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1694 		if ((fd = open(path, O_RDONLY)) == -1)
1695 		{
1696 			/* This may be normal. It is possible that no SELECT has
1697 			 * been issued since the table has been created or since
1698 			 * pgpool-II started up.
1699 			 */
1700 			ereport(DEBUG1,
1701 				(errmsg("memcache invalidating query cache"),
1702 					errdetail("failed to open \"%s\". reason:\"%s\"",path, strerror(errno))));
1703 			continue;
1704 		}
1705 
1706 		fl.l_type   = F_RDLCK;
1707 		fl.l_whence = SEEK_SET;
1708 		fl.l_start  = 0;        	/* Offset from l_whence         */
1709 		fl.l_len    = 0;        	/* length, 0 = to EOF           */
1710 
1711 		sts = fcntl(fd, F_SETLKW, &fl);
1712 		if (sts == -1)
1713 		{
1714 			ereport(WARNING,
1715 					(errmsg("memcache: invalidating query cache, failed to lock file:\"%s\". error:\"%s\"", path, strerror(errno))));
1716 			close(fd);
1717 			return;
1718 		}
1719 		for (;;)
1720 		{
1721 			sts = read(fd, (char *)&buf, len);
1722 			if (sts == -1)
1723 			{
1724 				ereport(WARNING,
1725 						(errmsg("memcache: invalidating query cache, failed to read file:\"%s\". error:\"%s\"", path, strerror(errno))));
1726 
1727 				close(fd);
1728 				return;
1729 			}
1730 			else if (sts == len)
1731 			{
1732 				if (pool_is_shmem_cache())
1733 				{
1734 					ereport(DEBUG1,
1735 						(errmsg("memcache invalidating query cache"),
1736 							errdetail("deleting cacheid:%d itemid:%d",
1737 								   buf.cacheid.blockid, buf.cacheid.itemid)));
1738 					pool_delete_item_shmem_cache(&buf.cacheid);
1739 				}
1740 #ifdef USE_MEMCACHED
1741 				else
1742 				{
1743 					char delbuf[33];
1744 
1745 					memcpy(delbuf, buf.hashkey, 32);
1746 					delbuf[32] = 0;
1747 					ereport(DEBUG1,
1748 						(errmsg("memcache invalidating query cache"),
1749 							 errdetail("deleting %s", delbuf)));
1750 
1751 					delete_cache_on_memcached(delbuf);
1752 				}
1753 #endif
1754 				continue;
1755 			}
1756 
1757 			/*
1758 			 * Must be EOF
1759 			 */
1760 			if (sts != 0)
1761 			{
1762 				ereport(WARNING,
1763 						(errmsg("memcache: invalidating query cache, invalid data length:%d in file:\"%s\"",sts, path)));
1764 				close(fd);
1765 				return;
1766 			}
1767 			break;
1768 		}
1769 
1770 		if (unlinkp)
1771 		{
1772 			unlink(path);
1773 		}
1774 		close(fd);
1775 	}
1776 #ifdef SHMEMCACHE_DEBUG
1777 	dump_shmem_cache(0);
1778 #endif
1779 }
1780 
1781 /*
1782  * Reset SELECT data buffers.  If reset_dml_oids is true, call
1783  * pool_discard_dml_table_oid() to reset table oids used in DML statements.
1784  */
pool_reset_memqcache_buffer(bool reset_dml_oids)1785 static void pool_reset_memqcache_buffer(bool reset_dml_oids)
1786 {
1787 	POOL_SESSION_CONTEXT * session_context;
1788 
1789 	session_context = pool_get_session_context(true);
1790 
1791 	if (session_context && session_context->query_cache_array)
1792 	{
1793 		POOL_TEMP_QUERY_CACHE *cache;
1794 
1795 		ereport(DEBUG1,
1796 			(errmsg("memcache reset buffer"),
1797 				 errdetail("discard: %p", session_context->query_cache_array)));
1798 
1799 		pool_discard_query_cache_array(session_context->query_cache_array);
1800 		session_context->query_cache_array = pool_create_query_cache_array();
1801 
1802 		ereport(DEBUG1,
1803 			(errmsg("memcache reset buffer"),
1804 				 errdetail("create: %p", session_context->query_cache_array)));
1805 		/*
1806 		 * if the query context is still under use, we cannot discard
1807 		 * temporary cache.
1808 		 */
1809 		if ((SL_MODE && pool_is_doing_extended_query_message()) ||
1810 			can_query_context_destroy(session_context->query_context))
1811 		{
1812 			ereport(DEBUG1,
1813 				(errmsg("memcache reset buffer"),
1814 					errdetail("discard temp buffer of %p (%s)",
1815 						   session_context->query_context, session_context->query_context->original_query)));
1816 
1817 			cache = pool_get_current_cache();
1818 			pool_discard_temp_query_cache(cache);
1819 			/*
1820 			 * Reset temp_cache pointer in the current query context
1821 			 * so that we don't double free memory.
1822 			 */
1823 			session_context->query_context->temp_cache = NULL;
1824 		}
1825 	}
1826 
1827 	if (reset_dml_oids)
1828 		pool_discard_dml_table_oid();
1829 
1830 	pool_tmp_stats_reset_num_selects();
1831 }
1832 
1833 /*
1834  * Return true if memory cache method is "shmem".  The purpose of this
1835  * function is to cache the result of stcmp and to save a few cycle.
1836  */
pool_is_shmem_cache(void)1837 bool pool_is_shmem_cache(void)
1838 {
1839 	return pool_config->memqcache_method == SHMEM_CACHE;
1840 }
1841 
/*
 * Number of shared memory cache blocks, recorded by
 * pool_shared_memory_cache_size().
 */
static int	memqcache_num_blocks;

/*
 * Record the number of memory cache blocks.
 */
static void
pool_set_memqcache_blocks(int num_blocks)
{
	memqcache_num_blocks = num_blocks;
}
1850 
1851 /*
1852  * Return memory cache number of blocks.
1853  */
pool_get_memqcache_blocks(void)1854 static int pool_get_memqcache_blocks(void)
1855 {
1856 	return memqcache_num_blocks;
1857 }
1858 
1859 /*
1860  * Query cache on shared memory management modules.
1861  */
1862 
1863 /*
1864  * Calculate necessary shared memory size.
1865  */
pool_shared_memory_cache_size(void)1866 size_t pool_shared_memory_cache_size(void)
1867 {
1868 	int64 num_blocks;
1869 	size_t size;
1870 
1871 	if (pool_config->memqcache_maxcache > pool_config->memqcache_cache_block_size)
1872 		ereport(FATAL,
1873 			(errmsg("invalid memory cache configuration"),
1874 					errdetail("memqcache_cache_block_size %d should be greater or equal to memqcache_maxcache %d",
1875 								pool_config->memqcache_cache_block_size,
1876 								pool_config->memqcache_maxcache)));
1877 
1878 
1879 	num_blocks = pool_config->memqcache_total_size/
1880 		pool_config->memqcache_cache_block_size;
1881 	if (num_blocks == 0)
1882 		ereport(FATAL,
1883 			(errmsg("invalid memory cache configuration"),
1884 					errdetail("memqcache_total_size %ld should be greater or equal to memqcache_cache_block_size %d",
1885 								pool_config->memqcache_total_size,
1886 								pool_config->memqcache_cache_block_size)));
1887 
1888 		ereport(LOG,
1889 			(errmsg("memory cache initialized"),
1890 					errdetail("memcache blocks :%ld",num_blocks)));
1891 	/* Remember # of blocks */
1892 	pool_set_memqcache_blocks(num_blocks);
1893 	size = pool_config->memqcache_cache_block_size * num_blocks;
1894 	return size;
1895 }
1896 
1897 /*
1898  * Acquire and initialize shared memory cache. This should be called
1899  * only once from pgpool main process at the process staring up time.
1900  */
1901 static void *shmem;
pool_init_memory_cache(size_t size)1902 int pool_init_memory_cache(size_t size)
1903 {
1904 	ereport(DEBUG1,
1905 		(errmsg("memory cache request size : %zd",size)));
1906 
1907 	shmem = pool_shared_memory_create(size);
1908 	return 0;
1909 }
1910 
1911 /*
1912  * Clear all the shared memory cache and reset FSMM and hash table.
1913  */
1914 void
pool_clear_memory_cache(void)1915 pool_clear_memory_cache(void)
1916 {
1917 	size_t size;
1918 	pool_sigset_t oldmask;
1919 
1920 	POOL_SETMASK2(&BlockSig, &oldmask);
1921 	pool_shmem_lock();
1922 
1923     PG_TRY();
1924     {
1925         size = pool_shared_memory_cache_size();
1926         memset(shmem, 0, size);
1927 
1928         size = pool_shared_memory_fsmm_size();
1929         pool_reset_fsmm(size);
1930 
1931         pool_discard_oid_maps();
1932 
1933         pool_hash_reset(pool_config->memqcache_max_num_cache);
1934     }
1935     PG_CATCH();
1936     {
1937         pool_shmem_unlock();
1938         POOL_SETMASK(&oldmask);
1939         PG_RE_THROW();
1940     }
1941     PG_END_TRY();
1942 
1943 	pool_shmem_unlock();
1944 	POOL_SETMASK(&oldmask);
1945 }
1946 
1947 /*
1948  * Return shared memory cache address
1949  */
pool_memory_cache_address(void)1950 static void *pool_memory_cache_address(void)
1951 {
1952 	return shmem;
1953 }
1954 
1955 /*
1956  * Initialize new block
1957  */
1958 
/*
 * Free space management map
 *
 * The free space management map (FSMM) is an array of bytes, one per
 * block, indexed by block id.  For example, with a 1GB cache and an 8KB
 * block size there are 131,072 blocks, so the FSMM takes 128KB.  Each
 * FSMM entry holds a value from 0 to 255 that describes the total free
 * space in the corresponding block.  For example, if the value is 2, the
 * free space is between 64 and 95 bytes.
 *
 * value free space(in bytes, for an 8KB block size)
 * 0     0-31
 * 1     32-63
 * 2     64-95
 * 3     96-127
 * :
 * 255   8160-8191
 */
1978 
/*
 * Return the shared memory size needed for the FSMM: one byte per cache
 * block.  Must be called after pool_shared_memory_cache_size(), which
 * records the number of blocks.
 */
size_t
pool_shared_memory_fsmm_size(void)
{
	return (size_t) pool_get_memqcache_blocks() * sizeof(char);
}
1990 
1991 /*
1992  * Acquire and initialize shared memory cache for FSMM. This should be
1993  * called after pool_shared_memory_cache_size only once from pgpool
1994  * main process at the process staring up time.
1995  */
1996 static void *fsmm;
pool_init_fsmm(size_t size)1997 int pool_init_fsmm(size_t size)
1998 {
1999 	int maxblock = 	pool_get_memqcache_blocks();
2000 	int encode_value;
2001 
2002 	fsmm = pool_shared_memory_create(size);
2003 	encode_value = POOL_MAX_FREE_SPACE/POOL_FSMM_RATIO;
2004 	memset(fsmm, encode_value, maxblock);
2005 	return 0;
2006 }
2007 
2008 /*
2009  * Return shared memory fsmm address
2010  */
pool_fsmm_address(void)2011 static void *pool_fsmm_address(void)
2012 {
2013 	return fsmm;
2014 }
2015 
/*
 * Clock algorithm shared query cache management modules.
 */

/*
 * Clock hand: id of the next block pool_reuse_block() will evict.  It is
 * allocated with pool_shared_memory_create().
 */
static int *pool_fsmm_clock_hand;

/*
 * Allocate the clock hand and point it at block 0.
 */
void
pool_allocate_fsmm_clock_hand(void)
{
	int		   *hand = pool_shared_memory_create(sizeof(*hand));

	*hand = 0;
	pool_fsmm_clock_hand = hand;
}
2033 
2034 /*
2035  * Reset FSMM.
2036  */
2037 static void
pool_reset_fsmm(size_t size)2038 pool_reset_fsmm(size_t size)
2039 {
2040 	int encode_value;
2041 
2042 	encode_value = POOL_MAX_FREE_SPACE/POOL_FSMM_RATIO;
2043 	memset(fsmm, encode_value, size);
2044 
2045 	*pool_fsmm_clock_hand = 0;
2046 }
2047 
2048 /*
2049  * Find victim block using clock algorithm and make it free.
2050  * Returns new free block id.
2051  */
pool_reuse_block(void)2052 static POOL_CACHE_BLOCKID pool_reuse_block(void)
2053 {
2054 	int maxblock = pool_get_memqcache_blocks();
2055 	char *block = block_address(*pool_fsmm_clock_hand);
2056 	POOL_CACHE_BLOCK_HEADER *bh = (POOL_CACHE_BLOCK_HEADER *)block;
2057 	POOL_CACHE_BLOCKID reused_block;
2058 	POOL_CACHE_ITEM_POINTER *cip;
2059 	char *p;
2060 	int i;
2061 
2062 	bh->flags = 0;
2063 	reused_block = *pool_fsmm_clock_hand;
2064 	p = block_address(reused_block);
2065 
2066 	for (i=0;i<bh->num_items;i++)
2067 	{
2068 		cip = item_pointer(p, i);
2069 
2070 		if (!(POOL_ITEM_DELETED & cip->flags))
2071 		{
2072 			pool_hash_delete(&cip->query_hash);
2073 			ereport(DEBUG1,
2074 					(errmsg("pool_reuse_block: blockid: %d item: %d", reused_block, i)));
2075 		}
2076 	}
2077 
2078 	pool_init_cache_block(reused_block);
2079 	pool_update_fsmm(reused_block, POOL_MAX_FREE_SPACE);
2080 
2081 	(*pool_fsmm_clock_hand)++;
2082 	if (*pool_fsmm_clock_hand >= maxblock)
2083 		*pool_fsmm_clock_hand = 0;
2084 
2085 	ereport(LOG,
2086 			(errmsg("pool_reuse_block: blockid: %d", reused_block)));
2087 
2088 	return reused_block;
2089 }
2090 
2091 /*
2092  * Get block id which has enough space
2093  */
pool_get_block(size_t free_space)2094 static POOL_CACHE_BLOCKID pool_get_block(size_t free_space)
2095 {
2096 	int encode_value;
2097 	unsigned char *p = pool_fsmm_address();
2098 	int i;
2099 	int maxblock = pool_get_memqcache_blocks();
2100 	POOL_CACHE_BLOCK_HEADER *bh;
2101 
2102 	if (p == NULL)
2103 	{
2104 		ereport(WARNING,
2105 				(errmsg("memcache: getting block: FSMM is not initialized")));
2106 		return -1;
2107 	}
2108 
2109 	if (free_space > POOL_MAX_FREE_SPACE)
2110 	{
2111 		ereport(WARNING,
2112 			(errmsg("memcache: getting block: invalid free space:%zd", free_space),
2113 				 errdetail("requested free space: %zd is more than maximum allowed space:%lu",free_space,POOL_MAX_FREE_SPACE)));
2114 		return -1;
2115 	}
2116 
2117 	encode_value = free_space/POOL_FSMM_RATIO;
2118 
2119 	for (i=0;i<maxblock;i++)
2120 	{
2121 		if (p[i] >= encode_value)
2122 		{
2123 			/*
2124 			 * This block *may" have enough space.
2125 			 * We need to make sure it actually has enough space.
2126 			 */
2127 			bh = (POOL_CACHE_BLOCK_HEADER *)block_address(i);
2128 			if (bh->free_bytes >= free_space)
2129 			{
2130 				return (POOL_CACHE_BLOCKID)i;
2131 			}
2132 		}
2133 	}
2134 
2135 	/*
2136 	 * No enough space found. Reuse victim block
2137 	 */
2138 	return pool_reuse_block();
2139 }
2140 
2141 /*
2142  * Update free space info for specified block
2143  */
pool_update_fsmm(POOL_CACHE_BLOCKID blockid,size_t free_space)2144 static void pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space)
2145 {
2146 	int encode_value;
2147 	char *p = pool_fsmm_address();
2148 
2149 	if (p == NULL)
2150 	{
2151 		ereport(WARNING,
2152 				(errmsg("memcache: updating free space in block: FSMM is not initialized")));
2153 		return;
2154 	}
2155 
2156 	if (blockid >= pool_get_memqcache_blocks())
2157 	{
2158 		ereport(WARNING,
2159 				(errmsg("memcache: updating free space in block: block id:%d in invalid",blockid)));
2160 		return;
2161 	}
2162 
2163 	if (free_space > POOL_MAX_FREE_SPACE)
2164 	{
2165 		ereport(WARNING,
2166 			(errmsg("memcache: updating free space in block: invalid free space:%zd", free_space),
2167 				 errdetail("requested free space: %zd is more than maximum allowed space:%lu",free_space,POOL_MAX_FREE_SPACE)));
2168 
2169 		return;
2170 	}
2171 
2172 	encode_value = free_space/POOL_FSMM_RATIO;
2173 
2174 	p[blockid] = encode_value;
2175 
2176 	return;
2177 }
2178 
2179 /*
2180  * Add item data to shared memory cache.
2181  * On successful registration, returns cache id.
2182  * The cache id is overwritten by the subsequent call to this function.
2183  * On error returns NULL.
2184  */
pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash,char * data,int size)2185 static POOL_CACHEID *pool_add_item_shmem_cache(POOL_QUERY_HASH *query_hash, char *data, int size)
2186 {
2187 	static POOL_CACHEID cacheid;
2188 	POOL_CACHE_BLOCKID blockid;
2189 	POOL_CACHE_BLOCK_HEADER *bh;
2190 	POOL_CACHE_ITEM_POINTER *cip;
2191 
2192 	POOL_CACHE_ITEM ci;
2193 	POOL_CACHE_ITEM_POINTER cip_body;
2194 	char *item;
2195 
2196 	int request_size;
2197 	char *p;
2198 	int i;
2199 	char *src;
2200 	char *dst;
2201 	int num_deleted;
2202 	char *dcip;
2203 	char *dci;
2204 	bool need_pack;
2205 	char *work_buffer;
2206 	int index;
2207 
2208 	if (query_hash == NULL)
2209 	{
2210 		ereport(LOG,
2211 				(errmsg("error while adding item to shmem cache, query hash is NULL")));
2212 		return NULL;
2213 	}
2214 
2215 	if (data == NULL)
2216 	{
2217 		ereport(LOG,
2218 				(errmsg("error while adding item to shmem cache, data is NULL")));
2219 		return NULL;
2220 	}
2221 
2222 	if (size <= 0)
2223 	{
2224 		ereport(LOG,
2225 				(errmsg("error while adding item to shmem cache, invalid request size: %d", size)));
2226 		return NULL;
2227 	}
2228 
2229 	/* Add overhead */
2230 	request_size = size + sizeof(POOL_CACHE_ITEM_POINTER) + sizeof(POOL_CACHE_ITEM_HEADER);
2231 
2232 	/* Get cache block which has enough space */
2233 	blockid = pool_get_block(request_size);
2234 
2235 	if (blockid == -1)
2236 	{
2237 		return NULL;
2238 	}
2239 
2240 	/*
2241 	 * Initialize the block if necessary. If no live items are
2242 	 * remained, we also initialize the block. If there's contiguous
2243 	 * deleted items, we turn them into free space as well.
2244 	 */
2245 	pool_init_cache_block(blockid);
2246 
2247 	/*
2248 	 * Make sure that we have at least one free hash element.
2249 	 */
2250 	while (!is_free_hash_element())
2251 	{
2252 		/* If not, reuse next victim block */
2253 		blockid = pool_reuse_block();
2254 		pool_init_cache_block(blockid);
2255 	}
2256 
2257 	/* Get block address on shmem */
2258 	p = block_address(blockid);
2259 	bh = (POOL_CACHE_BLOCK_HEADER *)p;
2260 
2261 	/*
2262 	 * Create contiguous free space. We assume that item bodies are
2263 	 * ordered from bottom to top of the block, and corresponding item
2264 	 * pointers are ordered from the youngest to the oldest in the
2265 	 * beginning of the block.
2266 	 */
2267 
2268 	/*
2269 	 * Optimization. If there's no deleted items, we don't need to
2270 	 * pack it to create contiguous free space.
2271 	 */
2272 	need_pack = false;
2273 	for (i=0;i<bh->num_items;i++)
2274 	{
2275 		cip = item_pointer(p, i);
2276 
2277 		if (POOL_ITEM_DELETED & cip->flags)		/* Deleted item? */
2278 		{
2279 			need_pack = true;
2280 			ereport(DEBUG1,
2281 				(errmsg("memcache adding item"),
2282 					 errdetail("start creating contiguous space")));
2283 			break;
2284 		}
2285 	}
2286 
2287 	/*
2288 	 * We disable packing for now.
2289 	 * Revisit and remove following code fragment later.
2290 	 */
2291 	need_pack = false;
2292 
2293 	if (need_pack)
2294 	{
2295 		/*
2296 		 * Pack and create contiguous free space.
2297 		 */
2298 		dcip = calloc(1, pool_config->memqcache_cache_block_size);
2299 		if (!dcip)
2300 		{
2301 			ereport(WARNING,
2302 					(errmsg("memcache: adding item to cache: calloc failed")));
2303 			return NULL;
2304 		}
2305 
2306 		work_buffer = dcip;
2307 		dci = dcip + pool_config->memqcache_cache_block_size;
2308 		num_deleted = 0;
2309 		index = 0;
2310 
2311 		for (i=0;i<bh->num_items;i++)
2312 		{
2313 			int total_length;
2314 			POOL_CACHEID cid;
2315 
2316 			cip = item_pointer(p, i);
2317 
2318 			if (POOL_ITEM_DELETED & cip->flags)		/* Deleted item? */
2319 			{
2320 				num_deleted++;
2321 				continue;
2322 			}
2323 
2324 			/* Copy item body */
2325 			src = p + cip->offset;
2326 			total_length = item_header(p, i)->total_length;
2327 			dst = dci - total_length;
2328 			cip->offset = dst - work_buffer;
2329 			memcpy(dst, src, total_length);
2330 
2331 			dci -= total_length;
2332 
2333 			/* Copy item pointer */
2334 			src = (char *)cip;
2335 			dst = (char *)item_pointer(dcip, index);
2336 			memcpy(dst, src, sizeof(POOL_CACHE_ITEM_POINTER));
2337 
2338 			/* Update hash index */
2339 			cid.blockid = blockid;
2340 			cid.itemid = index;
2341 			pool_hash_insert(&cip->query_hash, &cid, true);
2342 			ereport(DEBUG1,
2343 				(errmsg("memcache adding item"),
2344 					errdetail("item cid updated. old:%d %d new:%d %d",
2345 						   blockid, i, blockid, index)));
2346 			index++;
2347 		}
2348 
2349 		/* All items deleted? */
2350 		if (num_deleted > 0 && num_deleted == bh->num_items)
2351 		{
2352 			ereport(DEBUG1,
2353 				(errmsg("memcache adding item"),
2354 					 errdetail("all items deleted, total deleted:%d", num_deleted)));
2355 			bh->flags = 0;
2356 			pool_init_cache_block(blockid);
2357 			pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
2358 		}
2359 		else
2360 		{
2361 			/* Update number of items */
2362 			bh->num_items -= num_deleted;
2363 
2364 			/* Copy back the packed block except block header */
2365 			memcpy(p+sizeof(POOL_CACHE_BLOCK_HEADER),
2366 				   work_buffer+sizeof(POOL_CACHE_BLOCK_HEADER),
2367 				   pool_config->memqcache_cache_block_size-sizeof(POOL_CACHE_BLOCK_HEADER));
2368 		}
2369 		free(work_buffer);
2370 	}
2371 
2372 	/*
2373 	 * Make sure that we have enough free space
2374 	 */
2375 	if (bh->free_bytes < request_size)
2376 	{
2377 		/* This should not happen */
2378 		ereport(WARNING,
2379 			(errmsg("memcache: adding item to cache: not enough space"),
2380 				 errdetail("free space: %d required: %d block id:%d",
2381 						   bh->free_bytes, request_size, blockid)));
2382 		return NULL;
2383 	}
2384 
2385 	/*
2386 	 * At this point, new item can be located at block_header->num_items
2387 	 */
2388 
2389 	/* Fill in cache item header */
2390 	ci.header.timestamp = time(NULL);
2391 	ci.header.total_length = sizeof(POOL_CACHE_ITEM_HEADER) + size;
2392 
2393 	/* Calculate item body address */
2394 	if (bh->num_items == 0)
2395 	{
2396 		/* This is the #0 item. So address is block_bottom -
2397 		 * data_length */
2398 		item = p + pool_config->memqcache_cache_block_size - ci.header.total_length;
2399 
2400 		/* Mark this block used */
2401 		bh->flags = POOL_BLOCK_USED;
2402 	}
2403 	else
2404 	{
2405 		cip = item_pointer(p, bh->num_items-1);
2406 		item = p + cip->offset - ci.header.total_length;
2407 	}
2408 
2409 	/* Copy item header */
2410 	memcpy(item, &ci, sizeof(POOL_CACHE_ITEM_HEADER));
2411 	bh->free_bytes -= sizeof(POOL_CACHE_ITEM_HEADER);
2412 
2413 	/* Copy item body */
2414 	memcpy(item + sizeof(POOL_CACHE_ITEM_HEADER), data, size);
2415 	bh->free_bytes -= size;
2416 
2417 	/* Copy cache item pointer */
2418 	memcpy(&cip_body.query_hash, query_hash, sizeof(POOL_QUERY_HASH));
2419 	memset(&cip_body.next, 0, sizeof(POOL_CACHEID));
2420 	cip_body.offset = item - p;
2421 	cip_body.flags = POOL_ITEM_USED;
2422 	memcpy(item_pointer(p, bh->num_items), &cip_body, sizeof(POOL_CACHE_ITEM_POINTER));
2423 	bh->free_bytes -= sizeof(POOL_CACHE_ITEM_POINTER);
2424 
2425 	/* Update FSMM */
2426 	pool_update_fsmm(blockid, bh->free_bytes);
2427 
2428 	cacheid.blockid = blockid;
2429 	cacheid.itemid = bh->num_items;
2430 	ereport(DEBUG1,
2431 		(errmsg("memcache adding item"),
2432 			errdetail("new item inserted. blockid: %d itemid:%d",
2433 				   cacheid.blockid, cacheid.itemid)));
2434 
2435 	/* Add up number of items */
2436 	bh->num_items++;
2437 
2438 	/* Update hash table */
2439 	if (pool_hash_insert(query_hash, &cacheid, false) < 0)
2440 	{
2441 		ereport(LOG,
2442 				(errmsg("error while adding item to shmem cache, hash insert failed")));
2443 
2444 		/* Since we have failed to insert hash index entry, we need to
2445 		 * undo the addition of cache entry.
2446 		 */
2447 		pool_delete_item_shmem_cache(&cacheid);
2448 		return NULL;
2449 	}
2450 	ereport(DEBUG1,
2451 		(errmsg("memcache adding item"),
2452 			 errdetail("block: %d item: %d", cacheid.blockid, cacheid.itemid)));
2453 
2454 #ifdef SHMEMCACHE_DEBUG
2455 	dump_shmem_cache(blockid);
2456 #endif
2457 	return &cacheid;
2458 }
2459 
2460 /*
2461  * Returns item data address on shared memory cache specified by query hash.
2462  * Also data length is set to *size.
2463  * On error or data not found case returns NULL.
2464  * Detail is set to *sts. (0: success, 1: not found, -1: error)
2465  */
pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash,int * size,int * sts)2466 static char *pool_get_item_shmem_cache(POOL_QUERY_HASH *query_hash, int *size, int *sts)
2467 {
2468 	POOL_CACHEID *cacheid;
2469 	POOL_CACHE_ITEM_HEADER *cih;
2470 
2471 	if (sts == NULL)
2472 	{
2473 		ereport(LOG,
2474 				(errmsg("error while getting item from shmem cache, sts is NULL")));
2475 		return NULL;
2476 	}
2477 
2478 	*sts = -1;
2479 
2480 	if (query_hash == NULL)
2481 	{
2482 		ereport(LOG,
2483 				(errmsg("error while getting item from shmem cache, query hash is NULL")));
2484 		return NULL;
2485 	}
2486 
2487 	if (size == NULL)
2488 	{
2489 		ereport(LOG,
2490 				(errmsg("error while getting item from shmem cache, size is NULL")));
2491 		return NULL;
2492 	}
2493 
2494 	/*
2495 	 * Find cache header by using hash table
2496 	 */
2497 	cacheid = pool_find_item_on_shmem_cache(query_hash);
2498 	if (cacheid == NULL)
2499 	{
2500 		/* Not found */
2501 		*sts = 1;
2502 		return NULL;
2503 	}
2504 
2505 	cih = pool_cache_item_header(cacheid);
2506 
2507 	*size = cih->total_length - sizeof(POOL_CACHE_ITEM_HEADER);
2508 	return (char *)cih + sizeof(POOL_CACHE_ITEM_HEADER);
2509 }
2510 
2511 /*
2512  * Find data on shared memory cache specified query hash.
2513  * On success returns cache id.
2514  * The cache id is overwritten by the subsequent call to this function.
2515  */
pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash)2516 static POOL_CACHEID *pool_find_item_on_shmem_cache(POOL_QUERY_HASH *query_hash)
2517 {
2518 	static POOL_CACHEID cacheid;
2519 	POOL_CACHEID *c;
2520 	POOL_CACHE_ITEM_HEADER *cih;
2521 	time_t now;
2522 
2523 	c = pool_hash_search(query_hash);
2524 	if (!c)
2525 	{
2526 		return NULL;
2527 	}
2528 
2529 	cih = item_header(block_address(c->blockid), c->itemid);
2530 
2531 	/* Check cache expiration */
2532 	if (pool_config->memqcache_expire > 0)
2533 	{
2534 		now = time(NULL);
2535 		if (now > (cih->timestamp + pool_config->memqcache_expire))
2536 		{
2537 			ereport(DEBUG1,
2538 				(errmsg("memcache finding item"),
2539 					errdetail("cache expired: now: %ld timestamp: %ld",
2540 						   now, cih->timestamp + pool_config->memqcache_expire)));
2541 			pool_delete_item_shmem_cache(c);
2542 			return NULL;
2543 		}
2544 	}
2545 
2546 	cacheid.blockid = c->blockid;
2547 	cacheid.itemid = c->itemid;
2548 	return &cacheid;
2549 }
2550 
2551 /*
2552  * Delete item data specified cache id from shmem.
2553  * On successful deletion, returns 0.
2554  * Other wise return -1.
2555  * FSMM is also updated.
2556  */
pool_delete_item_shmem_cache(POOL_CACHEID * cacheid)2557 static int pool_delete_item_shmem_cache(POOL_CACHEID *cacheid)
2558 {
2559 	POOL_CACHE_BLOCK_HEADER *bh;
2560 	POOL_CACHE_ITEM_POINTER *cip;
2561 	POOL_CACHE_ITEM_HEADER *cih;
2562 	POOL_QUERY_HASH key;
2563 	int size;
2564 	ereport(DEBUG1,
2565 		(errmsg("memcache deleting item data"),
2566 			errdetail("cacheid:%d itemid:%d",cacheid->blockid, cacheid->itemid)));
2567 
2568 	if (cacheid->blockid >= pool_get_memqcache_blocks())
2569 	{
2570 		ereport(LOG,
2571 				(errmsg("error while deleting item from shmem cache, invalid block: %d",
2572 						cacheid->blockid)));
2573 		return -1;
2574 	}
2575 
2576 	bh = (POOL_CACHE_BLOCK_HEADER *)block_address(cacheid->blockid);
2577 	if (!(bh->flags & POOL_BLOCK_USED))
2578 	{
2579 		ereport(LOG,
2580 				(errmsg("error while deleting item from shmem cache, block: %d is not used",
2581 						cacheid->blockid)));
2582 		return -1;
2583 	}
2584 
2585 	if (cacheid->itemid >= bh->num_items)
2586 	{
2587 		/*
2588 		 * This could happen if the block is reused.  Since contents
2589 		 * of oid map file is not updated when the block is reused.
2590 		 */
2591 		ereport(DEBUG1,
2592 			(errmsg("memcache error deleting item data"),
2593 				errdetail("invalid item id %d in block:%d",
2594 					   cacheid->itemid, cacheid->blockid)));
2595 
2596 		return -1;
2597 	}
2598 
2599 	cip = item_pointer(block_address(cacheid->blockid), cacheid->itemid);
2600 	if (!(cip->flags & POOL_ITEM_USED))
2601 	{
2602 		ereport(LOG,
2603 				(errmsg("error while deleting item from shmem cache, item: %d was not used",
2604 						cacheid->itemid)));
2605 		return -1;
2606 	}
2607 
2608 	if (cip->flags & POOL_ITEM_DELETED)
2609 	{
2610 		ereport(LOG,
2611 				(errmsg("error while deleting item from shmem cache, item: %d was already deleted",
2612 						cacheid->itemid)));
2613 		return -1;
2614 	}
2615 
2616 	/* Save cache key */
2617 	memcpy(&key, &cip->query_hash, sizeof(POOL_QUERY_HASH));
2618 
2619 	cih = pool_cache_item_header(cacheid);
2620 	size = cih->total_length + sizeof(POOL_CACHE_ITEM_POINTER);
2621 
2622 	/* Delete item pointer */
2623 	cip->flags |= POOL_ITEM_DELETED;
2624 
2625 	/*
2626 	 * We do NOT count down bh->num_items here. The deleted space will be recycled
2627 	 * by pool_add_item_shmem_cache(). However, if this is the last item, we can
2628 	 * recycle whole block.
2629 	 *
2630 	 * 2012/4/1: Now we do not pack data in
2631 	 * pool_add_item_shmem_cache() for performance reason. Also we
2632 	 * count down num_items if it is the last one.
2633 	 */
2634 	if ((bh->num_items -1) == 0)
2635 	{
2636 		ereport(DEBUG1,
2637 			(errmsg("memcache deleting item data"),
2638 				 errdetail("no item remains. initialize block")));
2639 		bh->flags = 0;
2640 		pool_init_cache_block(cacheid->blockid);
2641 	}
2642 
2643 	/* Remove hash index */
2644 	pool_hash_delete(&key);
2645 
2646 	/*
2647 	 * If the deleted item is last one in the block, we add it to the free space.
2648 	 */
2649 	if (cacheid->itemid == (bh->num_items -1))
2650 	{
2651 		bh->free_bytes += size;
2652 		ereport(DEBUG1,
2653 			(errmsg("memcache deleting item data"),
2654 				errdetail("deleted %d bytes, freebytes is = %d",
2655 					   size, bh->free_bytes)));
2656 
2657 		bh->num_items--;
2658 	}
2659 
2660 	/* Update FSMM */
2661 	pool_update_fsmm(cacheid->blockid, bh->free_bytes);
2662 
2663 	return 0;
2664 }
2665 
2666 /*
2667  * Returns item header specified by cache id.
2668  */
pool_cache_item_header(POOL_CACHEID * cacheid)2669 static POOL_CACHE_ITEM_HEADER *pool_cache_item_header(POOL_CACHEID *cacheid)
2670 {
2671 	POOL_CACHE_BLOCK_HEADER *bh;
2672 
2673 	if (cacheid->blockid >= pool_get_memqcache_blocks())
2674 	{
2675 		ereport(WARNING,
2676 				(errmsg("error while getting cache item header, invalid block id: %d", cacheid->blockid)));
2677 		return NULL;
2678 	}
2679 
2680 	bh = (POOL_CACHE_BLOCK_HEADER *)block_address(cacheid->blockid);
2681 	if (cacheid->itemid >= bh->num_items)
2682 	{
2683 		ereport(WARNING,
2684 				(errmsg("error while getting cache item header, invalid item id: %d", cacheid->itemid)));
2685 		return NULL;
2686 	}
2687 
2688 	return item_header((char *)bh, cacheid->itemid);
2689 }
2690 
2691 /*
2692  * Initialize specified block.
2693  */
pool_init_cache_block(POOL_CACHE_BLOCKID blockid)2694 static int pool_init_cache_block(POOL_CACHE_BLOCKID blockid)
2695 {
2696 	char *p;
2697 	POOL_CACHE_BLOCK_HEADER *bh;
2698 
2699 	if (blockid >= pool_get_memqcache_blocks())
2700 	{
2701 		ereport(WARNING,
2702 				(errmsg("error while initializing cache block, invalid block id: %d", blockid)));
2703 		return -1;
2704 	}
2705 
2706 	p = block_address(blockid);
2707 	bh = (POOL_CACHE_BLOCK_HEADER *)p;
2708 
2709 	/* Is this block used? */
2710 	if (!(bh->flags & POOL_BLOCK_USED))
2711 	{
2712 		/* Initialize empty block */
2713 		memset(p, 0, pool_config->memqcache_cache_block_size);
2714 		bh->free_bytes = pool_config->memqcache_cache_block_size -
2715 			sizeof(POOL_CACHE_BLOCK_HEADER);
2716 	}
2717 	return 0;
2718 }
2719 
#if NOT_USED
/*
 * Delete every item of the given block that is not already flagged deleted,
 * then clear the block flags, re-initialize the block and record it in the
 * FSMM as entirely free.
 */
static void pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid)
{
	char *block = block_address(blockid);
	POOL_CACHE_BLOCK_HEADER *header = (POOL_CACHE_BLOCK_HEADER *)block;
	POOL_CACHEID victim;
	int slot = 0;

	victim.blockid = blockid;

	/*
	 * num_items is re-read on every round because deleting the last item
	 * decrements it.
	 */
	while (slot < header->num_items)
	{
		POOL_CACHE_ITEM_POINTER *ptr = item_pointer(block, slot);

		if (!(ptr->flags & POOL_ITEM_DELETED))
		{
			victim.itemid = slot;
			pool_delete_item_shmem_cache(&victim);
		}
		slot++;
	}

	header->flags = 0;
	pool_init_cache_block(blockid);
	pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
}
#endif
2753 
2754 /*
2755  * Acquire lock: XXX giant lock
2756  */
pool_shmem_lock(void)2757 void pool_shmem_lock(void)
2758 {
2759 	if (pool_is_shmem_cache())
2760 	{
2761 		pool_semaphore_lock(SHM_CACHE_SEM);
2762 
2763 	}
2764 }
2765 
2766 /*
2767  * Release lock
2768  */
pool_shmem_unlock(void)2769 void pool_shmem_unlock(void)
2770 {
2771 	if (pool_is_shmem_cache())
2772 	{
2773 		pool_semaphore_unlock(SHM_CACHE_SEM);
2774 	}
2775 }
2776 
2777 /*
2778  * Returns cache block address specified by block id
2779  */
block_address(int blockid)2780 static char *block_address(int blockid)
2781 {
2782 	char *p;
2783 
2784 	p = pool_memory_cache_address() +
2785 		blockid *	pool_config->memqcache_cache_block_size;
2786 	return p;
2787 }
2788 
2789 /*
2790  * Returns i th item pointer in block address block
2791  */
item_pointer(char * block,int i)2792 static POOL_CACHE_ITEM_POINTER *item_pointer(char *block, int i)
2793 {
2794 	return (POOL_CACHE_ITEM_POINTER *)(block + sizeof(POOL_CACHE_BLOCK_HEADER) +
2795 									   sizeof(POOL_CACHE_ITEM_POINTER) * i);
2796 }
2797 
2798 /*
2799  * Returns i th item header in block address block
2800  */
item_header(char * block,int i)2801 static POOL_CACHE_ITEM_HEADER *item_header(char *block, int i)
2802 {
2803 	POOL_CACHE_ITEM_POINTER *cip;
2804 
2805 	cip = item_pointer(block, i);
2806 	return (POOL_CACHE_ITEM_HEADER *)(block + cip->offset);
2807 }
2808 
#ifdef SHMEMCACHE_DEBUG
/*
 * Dump a shmem cache block to stderr: the block header, then each item
 * pointer and item header.  Only built with SHMEMCACHE_DEBUG.
 *
 * sizeof results are printed with %zu and time_t is cast to long for %ld,
 * so the format specifiers match their arguments on every platform.
 */
static void dump_shmem_cache(POOL_CACHE_BLOCKID blockid)
{
	POOL_CACHE_BLOCK_HEADER *bh;
	POOL_CACHE_ITEM_POINTER *cip;
	POOL_CACHE_ITEM_HEADER *cih;
	int i;

	bh = (POOL_CACHE_BLOCK_HEADER *)block_address(blockid);
	fprintf(stderr, "shmem: block header(%zu bytes): flags:%x num_items:%d free_bytes:%d\n",
			sizeof(*bh), bh->flags, bh->num_items, bh->free_bytes);
	for (i=0;i<bh->num_items;i++)
	{
		cip = item_pointer((char *)bh, i);
		fprintf(stderr, "shmem: block: %d %d th item pointer(%zu bytes): offset:%d flags:%x\n",
				blockid, i, sizeof(*cip), cip->offset, cip->flags);
		cih = item_header((char *)bh, i);
		fprintf(stderr, "shmem: block: %d %d th item header(%zu bytes): timestamp:%ld length:%d\n",
				blockid, i, sizeof(*cih), (long) cih->timestamp, cih->total_length);
	}
}
#endif
2834 
2835 /*
2836  * SELECT query result array modules
2837  */
pool_create_query_cache_array(void)2838 POOL_QUERY_CACHE_ARRAY *pool_create_query_cache_array(void)
2839 {
2840 #define POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM 128
2841 #define POOL_QUERY_CACHE_ARRAY_HEADER_SIZE (sizeof(int)+sizeof(int))
2842 
2843 	size_t size;
2844 	POOL_QUERY_CACHE_ARRAY *p;
2845 	POOL_SESSION_CONTEXT *session_context =pool_get_session_context(false);
2846 	MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
2847 
2848 	size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM *
2849 		sizeof(POOL_TEMP_QUERY_CACHE *);
2850 	p = palloc(size);
2851 	p->num_caches = 0;
2852 	p->array_size = POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
2853     MemoryContextSwitchTo(old_context);
2854 	return p;
2855 }
2856 
2857 /*
2858  * Discard query cache array
2859  */
pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array)2860 void pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY *cache_array)
2861 {
2862 	int i;
2863 
2864 	if (!cache_array)
2865 		return;
2866 
2867 	ereport(DEBUG1,
2868 		(errmsg("memcache discarding query cache array"),
2869 			 errdetail("num_caches: %d", cache_array->num_caches)));
2870 
2871 	for (i=0;i<cache_array->num_caches;i++)
2872 	{
2873 		ereport(DEBUG2,
2874 			(errmsg("memcache discarding query cache array"),
2875 				 errdetail("cache no: %d cache: %p", i, cache_array->caches[i])));
2876 		pool_discard_temp_query_cache(cache_array->caches[i]);
2877 	}
2878 	pfree(cache_array);
2879 }
2880 
2881 /*
2882  * Add query cache array
2883  */
pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array,POOL_TEMP_QUERY_CACHE * cache)2884 static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY *cache_array, POOL_TEMP_QUERY_CACHE *cache)
2885 {
2886 	size_t size;
2887 	POOL_QUERY_CACHE_ARRAY *cp = cache_array;
2888 
2889 	if (!cache_array)
2890 		return cp;
2891 
2892 	ereport(DEBUG2,
2893 		(errmsg("memcache adding query cache array"),
2894 			 errdetail("num_caches: %d cache: %p", cache_array->num_caches, cache)));
2895 	if (cache_array->num_caches >= 	cache_array->array_size)
2896 	{
2897 		cache_array->array_size += POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
2898 		size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + cache_array->array_size *
2899 			sizeof(POOL_TEMP_QUERY_CACHE *);
2900 		cache_array = repalloc(cache_array, size);
2901 	}
2902 	cache_array->caches[cache_array->num_caches++] = cache;
2903 	return cache_array;
2904 }
2905 
2906 /*
2907  * SELECT query result temporary cache modules
2908  */
2909 
2910 /*
2911  * Create SELECT result temporary cache
2912  */
pool_create_temp_query_cache(char * query)2913 POOL_TEMP_QUERY_CACHE *pool_create_temp_query_cache(char *query)
2914 {
2915 	POOL_SESSION_CONTEXT *session_context =pool_get_session_context(false);
2916     MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
2917 	POOL_TEMP_QUERY_CACHE *p;
2918 	p = palloc(sizeof(*p));
2919     p->query = pstrdup(query);
2920 
2921     p->buffer = pool_create_buffer();
2922     p->oids = pool_create_buffer();
2923     p->num_oids = 0;
2924     p->is_exceeded = false;
2925     p->is_discarded = false;
2926 
2927     MemoryContextSwitchTo(old_context);
2928 
2929 	ereport(DEBUG1,
2930 			(errmsg("pool_create_temp_query_cache: cache created: %p", p)));
2931 
2932 	return p;
2933 }
2934 
2935 /*
2936  * Discard temp query cache
2937  */
pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache)2938 void pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache)
2939 {
2940 	if (!temp_cache)
2941 		return;
2942 
2943 	if (temp_cache->query)
2944 		pfree(temp_cache->query);
2945 	if (temp_cache->buffer)
2946 		pool_discard_buffer(temp_cache->buffer);
2947 	if (temp_cache->oids)
2948 		pool_discard_buffer(temp_cache->oids);
2949 	pfree(temp_cache);
2950 
2951 	ereport(DEBUG1,
2952 			(errmsg("pool_discard_temp_query_cache: cache discarded: %p", temp_cache)));
2953 }
2954 
2955 /*
2956  * Add data to temp query cache.
2957  * Data must be FE/BE protocol packet.
2958  */
pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,char kind,char * data,int data_len)2959 static void pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, char kind, char *data, int data_len)
2960 {
2961 	POOL_INTERNAL_BUFFER *buffer;
2962 	size_t buflen;
2963 	int send_len;
2964 
2965 	if (temp_cache == NULL)
2966 	{
2967 		/* This could happen if cache exceeded in previous query
2968 		 * execution in the same unnamed portal.
2969 		 */
2970 		ereport(DEBUG1,
2971 			(errmsg("memcache adding temporary query cache"),
2972 				 errdetail("POOL_TEMP_QUERY_CACHE is NULL")));
2973 		return;
2974 	}
2975 
2976 	if (temp_cache->is_exceeded)
2977 	{
2978 		ereport(DEBUG1,
2979 			(errmsg("memcache adding temporary query cache"),
2980 				 errdetail("memqcache_maxcache exceeds")));
2981 		return;
2982 	}
2983 
2984 	/*
2985 	 * We only store T(Table Description), D(Data row), C(Command Complete),
2986 	 * 1(ParseComplete), 2(BindComplete)
2987 	 */
2988     if (kind != 'T' && kind != 'D' && kind != 'C' && kind != '1' && kind != '2')
2989     {
2990 		return;
2991 	}
2992 
2993 	/* Check data limit */
2994 	buffer = temp_cache->buffer;
2995 	buflen = pool_get_buffer_length(buffer);
2996 
2997 	if ((buflen+data_len+sizeof(int)+1) > pool_config->memqcache_maxcache)
2998 	{
2999 		ereport(DEBUG1,
3000 			(errmsg("memcache adding temporary query cache"),
3001 				errdetail("data size exceeds memqcache_maxcache. current:%zd requested:%zd memq_maxcache:%d",
3002 					   buflen, data_len+sizeof(int)+1, pool_config->memqcache_maxcache)));
3003 		temp_cache->is_exceeded = true;
3004 		return;
3005 	}
3006 
3007 	pool_add_buffer(buffer, &kind, 1);
3008 	send_len = htonl(data_len + sizeof(int));
3009 	pool_add_buffer(buffer, (char *)&send_len, sizeof(int));
3010 	pool_add_buffer(buffer, data, data_len);
3011 
3012 	return;
3013 }
3014 
3015 /*
3016  * Add table oids used by SELECT to temp query cache.
3017  */
pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,int num_oids,int * oids)3018 static void pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, int num_oids, int *oids)
3019 {
3020 	POOL_INTERNAL_BUFFER *buffer;
3021 
3022 	if (!temp_cache || num_oids <= 0)
3023 		return;
3024 
3025 	buffer = temp_cache->oids;
3026 	pool_add_buffer(buffer, oids, num_oids*sizeof(int));
3027 	temp_cache->num_oids = num_oids;
3028 }
3029 
3030 /*
3031  * Internal buffer management modules.
3032  * Usage:
3033  * 1) Create buffer using pool_create_buffer().
3034  * 2) Add data to buffer using pool_add_buffer().
3035  * 3) Extract (copied) data from buffer using pool_get_buffer().
3036  * 4) Optionally you can:
3037  *		Obtain buffer length by using pool_get_buffer_length().
3038  *		Obtain buffer pointer by using pool_get_buffer_pointer().
3039  * 5) Discard buffer using pool_discard_buffer().
3040  */
3041 
3042 /*
3043  * Create and return internal buffer
3044  */
pool_create_buffer(void)3045 static POOL_INTERNAL_BUFFER *pool_create_buffer(void)
3046 {
3047 	POOL_INTERNAL_BUFFER *p;
3048 	p = palloc0(sizeof(*p));
3049 	return p;
3050 }
3051 
3052 /*
3053  * Discard internal buffer
3054  */
pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer)3055 static void pool_discard_buffer(POOL_INTERNAL_BUFFER *buffer)
3056 {
3057 	if (buffer)
3058 	{
3059 		if (buffer->buf)
3060 			pfree(buffer->buf);
3061 		pfree(buffer);
3062 	}
3063 }
3064 
3065 /*
3066  * Add data to internal buffer
3067  */
pool_add_buffer(POOL_INTERNAL_BUFFER * buffer,void * data,size_t len)3068 static void pool_add_buffer(POOL_INTERNAL_BUFFER *buffer, void *data, size_t len)
3069 {
3070 #define POOL_ALLOCATE_UNIT 8192
3071 
3072 	/* Sanity check */
3073 	if (!buffer || !data || len == 0)
3074 		return;
3075 	POOL_SESSION_CONTEXT *session_context =pool_get_session_context(false);
3076 	MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
3077 
3078 	/* Check if we need to increase the buffer size */
3079 	if ((buffer->buflen + len) > buffer->bufsize)
3080 	{
3081 		size_t allocate_size = ((buffer->buflen + len)/POOL_ALLOCATE_UNIT +1)*POOL_ALLOCATE_UNIT;
3082 		ereport(DEBUG2,
3083 			(errmsg("memcache adding data to internal buffer"),
3084 				errdetail("realloc old size:%zd new size:%zd",
3085 					   buffer->bufsize, allocate_size)));
3086 		buffer->bufsize = allocate_size;
3087 		buffer->buf = (char *)repalloc(buffer->buf, buffer->bufsize);
3088 	}
3089 	/* Add data to buffer */
3090 	memcpy(buffer->buf+buffer->buflen, data, len);
3091 	buffer->buflen += len;
3092 	ereport(DEBUG2,
3093 		(errmsg("memcache adding data to internal buffer"),
3094 			errdetail("len:%zd, total:%zd bufsize:%zd",
3095 				   len, buffer->buflen, buffer->bufsize)));
3096 	MemoryContextSwitchTo(old_context);
3097 	return;
3098 }
3099 
3100 /*
3101  * Get data from internal buffer.
3102  * Data is stored in newly malloc memory.
3103  * Data length is returned to len.
3104  */
pool_get_buffer(POOL_INTERNAL_BUFFER * buffer,size_t * len)3105 static void *pool_get_buffer(POOL_INTERNAL_BUFFER *buffer, size_t *len)
3106 {
3107 	void *p;
3108 
3109 	if (buffer->bufsize == 0 || buffer->buflen == 0 ||
3110 		buffer->buf == NULL)
3111 	{
3112 		*len = 0;
3113 		return NULL;
3114 	}
3115 
3116 	p = palloc(buffer->buflen);
3117 	memcpy(p, buffer->buf, buffer->buflen);
3118 	*len = buffer->buflen;
3119 	return p;
3120 }
3121 
3122 /*
3123  * Get internal buffer length.
3124  */
pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer)3125 static size_t pool_get_buffer_length(POOL_INTERNAL_BUFFER *buffer)
3126 {
3127 	if (buffer == NULL)
3128 		return 0;
3129 
3130 	return buffer->buflen;
3131 }
3132 
#ifdef NOT_USED
/*
 * Get the internal buffer's storage pointer directly (no copy is made).
 * Returns NULL for a NULL buffer.
 */
static char *pool_get_buffer_pointer(POOL_INTERNAL_BUFFER *buffer)
{
	return buffer ? buffer->buf : NULL;
}
#endif
3144 /*
3145  * Get query cache buffer struct of current query context
3146  */
pool_get_current_cache(void)3147 POOL_TEMP_QUERY_CACHE *pool_get_current_cache(void)
3148 {
3149 	POOL_SESSION_CONTEXT *session_context;
3150 	POOL_QUERY_CONTEXT *query_context;
3151 	POOL_TEMP_QUERY_CACHE *p = NULL;
3152 
3153 	session_context = pool_get_session_context(true);
3154 	if (session_context)
3155 	{
3156 		query_context = session_context->query_context;
3157 		if (query_context)
3158 		{
3159 			p = query_context->temp_cache;
3160 		}
3161 	}
3162 	return p;
3163 }
3164 
3165 /*
3166  * Get query cache buffer of current query context
3167  */
pool_get_current_cache_buffer(size_t * len)3168 static char *pool_get_current_cache_buffer(size_t *len)
3169 {
3170 	char *p = NULL;
3171 	*len = 0;
3172 	POOL_TEMP_QUERY_CACHE *cache;
3173 
3174 	cache = pool_get_current_cache();
3175 	if (cache)
3176 	{
3177 		p = pool_get_buffer(cache->buffer, len);
3178 	}
3179 	return p;
3180 }
3181 
3182 /*
3183  * Mark this temporary query cache buffer discarded if the SELECT
3184  * uses the table oid specified by oids.
3185  */
pool_check_and_discard_cache_buffer(int num_oids,int * oids)3186 static void pool_check_and_discard_cache_buffer(int num_oids, int *oids)
3187 {
3188 	POOL_SESSION_CONTEXT *session_context;
3189 	POOL_TEMP_QUERY_CACHE *cache;
3190 	int num_caches;
3191 	size_t len;
3192 	int *soids;
3193 	int i, j, k;
3194 
3195 	session_context = pool_get_session_context(true);
3196 
3197 	if (!session_context || !session_context->query_cache_array)
3198 		return;
3199 
3200 	num_caches = session_context->query_cache_array->num_caches;
3201 
3202 	for (i=0;i<num_caches;i++)
3203 	{
3204 		cache = session_context->query_cache_array->caches[i];
3205 		if (!cache || cache->is_discarded)
3206 			continue;
3207 
3208 		soids = (int *)pool_get_buffer(cache->oids, &len);
3209 		if (!soids || !len)
3210 			continue;
3211 
3212 		for(j=0;j<cache->num_oids;j++)
3213 		{
3214 			if (cache->is_discarded)
3215 				break;
3216 
3217 			for (k=0;k<num_oids;k++)
3218 			{
3219 				if (soids[j] == oids[k])
3220 				{
3221 					ereport(DEBUG1,
3222 							(errmsg("discard cache for \"%s\"",cache->query)));
3223 					cache->is_discarded = true;
3224 					break;
3225 				}
3226 			}
3227 		}
3228 		pfree(soids);
3229 	}
3230 }
3231 
3232 /*
3233  * At Ready for Query or Comand Complete handle query cache.  For streaming
3234  * replication mode and extended query at Comand Complete handle query cache.
3235  * For other case At Ready for Query handle query cache.
3236  */
pool_handle_query_cache(POOL_CONNECTION_POOL * backend,char * query,Node * node,char state)3237 void pool_handle_query_cache(POOL_CONNECTION_POOL *backend, char *query, Node *node, char state)
3238 {
3239 	POOL_SESSION_CONTEXT *session_context;
3240 	pool_sigset_t oldmask;
3241 	char *cache_buffer;
3242 	size_t len;
3243 	int num_oids;
3244 	int *oids;
3245 	int i;
3246 
3247 	session_context = pool_get_session_context(true);
3248 
3249 	/* Ok to cache SELECT result? */
3250 	if (pool_is_cache_safe())
3251 	{
3252 		SelectContext ctx;
3253 		MemoryContext old_context;
3254 		old_context = MemoryContextSwitchTo(session_context->memory_context);
3255 		num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
3256 		MemoryContextSwitchTo(old_context);
3257 		oids = ctx.table_oids;
3258 		ereport(DEBUG2,
3259 			(errmsg("query cache handler for ReadyForQuery"),
3260 				 errdetail("num_oids: %d oid: %d", num_oids, *oids)));
3261 
3262 		if (state == 'I')		/* Not inside a transaction? */
3263 		{
3264 			/*
3265 			 * Make sure that temporary cache is not exceeded.
3266 			 */
3267 			if (!pool_is_cache_exceeded())
3268 			{
3269 				POOL_TEMP_QUERY_CACHE *cache;
3270 				/*
3271 				 * If we are not inside a transaction, we can
3272 				 * immediately register to cache storage.
3273 				 */
3274 				/* Register to memcached or shmem */
3275 				POOL_SETMASK2(&BlockSig, &oldmask);
3276 				pool_shmem_lock();
3277 
3278 				cache_buffer =  pool_get_current_cache_buffer(&len);
3279 				if (cache_buffer)
3280 				{
3281 					if (session_context->query_context->skip_cache_commit == false)
3282 					{
3283 						if (pool_commit_cache(backend, query, cache_buffer, len, num_oids, oids) != 0)
3284 						{
3285 							ereport(WARNING,
3286 									(errmsg("ReadyForQuery: pool_commit_cache failed")));
3287 						}
3288 					}
3289 					/*
3290 					 * Reset temporary query cache buffer. This is
3291 					 * necessary if extended query protocol is used and a
3292 					 * bind/execute message arrives which uses a statement
3293 					 * created by prior parse message. In this case since
3294 					 * the temp_cache is not initialized by a parse
3295 					 * message, messages are added to pre existing temp
3296 					 * cache buffer. The problem was found in bug#152.
3297 					 * http://www.pgpool.net/mantisbt/view.php?id=152
3298 					 */
3299 					cache = pool_get_current_cache();
3300 					ereport(DEBUG1,
3301 							(errmsg("pool_handle_query_cache: temp_cache: %p", cache)));
3302 					pool_discard_temp_query_cache(cache);
3303 
3304 					if (SL_MODE && pool_is_doing_extended_query_message())
3305 						session_context->query_context->temp_cache = NULL;
3306 					else
3307 						session_context->query_context->temp_cache = pool_create_temp_query_cache(query);
3308 					pfree(cache_buffer);
3309 				}
3310 				pool_shmem_unlock();
3311 				POOL_SETMASK(&oldmask);
3312 			}
3313 
3314 			/* Count up SELECT stats */
3315 			pool_stats_count_up_num_selects(1);
3316 
3317 			/* Reset temp buffer */
3318 			pool_reset_memqcache_buffer(true);
3319 		}
3320 		else
3321 		{
3322 			POOL_TEMP_QUERY_CACHE *cache = pool_get_current_cache();
3323 
3324 			/* In transaction. Keep to temp query cache array */
3325 			pool_add_oids_temp_query_cache(cache, num_oids, oids);
3326 
3327 			/*
3328 			 * If temp cache has been overflowed, just trash the half
3329 			 * baked temp cache.
3330 			 */
3331 			if (pool_is_cache_exceeded())
3332 			{
3333 				POOL_TEMP_QUERY_CACHE *cache;
3334 
3335 				cache = pool_get_current_cache();
3336 				pool_discard_temp_query_cache(cache);
3337 				/*
3338 				 * Reset temp_cache pointer in the current query context
3339 				 * so that we don't double free memory.
3340 				 */
3341 				session_context->query_context->temp_cache = NULL;
3342 
3343 			}
3344 			/*
3345 			 * Otherwise add to the temp cache array.
3346 			 */
3347 			else
3348 			{
3349 				session_context->query_cache_array =
3350 					pool_add_query_cache_array(session_context->query_cache_array, cache);
3351 				/*
3352 				 * Reset temp_cache pointer in the current query
3353 				 * context so that we don't add the same temp cache to
3354 				 * the cache array. This is necessary such that case
3355 				 * when next query is just a "bind message", without
3356 				 * "parse message". In the case the query context is
3357 				 * reused and same cache pointer will be added to the
3358 				 * query_cache_array which we do not want.
3359 				 */
3360 				session_context->query_context->temp_cache = NULL;
3361 			}
3362 
3363 			/* Count up temporary SELECT stats */
3364 			pool_tmp_stats_count_up_num_selects();
3365 		}
3366 	}
3367 	else if (is_rollback_query(node))	/* Rollback? */
3368 	{
3369 		/* Discard buffered data */
3370 		pool_reset_memqcache_buffer(true);
3371 	}
3372 	else if (is_commit_query(node))		/* Commit? */
3373 	{
3374 		int num_caches;
3375 
3376 		POOL_SETMASK2(&BlockSig, &oldmask);
3377 		pool_shmem_lock();
3378 
3379 		/* Invalidate query cache */
3380 		if (pool_config->memqcache_auto_cache_invalidation)
3381 		{
3382 			num_oids = pool_get_dml_table_oid(&oids);
3383 			pool_invalidate_query_cache(num_oids, oids, true, 0);
3384 		}
3385 
3386 		/*
3387 		 * If we have something in the query cache buffer, that means
3388 		 * either:
3389 		 * - We only had SELECTs in the transaction
3390 		 * - We had only SELECTs after last DML
3391 		 * Thus we can register SELECT results to cache storage.
3392 		 */
3393 		num_caches = session_context->query_cache_array->num_caches;
3394 		for (i=0;i<num_caches;i++)
3395 		{
3396 			POOL_TEMP_QUERY_CACHE *cache;
3397 
3398 			cache = session_context->query_cache_array->caches[i];
3399 			if (!cache || cache->is_discarded)
3400 				continue;
3401 
3402 			num_oids = cache->num_oids;
3403 			oids = pool_get_buffer(cache->oids, &len);
3404 			cache_buffer = pool_get_buffer(cache->buffer, &len);
3405 
3406 			if (pool_commit_cache(backend, cache->query, cache_buffer, len, num_oids, oids) != 0)
3407 			{
3408 				ereport(WARNING,
3409 						(errmsg("ReadyForQuery: pool_commit_cache failed")));
3410 			}
3411 			if (oids)
3412 				pfree(oids);
3413 			if (cache_buffer)
3414 				pfree(cache_buffer);
3415 		}
3416 		pool_shmem_unlock();
3417 		POOL_SETMASK(&oldmask);
3418 
3419 		/* Count up number of SELECT stats */
3420 		pool_stats_count_up_num_selects(pool_tmp_stats_get_num_selects());
3421 
3422 		pool_reset_memqcache_buffer(true);
3423 	}
3424 	else		/* Non cache safe queries */
3425 	{
3426 		/* Non cachable SELECT */
3427 		if (node && IsA(node, SelectStmt))
3428 		{
3429 			/* Extract table oids from buffer */
3430 			num_oids = pool_get_dml_table_oid(&oids);
3431 
3432 			if (state == 'I')
3433 			{
3434 				/*
3435 				 * If Data-modifying statements in SELECT's WITH clause,
3436 				 * invalidate query cache.
3437 				 */
3438 				if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3439 				{
3440 					POOL_SETMASK2(&BlockSig, &oldmask);
3441 					pool_shmem_lock();
3442 					pool_invalidate_query_cache(num_oids, oids, true, 0);
3443 					pool_shmem_unlock();
3444 					POOL_SETMASK(&oldmask);
3445 				}
3446 
3447 				/* Count up SELECT stats */
3448 				pool_stats_count_up_num_selects(1);
3449 				pool_reset_memqcache_buffer(true);
3450 			}
3451 			else
3452 			{
3453 				/*
3454 				 * If we are inside a transaction, we cannot invalidate
3455 				 * query cache yet. However we can clear cache buffer, if
3456 				 * DML/DDL modifies the TABLE which SELECT uses.
3457 				 */
3458 				if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3459 				{
3460 					pool_check_and_discard_cache_buffer(num_oids, oids);
3461 					pool_reset_memqcache_buffer(false);
3462 				}
3463 
3464 				/* Count up temporary SELECT stats */
3465 				pool_tmp_stats_count_up_num_selects();
3466 			}
3467 		}
3468 		/*
3469 		 * If the query is DROP DATABASE, discard both of caches in shmem/memcached and
3470 		 * oidmap in memqcache_oiddir.
3471 		 */
3472 		else if (is_drop_database(node) && session_context->query_context->dboid != 0)
3473 		{
3474 			int dboid = session_context->query_context->dboid;
3475 			num_oids = pool_get_dropdb_table_oids(&oids, dboid);
3476 
3477 			if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3478 			{
3479 				pool_shmem_lock();
3480 				pool_invalidate_query_cache(num_oids, oids, true, dboid);
3481 				pool_discard_oid_maps_by_db(dboid);
3482 				pool_shmem_unlock();
3483 				pool_reset_memqcache_buffer(true);
3484 
3485 				pfree(oids);
3486 				ereport(DEBUG2,
3487 					(errmsg("query cache handler for ReadyForQuery"),
3488 						 errdetail("deleted all cache files for the DROPped DB")));
3489 			}
3490 		}
3491 		else
3492 		{
3493 			/*
3494 			 * DML/DCL/DDL case
3495 			 */
3496 
3497 			/* Extract table oids from buffer */
3498 			num_oids = pool_get_dml_table_oid(&oids);
3499 			if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3500 			{
3501 				/*
3502 				 * If we are not inside a transaction, we can
3503 				 * immediately invalidate query cache.
3504 				 */
3505 				if (state == 'I')
3506 				{
3507 					POOL_SETMASK2(&BlockSig, &oldmask);
3508 					pool_shmem_lock();
3509 					pool_invalidate_query_cache(num_oids, oids, true, 0);
3510 					pool_shmem_unlock();
3511 					POOL_SETMASK(&oldmask);
3512 					pool_reset_memqcache_buffer(true);
3513 				}
3514 				else
3515 				{
3516 					/*
3517 					 * If we are inside a transaction, we
3518 					 * cannot invalidate query cache
3519 					 * yet. However we can clear cache buffer,
3520 					 * if DML/DDL modifies the TABLE which SELECT uses.
3521 					 */
3522 					pool_check_and_discard_cache_buffer(num_oids, oids);
3523 					pool_reset_memqcache_buffer(false);
3524 				}
3525 			}
3526 			else if (num_oids == 0)
3527 			{
3528 				/*
3529 				 * It is also necessary to clear cache buffers in case of
3530 				 * no oid queries (like BEGIN, CHECKPOINT, VACUUM, etc) too.
3531 				 */
3532 				pool_reset_memqcache_buffer(true);
3533 			}
3534 		}
3535 	}
3536 }
3537 
3538 /*
3539  * Create and initialize query cache stats
3540  */
3541 static POOL_QUERY_CACHE_STATS *stats;
pool_init_memqcache_stats(void)3542 int pool_init_memqcache_stats(void)
3543 {
3544 	stats = pool_shared_memory_create(sizeof(POOL_QUERY_CACHE_STATS));
3545 	pool_reset_memqcache_stats();
3546 	return 0;
3547 }
3548 
3549 /*
3550  * Returns copy of stats area. The copy is in static area and will be
3551  * overwritten by next call to this function.
3552  */
pool_get_memqcache_stats(void)3553 POOL_QUERY_CACHE_STATS *pool_get_memqcache_stats(void)
3554 {
3555 	static POOL_QUERY_CACHE_STATS mystats;
3556 	pool_sigset_t oldmask;
3557 
3558 	memset(&mystats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3559 
3560 	if (stats)
3561 	{
3562 		POOL_SETMASK2(&BlockSig, &oldmask);
3563 		pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3564 		memcpy(&mystats, stats, sizeof(POOL_QUERY_CACHE_STATS));
3565 		pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3566 		POOL_SETMASK(&oldmask);
3567 	}
3568 
3569 	return &mystats;
3570 }
3571 
3572 /*
3573  * Reset query cache stats. Caller must lock QUERY_CACHE_STATS_SEM if
3574  * necessary.
3575  */
pool_reset_memqcache_stats(void)3576 void pool_reset_memqcache_stats(void)
3577 {
3578 	memset(stats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3579 	stats->start_time = time(NULL);
3580 }
3581 
3582 /*
3583  * Count up number of successful SELECTs and returns the number.
3584  * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3585  */
pool_stats_count_up_num_selects(long long int num)3586 long long int pool_stats_count_up_num_selects(long long int num)
3587 {
3588 	pool_sigset_t oldmask;
3589 
3590 	POOL_SETMASK2(&BlockSig, &oldmask);
3591 	pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3592 	stats->num_selects += num;
3593 	pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3594 	POOL_SETMASK(&oldmask);
3595 	return stats->num_selects;
3596 }
3597 
3598 /*
3599  * Count up number of successful SELECTs in temporary area and returns
3600  * the number.
3601  */
pool_tmp_stats_count_up_num_selects(void)3602 long long int pool_tmp_stats_count_up_num_selects(void)
3603 {
3604 	POOL_SESSION_CONTEXT *session_context;
3605 
3606 	session_context = pool_get_session_context(false);
3607 	session_context->num_selects++;
3608 	return 	session_context->num_selects;
3609 }
3610 
3611 /*
3612  * Return number of successful SELECTs in temporary area.
3613  */
pool_tmp_stats_get_num_selects(void)3614 long long int pool_tmp_stats_get_num_selects(void)
3615 {
3616 	POOL_SESSION_CONTEXT *session_context;
3617 
3618 	session_context = pool_get_session_context(false);
3619 	return session_context->num_selects;
3620 }
3621 
3622 /*
3623  * Reset number of successful SELECTs in temporary area.
3624  */
pool_tmp_stats_reset_num_selects(void)3625 void pool_tmp_stats_reset_num_selects(void)
3626 {
3627 	POOL_SESSION_CONTEXT *session_context;
3628 
3629 	session_context = pool_get_session_context(false);
3630 	session_context->num_selects = 0;
3631 }
3632 
3633 /*
3634  * Count up number of SELECTs extracted from cache returns the number.
3635  * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3636  */
pool_stats_count_up_num_cache_hits(void)3637 long long int pool_stats_count_up_num_cache_hits(void)
3638 {
3639 	pool_sigset_t oldmask;
3640 
3641 	POOL_SETMASK2(&BlockSig, &oldmask);
3642 	pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3643 	stats->num_cache_hits++;
3644 	pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3645 	POOL_SETMASK(&oldmask);
3646 	return stats->num_cache_hits;
3647 }
3648 
3649 /*
3650  * On shared memory hash table implementation.  We use sub part of md5
3651  * hash key as hash function.  The experiment has shown that has_any()
3652  * of PostgreSQL is a little bit better than the method using part of
3653  * md5 hash value, but it seems adding some cpu cycles to call
3654  * hash_any() is not worth the trouble.
3655  */
3656 
3657 static volatile POOL_HASH_HEADER *hash_header;
3658 static volatile POOL_HASH_ELEMENT *hash_elements;
3659 static volatile POOL_HASH_ELEMENT *hash_free;
3660 
3661 /*
3662  * Initialize hash table on shared memory "nelements" is max number of
3663  * hash keys. The actual number of hash key is rounded up to power of
3664  * 2.
3665  */
3666 #undef POOL_HASH_DEBUG
3667 
pool_hash_init(int nelements)3668 int pool_hash_init(int nelements)
3669 {
3670 	size_t size;
3671 	int nelements2;		/* number of rounded up hash keys */
3672 	int shift;
3673 	uint32 mask;
3674 	POOL_HASH_HEADER hh;
3675 	int i;
3676 
3677 	if (nelements <= 0)
3678 		ereport(ERROR,
3679 			(errmsg("initializing hash table on shared memory, invalid number of elements: %d",nelements)));
3680 
3681 	/* Round up to power of 2 */
3682 	shift = 32;
3683 	nelements2 = 1;
3684 	do
3685 	{
3686 		nelements2 <<= 1;
3687 		shift--;
3688 	} while (nelements2 < nelements);
3689 
3690 	mask = ~0;
3691 	mask >>= shift;
3692 	size = (char *)&hh.elements - (char *)&hh + sizeof(POOL_HEADER_ELEMENT)*nelements2;
3693 	hash_header = pool_shared_memory_create(size);
3694 	hash_header->nhash = nelements2;
3695     hash_header->mask = mask;
3696 
3697 #ifdef POOL_HASH_DEBUG
3698 	ereport(LOG,
3699 		(errmsg("initializing hash table on shared memory"),
3700 			 errdetail("size:%zd nelements2:%d", size, nelements2)));
3701 
3702 #endif
3703 
3704 	size = sizeof(POOL_HASH_ELEMENT)*nelements2;
3705 	hash_elements = pool_shared_memory_create(size);
3706 
3707 #ifdef POOL_HASH_DEBUG
3708 	ereport(LOG,
3709 		(errmsg("initializing hash table on shared memory"),
3710 			 errdetail("size:%zd nelements2:%d", size, nelements2)));
3711 #endif
3712 
3713 	for (i=0;i<nelements2-1;i++)
3714 	{
3715 		hash_elements[i].next = (POOL_HASH_ELEMENT *)&hash_elements[i+1];
3716 	}
3717 	hash_elements[nelements2-1].next = NULL;
3718 	hash_free = hash_elements;
3719 
3720 	return 0;
3721 }
3722 
3723 /*
3724  * Reset hash table on shared memory "nelements" is max number of
3725  * hash keys. The actual number of hash key is rounded up to power of
3726  * 2.
3727  */
3728 static int
pool_hash_reset(int nelements)3729 pool_hash_reset(int nelements)
3730 {
3731 	size_t size;
3732 	int nelements2;		/* number of rounded up hash keys */
3733 	int shift;
3734 	uint32 mask;
3735 	POOL_HASH_HEADER hh;
3736 	int i;
3737 
3738 	if (nelements <= 0)
3739 		ereport(ERROR,
3740 				(errmsg("clearing hash table on shared memory, invalid number of elements: %d",nelements)));
3741 
3742 	/* Round up to power of 2 */
3743 	shift = 32;
3744 	nelements2 = 1;
3745 	do
3746 	{
3747 		nelements2 <<= 1;
3748 		shift--;
3749 	} while (nelements2 < nelements);
3750 
3751 	mask = ~0;
3752 	mask >>= shift;
3753 
3754 	size = (char *)&hh.elements - (char *)&hh + sizeof(POOL_HEADER_ELEMENT)*nelements2;
3755 	memset((void *)hash_header, 0, size);
3756 
3757 	hash_header->nhash = nelements2;
3758     	hash_header->mask = mask;
3759 
3760 	size = sizeof(POOL_HASH_ELEMENT)*nelements2;
3761 	memset((void *)hash_elements, 0, size);
3762 
3763 	for (i=0;i<nelements2-1;i++)
3764 	{
3765 		hash_elements[i].next = (POOL_HASH_ELEMENT *)&hash_elements[i+1];
3766 	}
3767 	hash_elements[nelements2-1].next = NULL;
3768 	hash_free = hash_elements;
3769 
3770 	return 0;
3771 }
3772 
3773 /*
3774  * Search cacheid by MD5 hash key string
3775  * If found, returns cache id, otherwise NULL.
3776  */
pool_hash_search(POOL_QUERY_HASH * key)3777 POOL_CACHEID *pool_hash_search(POOL_QUERY_HASH *key)
3778 {
3779 	volatile POOL_HASH_ELEMENT *element;
3780 
3781 	uint32 hash_key = create_hash_key(key);
3782 
3783 	if (hash_key >= hash_header->nhash)
3784 	{
3785 		ereport(WARNING,
3786 			(errmsg("memcache: searching cacheid from hash. invalid hash key"),
3787 				errdetail("invalid hash key: %uld nhash: %ld",
3788 					   hash_key, hash_header->nhash)));
3789 		return NULL;
3790 	}
3791 
3792 	{
3793 		char md5[POOL_MD5_HASHKEYLEN+1];
3794 		memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3795 		md5[POOL_MD5_HASHKEYLEN] = '\0';
3796 #ifdef POOL_HASH_DEBUG
3797 		ereport(LOG,
3798 			(errmsg("searching hash table"),
3799 				 errdetail("hash_key:%d md5:%s", hash_key, md5)));
3800 #endif
3801 	}
3802 
3803 	element = hash_header->elements[hash_key].element;
3804 	while (element)
3805 	{
3806 		{
3807 			char md5[POOL_MD5_HASHKEYLEN+1];
3808 			memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3809 			md5[POOL_MD5_HASHKEYLEN] = '\0';
3810 #ifdef POOL_HASH_DEBUG
3811 			ereport(LOG,
3812 				(errmsg("searching hash table"),
3813 					 errdetail("element md5:%s", md5)));
3814 #endif
3815 		}
3816 
3817 		if (memcmp((const void *)element->hashkey.query_hash,
3818 				   (const void *)key->query_hash, sizeof(key->query_hash)) == 0)
3819 		{
3820 			return (POOL_CACHEID *)&element->cacheid;
3821 		}
3822 		element = element->next;
3823 	}
3824 	return NULL;
3825 }
3826 
3827 /*
3828  * Insert MD5 key and associated cache id into shmem hash table.  If
3829  * "update" is true, replace cacheid associated with the MD5 key,
3830  * rather than throw an error.
3831  */
pool_hash_insert(POOL_QUERY_HASH * key,POOL_CACHEID * cacheid,bool update)3832 static int pool_hash_insert(POOL_QUERY_HASH *key, POOL_CACHEID *cacheid, bool update)
3833 {
3834 	POOL_HASH_ELEMENT *element;
3835 	POOL_HASH_ELEMENT *new_element;
3836 
3837 	uint32 hash_key = create_hash_key(key);
3838 
3839 	if (hash_key >= hash_header->nhash)
3840 	{
3841 		ereport(WARNING,
3842 			(errmsg("memcache: adding cacheid to hash. invalid hash key"),
3843 				 errdetail("invalid hash key: %uld nhash: %ld",
3844 						   hash_key, hash_header->nhash)));
3845 		return -1;
3846 	}
3847 
3848 	{
3849 		char md5[POOL_MD5_HASHKEYLEN+1];
3850 		memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3851 		md5[POOL_MD5_HASHKEYLEN] = '\0';
3852 #ifdef POOL_HASH_DEBUG
3853 		ereport(LOG,
3854 			(errmsg("searching hash table"),
3855 				 errdetail("hash_key:%d md5:%s block:%d item:%d", hash_key, md5, cacheid->blockid, cacheid->itemid)));
3856 #endif
3857 	}
3858 
3859 	/*
3860 	 * Look for hash key.
3861 	 */
3862 	element = hash_header->elements[hash_key].element;
3863 
3864 	while (element)
3865 	{
3866 		if (memcmp((const void *)element->hashkey.query_hash,
3867 				   (const void *)key->query_hash, sizeof(key->query_hash)) == 0)
3868 		{
3869 			/* Hash key found. If "update" is false, just throw an error. */
3870 			char md5[POOL_MD5_HASHKEYLEN+1];
3871 
3872 			if (!update)
3873 			{
3874 				memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3875 				md5[POOL_MD5_HASHKEYLEN] = '\0';
3876 				ereport(LOG,
3877 					(errmsg("memcache: adding cacheid to hash. hash key:\"%s\" already exists",md5)));
3878 				return -1;
3879 			}
3880 			else
3881 			{
3882 				/* Update cache id */
3883 				memcpy((void *)&element->cacheid, cacheid, sizeof(POOL_CACHEID));
3884 				return 0;
3885 			}
3886 		}
3887 		element = element->next;
3888 	}
3889 
3890 	/*
3891 	 * Ok, same key did not exist. Just insert new hash key.
3892 	 */
3893 	new_element = (POOL_HASH_ELEMENT *)get_new_hash_element();
3894 	if (!new_element)
3895 	{
3896 		ereport(LOG,
3897 				(errmsg("memcache: adding cacheid to hash. failed to get new element")));
3898 		return -1;
3899 	}
3900 
3901 	element = hash_header->elements[hash_key].element;
3902 
3903 	hash_header->elements[hash_key].element = new_element;
3904 	new_element->next = element;
3905 
3906 	memcpy((void *)new_element->hashkey.query_hash, key->query_hash, POOL_MD5_HASHKEYLEN);
3907 	memcpy((void *)&new_element->cacheid, cacheid, sizeof(POOL_CACHEID));
3908 
3909 	return 0;
3910 }
3911 
3912 /*
3913  * Delete MD5 key and associated cache id from shmem hash table.
3914  */
pool_hash_delete(POOL_QUERY_HASH * key)3915 int pool_hash_delete(POOL_QUERY_HASH *key)
3916 {
3917 	POOL_HASH_ELEMENT *element;
3918 	POOL_HASH_ELEMENT **delete_point;
3919 	bool found;
3920 
3921 	uint32 hash_key = create_hash_key(key);
3922 
3923 	if (hash_key >= hash_header->nhash)
3924 	{
3925 		ereport(LOG,
3926 			(errmsg("memcache: deleting key from hash. invalid key"),
3927 				 errdetail("invalid hash key: %uld nhash: %ld",
3928 						   hash_key, hash_header->nhash)));
3929 		return -1;
3930 	}
3931 
3932 	/*
3933 	 * Look for delete location
3934 	 */
3935 	found = false;
3936 	delete_point = (POOL_HASH_ELEMENT **)&(hash_header->elements[hash_key].element);
3937 	element = hash_header->elements[hash_key].element;
3938 
3939 	while (element)
3940 	{
3941 		if (memcmp(element->hashkey.query_hash, key->query_hash, sizeof(key->query_hash)) == 0)
3942 		{
3943 			found = true;
3944 			break;
3945 		}
3946 		delete_point = &element->next;
3947 		element = element->next;
3948 	}
3949 
3950 	if (!found)
3951 	{
3952 		char md5[POOL_MD5_HASHKEYLEN+1];
3953 
3954 		memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3955 		md5[POOL_MD5_HASHKEYLEN] = '\0';
3956 		ereport(LOG,
3957 			(errmsg("memcache: deleting key from hash. key:\"%s\" not found",md5)));
3958 		return -1;
3959 	}
3960 
3961 	/*
3962 	 * Put back the element to free list
3963 	 */
3964 	*delete_point = element->next;
3965 	put_back_hash_element(element);
3966 
3967 	return 0;
3968 }
3969 
3970 /*
3971  * Calculate 32bit binary hash key (i.e. location in hash header) from MD5
3972  * string. We use top most 8 characters of MD5 string for calculation.
3973 */
create_hash_key(POOL_QUERY_HASH * key)3974 static uint32 create_hash_key(POOL_QUERY_HASH *key)
3975 {
3976 #define POOL_HASH_NCHARS 8
3977 
3978 	char md5[POOL_HASH_NCHARS+1];
3979 	uint32 mask;
3980 
3981 	memcpy(md5, key->query_hash, POOL_HASH_NCHARS);
3982 	md5[POOL_HASH_NCHARS] = '\0';
3983 	mask = strtoul(md5, NULL, 16);
3984 	mask &= hash_header->mask;
3985 	return mask;
3986 }
3987 
3988 /*
3989  * Get new free hash element from free list.
3990  */
get_new_hash_element(void)3991 static volatile POOL_HASH_ELEMENT *get_new_hash_element(void)
3992 {
3993 	volatile POOL_HASH_ELEMENT *elm;
3994 
3995 	if (!hash_free->next)
3996 	{
3997 		/* No free element */
3998 		return NULL;
3999 	}
4000 
4001 #ifdef POOL_HASH_DEBUG
4002 	ereport(LOG,
4003 		(errmsg("getting new hash element"),
4004 			errdetail("hash_free->next:%p hash_free->next->next:%p",
4005 				   hash_free->next, hash_free->next->next)));
4006 #endif
4007 
4008 	elm = hash_free->next;
4009 	hash_free->next = elm->next;
4010 
4011 	return elm;
4012 }
4013 
4014 /*
4015  * Put back hash element to free list.
4016  */
put_back_hash_element(volatile POOL_HASH_ELEMENT * element)4017 static void put_back_hash_element(volatile POOL_HASH_ELEMENT *element)
4018 {
4019 	POOL_HASH_ELEMENT *elm;
4020 
4021 #ifdef POOL_HASH_DEBUG
4022 	ereport(LOG,
4023 		(errmsg("getting new hash element"),
4024 			errdetail("hash_free->next:%p hash_free->next->next:%p",
4025 				   hash_free->next, hash_free->next->next)));
4026 #endif
4027 
4028 	elm = hash_free->next;
4029 	hash_free->next = (POOL_HASH_ELEMENT *)element;
4030 	element->next = elm;
4031 }
4032 
4033 /*
4034  * Return true if there's a free hash element.
4035  */
is_free_hash_element(void)4036 static bool is_free_hash_element(void)
4037 {
4038 	return hash_free->next != NULL;
4039 }
4040 
4041 /*
4042  * Returns shared memory cache stats.
4043  * Subsequent call to this function will break return value
4044  * because its in static memory.
4045  * Caller must hold shmem_lock before calling this function.
4046  * If in memory query cache is not enabled, all stats are 0.
4047  */
pool_get_shmem_storage_stats(void)4048 POOL_SHMEM_STATS *pool_get_shmem_storage_stats(void)
4049 {
4050 	static POOL_SHMEM_STATS mystats;
4051 	POOL_HASH_ELEMENT *element;
4052 	int nblocks;
4053 	int i;
4054 
4055 	memset(&mystats, 0, sizeof(POOL_SHMEM_STATS));
4056 
4057 	if (!pool_config-> memory_cache_enabled)
4058 		return &mystats;
4059 
4060 	/*
4061 	 * Copy cache hit data
4062 	 */
4063 	mystats.cache_stats.num_selects = stats->num_selects;
4064 	mystats.cache_stats.num_cache_hits = stats->num_cache_hits;
4065 
4066 	if (pool_config->memqcache_method != SHMEM_CACHE)
4067 		return &mystats;
4068 
4069 	/* number of total hash entries */
4070 	mystats.num_hash_entries = hash_header->nhash;
4071 
4072 	/* number of used hash entries */
4073 	for (i=0;i<hash_header->nhash;i++)
4074 	{
4075 		element = hash_header->elements[i].element;
4076 		while (element)
4077 		{
4078 			mystats.used_hash_entries++;
4079 			element = element->next;
4080 		}
4081 	}
4082 
4083 	nblocks = pool_get_memqcache_blocks();
4084 
4085 	for (i=0;i<nblocks;i++)
4086 	{
4087 		POOL_CACHE_BLOCK_HEADER *bh;
4088 		POOL_CACHE_ITEM_POINTER *cip;
4089 		char *p = block_address(i);
4090 		bh = (POOL_CACHE_BLOCK_HEADER *)p;
4091 		int j;
4092 
4093 		if (bh->flags & POOL_BLOCK_USED)
4094 		{
4095 			for (j=0;j<bh->num_items;j++)
4096 			{
4097 				cip = item_pointer(p, j);
4098 				if (POOL_ITEM_DELETED & cip->flags)
4099 				{
4100 					mystats.fragment_cache_entries_size += item_header(p, j)->total_length;
4101 				}
4102 				else
4103 				{
4104 					/* number of used cache entries */
4105 					mystats.num_cache_entries++;
4106 					/* total size of used cache entries */
4107 					mystats.used_cache_entries_size += (item_header(p, j)->total_length + sizeof(POOL_CACHE_ITEM_POINTER));
4108 				}
4109 			}
4110 			mystats.used_cache_entries_size += sizeof(POOL_CACHE_BLOCK_HEADER);
4111 			/* total size of free(usable) cache entries */
4112 			mystats.free_cache_entries_size += bh->free_bytes;
4113 		}
4114 		else
4115 		{
4116 			mystats.free_cache_entries_size += pool_config->memqcache_cache_block_size;
4117 		}
4118 	}
4119 
4120 	/*
4121 	 * Copy POOL_QUERY_CACHE_STATS
4122 	 */
4123 	memcpy(&mystats.cache_stats, stats, sizeof(mystats.cache_stats));
4124 
4125 	return &mystats;
4126 }
4127 
4128 /*
4129  * Inject cached message to the target backend buffer to pretend as if backend
4130  * actually replies with Data row and Command Complete message.
4131  */
inject_cached_message(POOL_CONNECTION * backend,char * qcache,int qcachelen)4132 static void inject_cached_message(POOL_CONNECTION *backend, char *qcache, int qcachelen)
4133 {
4134 	char kind;
4135 	int len;
4136 	char *buf;
4137 	int timeout;
4138 	int i = 0;
4139 	bool is_prepared_stmt = false;
4140 	POOL_SESSION_CONTEXT *session_context;
4141 	POOL_QUERY_CONTEXT* query_context;
4142 	POOL_PENDING_MESSAGE *msg;
4143 
4144 	session_context = pool_get_session_context(false);
4145 	query_context = session_context->query_context;
4146 	msg = pool_pending_message_find_lastest_by_query_context(query_context);
4147 
4148 	if (msg)
4149 	{
4150 		/*
4151 		 * If pending message found, we should extract target backend from it
4152 		 */
4153 		int backend_id;
4154 
4155 		backend_id = pool_pending_message_get_target_backend_id(msg);
4156 		backend = CONNECTION(session_context->backend, backend_id);
4157 		timeout = -1;
4158 	}
4159 	else
4160 		timeout = 0;
4161 
4162 	/* Send flush messsage to backend to retrieve response of backend */
4163 	pool_write(backend, "H", 1);
4164 	len = htonl(sizeof(len));
4165 	pool_write_and_flush(backend, &len, sizeof(len));
4166 
4167 	/*
4168 	 * Push any response from backend
4169 	 */
4170 	for(;;)
4171 	{
4172 		/* check if there's any pending data */
4173 		if (!pool_ssl_pending(backend) && pool_read_buffer_is_empty(backend))
4174 		{
4175 			pool_set_timeout(timeout);
4176 			if (pool_check_fd(backend) != 0)
4177 			{
4178 				ereport(DEBUG1,
4179 						(errmsg("inject_cached_message: select shows no pending data")));
4180 				pool_set_timeout(-1);
4181 				break;
4182 			}
4183 			pool_set_timeout(-1);
4184 		}
4185 
4186 		pool_read(backend, &kind, 1);
4187 		ereport(DEBUG1,
4188 				(errmsg("inject_cached_message: push message kind: '%c'", kind)));
4189 		if (msg &&
4190 			((kind == 'T' && msg->type == POOL_DESCRIBE) ||
4191 			 (kind == '2' && msg->type == POOL_BIND)))
4192 		{
4193 			/* Pending message seen. Now it is likely to end of pending data */
4194 			timeout = 0;
4195 		}
4196 		pool_push(backend, &kind, sizeof(kind));
4197 		pool_read(backend, &len, sizeof(len));
4198 		pool_push(backend, &len, sizeof(len));
4199 		if ((ntohl(len)-sizeof(len)) > 0)
4200 		{
4201 			buf = pool_read2(backend, ntohl(len)-sizeof(len));
4202 			pool_push(backend, buf, ntohl(len)-sizeof(len));
4203 		}
4204 	}
4205 
4206 	/*
4207 	 * Inject row data and command complete
4208 	 */
4209 	while (i < qcachelen)
4210 	{
4211 		char tmpkind;
4212 		int tmplen;
4213 		char *p;
4214 
4215 		tmpkind = qcache[i];
4216 		i++;
4217 
4218 		memcpy(&tmplen, qcache+i, sizeof(tmplen));
4219 		i += sizeof(tmplen);
4220 		len = ntohl(tmplen);
4221 		p = qcache + i;
4222 		i += len - sizeof(tmplen);
4223 
4224 		/* No need to cache PARSE and BIND responses */
4225 		if (tmpkind == '1' || tmpkind == '2')
4226 		{
4227 			is_prepared_stmt = true;
4228 			continue;
4229 		}
4230 
4231 		/*
4232 		 * In the prepared statement execution, there is no need to send
4233 		 * 'T' response to the frontend.
4234 		 */
4235 		if (is_prepared_stmt && tmpkind == 'T')
4236 		{
4237 			continue;
4238 		}
4239 
4240 		/* push message */
4241 		ereport(DEBUG1,
4242 			(errmsg("inject_cached_message: push cached messages: '%c' len: %d", tmpkind, len)));
4243 		pool_push(backend, &tmpkind, 1);
4244 		pool_push(backend, &tmplen, sizeof(tmplen));
4245 		if (len > 0)
4246 			pool_push(backend, p, len - sizeof(tmplen));
4247 	}
4248 
4249 	/*
4250 	 * Pop data.
4251 	 */
4252 	pool_pop(backend, &len);
4253 }
4254