1 /* -*-pgsql-c-*- */
2 /*
3  * pgpool: a language independent connection pool server for PostgreSQL
4  * written by Tatsuo Ishii
5  *
6  * Copyright (c) 2003-2020	PgPool Global Development Group
7  *
8  * Permission to use, copy, modify, and distribute this software and
9  * its documentation for any purpose and without fee is hereby
10  * granted, provided that the above copyright notice appear in all
11  * copies and that both that copyright notice and this permission
12  * notice appear in supporting documentation, and that the name of the
13  * author not be used in advertising or publicity pertaining to
14  * distribution of the software without specific, written prior
15  * permission. The author makes no representations about the
16  * suitability of this software for any purpose.  It is provided "as
17  * is" without express or implied warranty.
18  *
19  * pool_memqcache.c: query cache on shmem or memcached
20  *
21  */
22 #define DATABASE_TO_OID_QUERY "SELECT oid FROM pg_database WHERE datname = '%s'"
23 
24 #include "pool.h"
25 
26 #include <stdio.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <sys/file.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <fcntl.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <stdlib.h>
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <dirent.h>
39 
40 #ifdef USE_MEMCACHED
41 #include <libmemcached/memcached.h>
42 #endif
43 
44 #include "auth/md5.h"
45 #include "pool_config.h"
46 #include "protocol/pool_proto_modules.h"
47 #include "parser/parsenodes.h"
48 #include "context/pool_session_context.h"
49 #include "query_cache/pool_memqcache.h"
50 #include "utils/pool_relcache.h"
51 #include "utils/pool_select_walker.h"
52 #include "utils/pool_stream.h"
53 #include "utils/pool_stream.h"
54 #include "utils/elog.h"
55 #include "utils/palloc.h"
56 #include "utils/memutils.h"
57 
58 
#ifdef USE_MEMCACHED
/*
 * libmemcached handle shared by the functions in this file.  It is created
 * in memcached_connect() and released in memcached_disconnect().
 */
memcached_st *memc;
#endif

/*
 * Forward declarations of the file-local (static) helpers.
 */
static char *encode_key(const char *s, char *buf, POOL_CONNECTION_POOL * backend);
#ifdef DEBUG
static void dump_cache_data(const char *data, size_t len);
#endif
static int	pool_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen, int num_oids, int *oids);
static int	pool_fetch_cache(POOL_CONNECTION_POOL * backend, const char *query, char **buf, size_t *len);
static int	send_cached_messages(POOL_CONNECTION * frontend, const char *qcache, int qcachelen);
static void send_message(POOL_CONNECTION * conn, char kind, int len, const char *data);
#ifdef USE_MEMCACHED
static int	delete_cache_on_memcached(const char *key);
#endif
static int	pool_get_dml_table_oid(int **oid);
static int	pool_get_dropdb_table_oids(int **oids, int dboid);
static void pool_discard_dml_table_oid(void);
static void pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlink, int dboid);
static int	pool_get_database_oid(void);
static void pool_add_table_oid_map(POOL_CACHEKEY * cachkey, int num_table_oids, int *table_oids);
static void pool_reset_memqcache_buffer(bool reset_dml_oids);
static POOL_CACHEID * pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash, char *data, int size);
static POOL_CACHEID * pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash);
static char *pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash, int *size, int *sts);
static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array, POOL_TEMP_QUERY_CACHE * cache);
static void pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, char kind, char *data, int data_len);
static void pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, int num_oids, int *oids);
static POOL_INTERNAL_BUFFER * pool_create_buffer(void);
static void pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer);
static void pool_add_buffer(POOL_INTERNAL_BUFFER * buffer, void *data, size_t len);
static void *pool_get_buffer(POOL_INTERNAL_BUFFER * buffer, size_t *len);
#ifdef NOT_USED
static char *pool_get_buffer_pointer(POOL_INTERNAL_BUFFER * buffer);
#endif
static char *pool_get_current_cache_buffer(size_t *len);
static size_t pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer);
static void pool_check_and_discard_cache_buffer(int num_oids, int *oids);

static void pool_set_memqcache_blocks(int num_blocks);
static int	pool_get_memqcache_blocks(void);
static void *pool_memory_cache_address(void);
static void pool_reset_fsmm(size_t size);
static void *pool_fsmm_address(void);
static void pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space);
static POOL_CACHE_BLOCKID pool_get_block(size_t free_space);
static POOL_CACHE_ITEM_HEADER * pool_cache_item_header(POOL_CACHEID * cacheid);
static int	pool_init_cache_block(POOL_CACHE_BLOCKID blockid);
/*
 * NOTE(review): this guard is spelled "#if NOT_USED" while the one further
 * up is "#ifdef NOT_USED".  Both drop the prototype while NOT_USED is
 * undefined, but they differ if NOT_USED is ever defined as 0.
 */
#if NOT_USED
static void pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid);
#endif
static int	pool_delete_item_shmem_cache(POOL_CACHEID * cacheid);
static char *block_address(int blockid);
static POOL_CACHE_ITEM_POINTER * item_pointer(char *block, int i);
static POOL_CACHE_ITEM_HEADER * item_header(char *block, int i);
static POOL_CACHE_BLOCKID pool_reuse_block(void);
#ifdef SHMEMCACHE_DEBUG
static void dump_shmem_cache(POOL_CACHE_BLOCKID blockid);
#endif

static int	pool_hash_reset(int nelements);
static int	pool_hash_insert(POOL_QUERY_HASH * key, POOL_CACHEID * cacheid, bool update);
static uint32 create_hash_key(POOL_QUERY_HASH * key);
static volatile POOL_HASH_ELEMENT *get_new_hash_element(void);
static void put_back_hash_element(volatile POOL_HASH_ELEMENT * element);
static bool is_free_hash_element(void);
static void inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen);
126 
127 /*
128  * Connect to Memcached
129  */
130 int
memcached_connect(void)131 memcached_connect(void)
132 {
133 	char	   *memqcache_memcached_host;
134 	int			memqcache_memcached_port;
135 #ifdef USE_MEMCACHED
136 	memcached_server_st *servers;
137 	memcached_return rc;
138 
139 	/* Already connected? */
140 	if (memc)
141 	{
142 		return 0;
143 	}
144 #endif
145 
146 	memqcache_memcached_host = pool_config->memqcache_memcached_host;
147 	memqcache_memcached_port = pool_config->memqcache_memcached_port;
148 
149 	ereport(DEBUG1,
150 			(errmsg("connecting to memcached on Host:\"%s:%d\"", memqcache_memcached_host, memqcache_memcached_port)));
151 
152 #ifdef USE_MEMCACHED
153 	memc = memcached_create(NULL);
154 	servers = memcached_server_list_append(NULL,
155 										   memqcache_memcached_host,
156 										   memqcache_memcached_port,
157 										   &rc);
158 
159 	rc = memcached_server_push(memc, servers);
160 	if (rc != MEMCACHED_SUCCESS)
161 	{
162 		ereport(WARNING,
163 				(errmsg("failed to connect to memcached, server push error:\"%s\"\n", memcached_strerror(memc, rc))));
164 		memc = (memcached_st *) - 1;
165 		return -1;
166 	}
167 	memcached_server_list_free(servers);
168 #else
169 	ereport(WARNING,
170 			(errmsg("failed to connect to memcached, memcached support is not enabled")));
171 	return -1;
172 #endif
173 	return 0;
174 }
175 
176 /*
177  * Disconnect to Memcached
178  */
179 void
memcached_disconnect(void)180 memcached_disconnect(void)
181 {
182 #ifdef USE_MEMCACHED
183 	if (!memc)
184 	{
185 		return;
186 	}
187 	memcached_free(memc);
188 #else
189 	ereport(WARNING,
190 			(errmsg("failed to disconnect from memcached, memcached support is not enabled")));
191 #endif
192 }
193 
194 /*
195  * Register buffer data for query cache in memory cache
196  */
197 void
memqcache_register(char kind,POOL_CONNECTION * frontend,char * data,int data_len)198 memqcache_register(char kind,
199 				   POOL_CONNECTION * frontend,
200 				   char *data,
201 				   int data_len)
202 {
203 	POOL_TEMP_QUERY_CACHE *cache;
204 	POOL_SESSION_CONTEXT *session_context;
205 	POOL_QUERY_CONTEXT *query_context;
206 
207 	cache = pool_get_current_cache();
208 
209 	if (cache == NULL)
210 	{
211 		session_context = pool_get_session_context(true);
212 
213 		if (session_context && pool_is_query_in_progress())
214 		{
215 			char	   *query = pool_get_query_string();
216 
217 			query_context = session_context->query_context;
218 
219 			if (query)
220 				query_context->temp_cache = pool_create_temp_query_cache(query);
221 		}
222 	}
223 
224 	pool_add_temp_query_cache(cache, kind, data, data_len);
225 }
226 
227 /*
228  * Commit SELECT results to cache storage.
229  */
230 static int
pool_commit_cache(POOL_CONNECTION_POOL * backend,char * query,char * data,size_t datalen,int num_oids,int * oids)231 pool_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen, int num_oids, int *oids)
232 {
233 #ifdef USE_MEMCACHED
234 	memcached_return rc;
235 #endif
236 	POOL_CACHEKEY cachekey;
237 	char		tmpkey[MAX_KEY];
238 	time_t		memqcache_expire;
239 
240 	/*
241 	 * get_buflen() will return -1 if query result exceeds memqcache_maxcache
242 	 */
243 	if (datalen == -1)
244 	{
245 		return -1;
246 	}
247 
248 	/* query disabled */
249 	if (strlen(query) <= 0)
250 	{
251 		return -1;
252 	}
253 	ereport(DEBUG1,
254 			(errmsg("commiting SELECT results to cache storage"),
255 			 errdetail("Query=\"%s\"", query)));
256 
257 #ifdef DEBUG
258 	dump_cache_data(data, datalen);
259 #endif
260 
261 	/* encode md5key for memcached */
262 	encode_key(query, tmpkey, backend);
263 	ereport(DEBUG2,
264 			(errmsg("commiting SELECT results to cache storage"),
265 			 errdetail("search key : \"%s\"", tmpkey)));
266 
267 	memcpy(cachekey.hashkey, tmpkey, 32);
268 
269 	memqcache_expire = pool_config->memqcache_expire;
270 	ereport(DEBUG1,
271 			(errmsg("commiting SELECT results to cache storage"),
272 			 errdetail("memqcache_expire = %ld", memqcache_expire)));
273 
274 	if (pool_is_shmem_cache())
275 	{
276 		POOL_CACHEID *cacheid;
277 		POOL_QUERY_HASH query_hash;
278 
279 		memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
280 
281 		cacheid = pool_hash_search(&query_hash);
282 
283 		if (cacheid != NULL)
284 		{
285 			ereport(DEBUG1,
286 					(errmsg("commiting SELECT results to cache storage"),
287 					 errdetail("item already exists")));
288 
289 			return 0;
290 		}
291 		else
292 		{
293 			cacheid = pool_add_item_shmem_cache(&query_hash, data, datalen);
294 			if (cacheid == NULL)
295 			{
296 				ereport(LOG,
297 						(errmsg("failed to add item to shmem cache")));
298 				return -1;
299 			}
300 			else
301 			{
302 				ereport(DEBUG2,
303 						(errmsg("commiting SELECT results to cache storage"),
304 						 errdetail("blockid: %d itemid: %d",
305 								   cacheid->blockid, cacheid->itemid)));
306 			}
307 			cachekey.cacheid.blockid = cacheid->blockid;
308 			cachekey.cacheid.itemid = cacheid->itemid;
309 		}
310 	}
311 
312 #ifdef USE_MEMCACHED
313 	else
314 	{
315 		rc = memcached_set(memc, tmpkey, 32,
316 						   data, datalen, (time_t) memqcache_expire, 0);
317 		if (rc != MEMCACHED_SUCCESS)
318 		{
319 			ereport(WARNING,
320 					(errmsg("cache commit failed with error:\"%s\"", memcached_strerror(memc, rc))));
321 			return -1;
322 		}
323 		ereport(DEBUG1,
324 				(errmsg("commiting SELECT results to cache storage"),
325 				 errdetail("set cache succeeded")));
326 	}
327 #endif
328 
329 	/*
330 	 * Register cache id to oid map
331 	 */
332 	pool_add_table_oid_map(&cachekey, num_oids, oids);
333 
334 	return 0;
335 }
336 
337 /*
338  * Fetch from memory cache.
339  * Return:
340  * 0: fetch success,
341  * 1: not found
342  */
343 static int
pool_fetch_cache(POOL_CONNECTION_POOL * backend,const char * query,char ** buf,size_t * len)344 pool_fetch_cache(POOL_CONNECTION_POOL * backend, const char *query, char **buf, size_t *len)
345 {
346 	char	   *ptr;
347 	char		tmpkey[MAX_KEY];
348 	int			sts;
349 	char	   *p;
350 
351 	if (strlen(query) <= 0)
352 		ereport(ERROR,
353 				(errmsg("fetching from cache storage, no query")));
354 
355 	/* encode md5key for memcached */
356 	encode_key(query, tmpkey, backend);
357 	ereport(DEBUG1,
358 			(errmsg("fetching from cache storage"),
359 			 errdetail("search key \"%s\"", tmpkey)));
360 
361 
362 	if (pool_is_shmem_cache())
363 	{
364 		POOL_QUERY_HASH query_hash;
365 		int			mylen;
366 
367 		memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
368 
369 		ptr = pool_get_item_shmem_cache(&query_hash, &mylen, &sts);
370 		if (ptr == NULL)
371 		{
372 			ereport(DEBUG1,
373 					(errmsg("fetching from cache storage"),
374 					 errdetail("cache not found on shared memory")));
375 
376 			return 1;
377 		}
378 		*len = mylen;
379 	}
380 #ifdef USE_MEMCACHED
381 	else
382 	{
383 		memcached_return rc;
384 		unsigned int flags;
385 
386 		ptr = memcached_get(memc, tmpkey, strlen(tmpkey), len, &flags, &rc);
387 
388 		if (rc != MEMCACHED_SUCCESS)
389 		{
390 			if (rc != MEMCACHED_NOTFOUND)
391 			{
392 				ereport(LOG,
393 						(errmsg("fetching from cache storage, memcached_get failed with error: \"%s\"", memcached_strerror(memc, rc))));
394 
395 				/*
396 				 * Turn off memory cache support to prevent future errors.
397 				 */
398 				pool_config->memory_cache_enabled = 0;
399 				/* Behave as if cache not found */
400 				return 1;
401 			}
402 			else
403 			{
404 				/* Not found */
405 				ereport(DEBUG1,
406 						(errmsg("fetching from cache storage"),
407 						 errdetail("cache item not found for key: \"%s\" and query:\"%s\"", tmpkey, query)));
408 				return 1;
409 			}
410 		}
411 	}
412 #else
413 	else
414 	{
415 		ereport(ERROR,
416 				(errmsg("memcached support is not enabled")));
417 	}
418 #endif
419 
420 	p = palloc(*len);
421 
422 	memcpy(p, ptr, *len);
423 
424 	if (!pool_is_shmem_cache())
425 	{
426 		free(ptr);
427 	}
428 
429 	ereport(DEBUG1,
430 			(errmsg("fetching from cache storage"),
431 			 errdetail("query=\"%s\" len:%zd", query, *len)));
432 #ifdef DEBUG
433 	dump_cache_data(p, *len);
434 #endif
435 
436 	*buf = p;
437 
438 	return 0;
439 }
440 
441 /*
442  * encode key.
443  * create cache key as md5(username + query string + database name)
444  */
445 static char *
encode_key(const char * s,char * buf,POOL_CONNECTION_POOL * backend)446 encode_key(const char *s, char *buf, POOL_CONNECTION_POOL * backend)
447 {
448 	char	   *strkey;
449 	int			u_length;
450 	int			d_length;
451 	int			q_length;
452 	int			length;
453 
454 	u_length = strlen(backend->info->user);
455 	ereport(DEBUG1,
456 			(errmsg("memcache encode key"),
457 			 errdetail("username: \"%s\" database_name: \"%s\"", backend->info->user, backend->info->database)));
458 
459 	d_length = strlen(backend->info->database);
460 
461 	q_length = strlen(s);
462 	ereport(DEBUG1,
463 			(errmsg("memcache encode key"),
464 			 errdetail("query: \"%s\"", s)));
465 
466 	length = u_length + d_length + q_length + 1;
467 
468 	strkey = (char *) palloc(sizeof(char) * length);
469 
470 	snprintf(strkey, length, "%s%s%s", backend->info->user, s, backend->info->database);
471 
472 	pool_md5_hash(strkey, strlen(strkey), buf);
473 	ereport(DEBUG1,
474 			(errmsg("memcache encode key"),
475 			 errdetail("`%s' -> `%s'", strkey, buf)));
476 	pfree(strkey);
477 	return buf;
478 }
479 
480 #ifdef DEBUG
/*
 * dump cache data
 *
 * Debugging aid: prints the cached message stream in "data" (len bytes) to
 * stderr.  Each message is expected to be a one byte kind, followed by a
 * length in network byte order that counts itself, followed by the payload.
 *
 * The dump stops with a diagnostic as soon as a message header or payload
 * would extend beyond the remaining bytes, so a truncated or corrupted
 * buffer cannot make the unsigned byte counter wrap around and cause reads
 * past the end of "data".
 */
static void
dump_cache_data(const char *data, size_t len)
{
	fprintf(stderr, "shmem: len = %zu\n", len);

	while (len > 0)
	{
		size_t		payload;
		size_t		i;
		int			plen;

		fprintf(stderr, "shmem: kind:%c\n", *data++);
		len--;

		if (len < sizeof(plen))
		{
			fprintf(stderr, "shmem: truncated message header (%zu bytes left)\n", len);
			return;
		}

		memcpy(&plen, data, sizeof(plen));
		data += sizeof(plen);
		len -= sizeof(plen);
		plen = ntohl(plen);
		fprintf(stderr, "shmem: len:%d\n", plen);

		/* the length field counts itself, so it can never be smaller */
		if (plen < (int) sizeof(plen))
		{
			fprintf(stderr, "shmem: invalid message length\n");
			return;
		}

		payload = (size_t) plen - sizeof(plen);
		if (payload > len)
		{
			fprintf(stderr, "shmem: truncated message payload (%zu bytes left)\n", len);
			return;
		}

		fprintf(stderr, "shmem: ");
		for (i = 0; i < payload; i++)
		{
			fprintf(stderr, "%02x ", (unsigned char) data[i]);
		}
		fprintf(stderr, "\n");

		data += payload;
		len -= payload;
	}
}
513 #endif
514 
515 /*
516  * send cached messages
517  */
518 static int
send_cached_messages(POOL_CONNECTION * frontend,const char * qcache,int qcachelen)519 send_cached_messages(POOL_CONNECTION * frontend, const char *qcache, int qcachelen)
520 {
521 	int			msg = 0;
522 	int			i = 0;
523 	int			is_prepared_stmt = 0;
524 	int			len;
525 	const char *p;
526 
527 	while (i < qcachelen)
528 	{
529 		char		tmpkind;
530 		int			tmplen;
531 
532 		tmpkind = qcache[i];
533 		i++;
534 
535 		memcpy(&tmplen, qcache + i, sizeof(tmplen));
536 		i += sizeof(tmplen);
537 		len = ntohl(tmplen);
538 		p = qcache + i;
539 		i += len - sizeof(tmplen);
540 
541 		/* No need to cache PARSE and BIND responses */
542 		if (tmpkind == '1' || tmpkind == '2')
543 		{
544 			is_prepared_stmt = 1;
545 			continue;
546 		}
547 
548 		/*
549 		 * In the prepared statement execution, there is no need to send 'T'
550 		 * response to the frontend.
551 		 */
552 		if (is_prepared_stmt && tmpkind == 'T')
553 		{
554 			continue;
555 		}
556 
557 		/* send message to frontend */
558 		ereport(DEBUG1,
559 				(errmsg("memcache: sending cached messages: '%c' len: %d", tmpkind, len)));
560 		send_message(frontend, tmpkind, len, p);
561 
562 		msg++;
563 	}
564 
565 	return msg;
566 }
567 
568 /*
569  * send message to frontend
570  */
571 static void
send_message(POOL_CONNECTION * conn,char kind,int len,const char * data)572 send_message(POOL_CONNECTION * conn, char kind, int len, const char *data)
573 {
574 	ereport(DEBUG2,
575 			(errmsg("memcache: sending messages: kind '%c', len=%d, data=%p", kind, len, data)));
576 
577 	pool_write(conn, &kind, 1);
578 
579 	len = htonl(len);
580 	pool_write(conn, &len, sizeof(len));
581 
582 	len = ntohl(len);
583 	pool_write(conn, (void *) data, len - sizeof(len));
584 }
585 
586 #ifdef USE_MEMCACHED
587 /*
588  * delete query cache on memcached
589  */
590 static int
delete_cache_on_memcached(const char * key)591 delete_cache_on_memcached(const char *key)
592 {
593 
594 	memcached_return rc;
595 
596 	ereport(DEBUG2,
597 			(errmsg("memcache: deleteing cache on memcached with key: \"%s\"", key)));
598 
599 
600 	/* delete cache data on memcached. key is md5 hash query */
601 	rc = memcached_delete(memc, key, 32, (time_t) 0);
602 
603 	/* delete cache data on memcached is failed */
604 	if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_BUFFERED)
605 	{
606 		ereport(LOG,
607 				(errmsg("failed to delete cache on memcached, error:\"%s\"", memcached_strerror(memc, rc))));
608 		return 0;
609 	}
610 	return 1;
611 
612 }
613 #endif
614 
615 /*
616  * Fetch SELECT data from cache if possible.
617  */
618 POOL_STATUS
pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,char * contents,bool * foundp)619 pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,
620 							 POOL_CONNECTION_POOL * backend,
621 							 char *contents, bool *foundp)
622 {
623 	char	   *qcache;
624 	size_t		qcachelen;
625 	int			sts;
626 	pool_sigset_t oldmask;
627 
628 	ereport(DEBUG1,
629 			(errmsg("pool_fetch_from_memory_cache called")));
630 
631 	*foundp = false;
632 
633 	POOL_SETMASK2(&BlockSig, &oldmask);
634 	pool_shmem_lock();
635 
636 	PG_TRY();
637 	{
638 		sts = pool_fetch_cache(backend, contents, &qcache, &qcachelen);
639 	}
640 	PG_CATCH();
641 	{
642 		pool_shmem_unlock();
643 		POOL_SETMASK(&oldmask);
644 		PG_RE_THROW();
645 	}
646 	PG_END_TRY();
647 
648 	pool_shmem_unlock();
649 	POOL_SETMASK(&oldmask);
650 
651 	if (sts != 0)
652 		/* Cache not found */
653 		return POOL_CONTINUE;
654 
655 	/*
656 	 * Cache found. If we are doing extended query and in streaming
657 	 * replication mode, we need to retrieve any responses from backend and
658 	 * forward them to frontend.
659 	 */
660 	if (pool_is_doing_extended_query_message() && SL_MODE)
661 	{
662 		POOL_SESSION_CONTEXT *session_context;
663 		POOL_CONNECTION *target_backend;
664 
665 		ereport(DEBUG1,
666 				(errmsg("memcache: injecting cache data")));
667 
668 		session_context = pool_get_session_context(true);
669 		target_backend = CONNECTION(backend, session_context->load_balance_node_id);
670 		inject_cached_message(target_backend, qcache, qcachelen);
671 	}
672 	else
673 	{
674 		/*
675 		 * Send each messages to frontend
676 		 */
677 		send_cached_messages(frontend, qcache, qcachelen);
678 	}
679 
680 	pfree(qcache);
681 
682 	/*
683 	 * Send a "READY FOR QUERY" if not in extended query.
684 	 */
685 	if (!pool_is_doing_extended_query_message() && MAJOR(backend) == PROTO_MAJOR_V3)
686 	{
687 		signed char state;
688 
689 		/*
690 		 * We keep previous transaction state.
691 		 */
692 		state = MASTER(backend)->tstate;
693 		send_message(frontend, 'Z', 5, (char *) &state);
694 	}
695 
696 	if (!pool_is_doing_extended_query_message() || !SL_MODE)
697 	{
698 		if (pool_flush(frontend))
699 		{
700 			return POOL_END;
701 		}
702 	}
703 
704 	*foundp = true;
705 
706 	if (pool_config->log_per_node_statement)
707 		ereport(LOG,
708 				(errmsg("fetch from memory cache"),
709 				 errdetail("query result fetched from cache. statement: %s", contents)));
710 
711 	ereport(DEBUG1,
712 			(errmsg("fetch from memory cache"),
713 			 errdetail("query result found in the query cache, %s", contents)));
714 
715 	return POOL_CONTINUE;
716 }
717 
/*
 * Simple and rough (thus unreliable) check if the query is likely
 * SELECT.  This can be used before parse tree is available, so the query
 * text is not assumed to be well formed.
 *
 * The query is scanned from the beginning.  White space, C style comments
 * and SQL "--" comments (up to the end of the line) are skipped.  The
 * function returns true as soon as the keyword SELECT or WITH (in any
 * letter case) is found at the current scan position.  Any other character
 * is stepped over and the scan goes on, so a query that merely contains
 * one of the keywords further on (e.g. INSERT ... SELECT) is reported as
 * likely SELECT too.
 *
 * Returns false for a NULL or empty query and when no keyword is found.
 * The scan never moves beyond the terminating NUL, even if a comment is
 * not terminated.
 *
 * pool_config->ignore_leading_white_space is not consulted here: the scan
 * skips white space unconditionally, so the result does not depend on it.
 */
bool
pool_is_likely_select(char *query)
{
	if (query == NULL)
		return false;

	while (*query)
	{
		/* Ignore spaces and return marks */
		if (isspace((unsigned char) *query))
		{
			query++;
			continue;
		}

		/* Ignore comments like C */
		if (!strncmp(query, "/*", 2))
		{
			query += 2;
			while (*query && strncmp(query, "*/", 2))
				query++;

			/* step over the terminator only if there is one */
			if (*query)
				query += 2;
			continue;
		}

		/* Ignore SQL comments: they end at the next newline */
		if (!strncmp(query, "--", 2))
		{
			while (*query && *query != '\n')
				query++;

			/* the newline, if any, is consumed as white space above */
			continue;
		}

		if (!strncasecmp(query, "SELECT", 6) || !strncasecmp(query, "WITH", 4))
		{
			return true;
		}

		query++;
	}

	return false;
}
802 
803 /*
804  * Return true if SELECT can be cached.  "node" is the parse tree for
805  * the query and "query" is the query string.
806  * The query must be SELECT or WITH.
807  */
808 bool
pool_is_allow_to_cache(Node * node,char * query)809 pool_is_allow_to_cache(Node *node, char *query)
810 {
811 	int			i = 0;
812 	int			num_oids = -1;
813 	SelectContext ctx;
814 
815 	/*
816 	 * If NO QUERY CACHE comment exists, do not cache.
817 	 */
818 	if (!strncasecmp(query, NO_QUERY_CACHE, NO_QUERY_CACHE_COMMENT_SZ))
819 		return false;
820 
821 	/*
822 	 * Check black table list first.
823 	 */
824 	if (pool_config->num_black_memqcache_table_list > 0)
825 	{
826 		/*
827 		 * Extract oids in from clause of SELECT, and check if SELECT to them
828 		 * could be cached.
829 		 */
830 		num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
831 		if (num_oids > 0)
832 		{
833 			for (i = 0; i < num_oids; i++)
834 			{
835 				ereport(DEBUG1,
836 						(errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, ctx.table_names[i])));
837 				if (pool_is_table_in_black_list(ctx.table_names[i]) == true)
838 				{
839 					ereport(DEBUG1,
840 							(errmsg("memcache: node is not allowed to cache")));
841 					return false;
842 				}
843 			}
844 		}
845 	}
846 
847 	/* SELECT INTO or SELECT FOR SHARE or UPDATE cannot be cached */
848 	if (pool_has_insertinto_or_locking_clause(node))
849 		return false;
850 
851 	/*
852 	 * If SELECT uses non immutable functions, it's not allowed to cache.
853 	 */
854 	if (pool_has_non_immutable_function_call(node))
855 		return false;
856 
857 	/*
858 	 * If SELECT uses temporary tables it's not allowed to cache.
859 	 */
860 	if (pool_config->check_temp_table && pool_has_temp_table(node))
861 		return false;
862 
863 	/*
864 	 * If SELECT uses system catalogs, it's not allowed to cache.
865 	 */
866 	if (pool_has_system_catalog(node))
867 		return false;
868 
869 	/*
870 	 * TABLESAMPLE is not allowed to cache.
871 	 */
872 	if (IsA(node, SelectStmt) &&((SelectStmt *) node)->fromClause)
873 	{
874 		List	   *tbl_list = ((SelectStmt *) node)->fromClause;
875 		ListCell   *tbl;
876 
877 		foreach(tbl, tbl_list)
878 		{
879 			if (IsA(lfirst(tbl), RangeTableSample))
880 				return false;
881 		}
882 	}
883 
884 
885 	/*
886 	 * If the table is in the while list, allow to cache even if it is VIEW or
887 	 * unlogged table.
888 	 */
889 	if (pool_config->num_white_memqcache_table_list > 0)
890 	{
891 		if (num_oids < 0)
892 			num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
893 
894 		if (num_oids > 0)
895 		{
896 			for (i = 0; i < num_oids; i++)
897 			{
898 				char	   *table = ctx.table_names[i];
899 
900 				ereport(DEBUG1,
901 						(errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, table)));
902 				if (is_view(table) || is_unlogged_table(table))
903 				{
904 					if (pool_is_table_in_white_list(table) == false)
905 					{
906 						ereport(DEBUG1,
907 								(errmsg("memcache: node is not allowed to cache")));
908 						return false;
909 					}
910 				}
911 			}
912 		}
913 	}
914 	else
915 	{
916 		/*
917 		 * If SELECT uses views, it's not allowed to cache.
918 		 */
919 		if (pool_has_view(node))
920 			return false;
921 
922 		/*
923 		 * If SELECT uses unlogged tables, it's not allowed to cache.
924 		 */
925 		if (pool_has_unlogged_table(node))
926 			return false;
927 	}
928 
929 	/*
930 	 * If Data-modifying statements in WITH clause, it's not allowed to cache.
931 	 */
932 	if(IsA(node, SelectStmt) && ((SelectStmt *) node)->withClause)
933 	{
934 		ListCell	*lc;
935 		WithClause	*withClause = ((SelectStmt *) node)->withClause;
936 
937 		foreach(lc, withClause->ctes)
938 		{
939 			CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
940 			if(IsA(cte->ctequery, InsertStmt) ||
941 			   IsA(cte->ctequery, DeleteStmt) ||
942 			   IsA(cte->ctequery, UpdateStmt))
943 			{
944 				return false;
945 			}
946 		}
947 	}
948 
949 	return true;
950 }
951 
952 
953 /*
954  * Return true If the SELECTed table is in back list.
955  */
956 bool
pool_is_table_in_black_list(const char * table_name)957 pool_is_table_in_black_list(const char *table_name)
958 {
959 
960 	if (pool_config->num_black_memqcache_table_list > 0 &&
961 		pattern_compare((char *) table_name, BLACKLIST, "black_memqcache_table_list") == 1)
962 	{
963 		return true;
964 	}
965 
966 	return false;
967 }
968 
969 /*
970  * Return true If the SELECTed table is in white list.
971  */
972 bool
pool_is_table_in_white_list(const char * table_name)973 pool_is_table_in_white_list(const char *table_name)
974 {
975 	if (pool_config->num_white_memqcache_table_list > 0 &&
976 		pattern_compare((char *) table_name, WHITELIST, "white_memqcache_table_list") == 1)
977 	{
978 		return true;
979 	}
980 
981 	return false;
982 }
983 
984 /*
985  * Extract table oid from INSERT/UPDATE/DELETE/TRUNCATE/
986  * DROP TABLE/ALTER TABLE/COPY FROM statement.
987  * For SELECT, if Data-modifying statements in its WITH clause,
988  * extract table oid from Data-modifying statements too.
989  * Returns number of oids.
990  * In case of error, returns 0 (InvalidOid).
991  * oids buffer (oidsp) will be discarded by subsequent call.
992  */
993 int
pool_extract_table_oids(Node * node,int ** oidsp)994 pool_extract_table_oids(Node *node, int **oidsp)
995 {
996 #define POOL_MAX_DML_OIDS 128
997 	char	   *table;
998 	static int	oids[POOL_MAX_DML_OIDS];
999 	int			num_oids;
1000 	int			oid;
1001 
1002 	if (node == NULL)
1003 	{
1004 		ereport(LOG,
1005 				(errmsg("memcache: error while extracting table oids. statement is NULL")));
1006 		return 0;
1007 	}
1008 
1009 	num_oids = 0;
1010 	*oidsp = oids;
1011 
1012 	if (IsA(node, InsertStmt))
1013 	{
1014 		InsertStmt *stmt = (InsertStmt *) node;
1015 
1016 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1017 		table = make_table_name_from_rangevar(stmt->relation);
1018 	}
1019 	else if (IsA(node, UpdateStmt))
1020 	{
1021 		UpdateStmt *stmt = (UpdateStmt *) node;
1022 
1023 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1024 		table = make_table_name_from_rangevar(stmt->relation);
1025 	}
1026 	else if (IsA(node, DeleteStmt))
1027 	{
1028 		DeleteStmt *stmt = (DeleteStmt *) node;
1029 
1030 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1031 		table = make_table_name_from_rangevar(stmt->relation);
1032 	}
1033 	else if(IsA(node, SelectStmt))
1034 	{
1035 		SelectStmt *stmt = (SelectStmt *) node;
1036 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1037 		table = NULL;
1038 	}
1039 #ifdef NOT_USED
1040 
1041 	/*
1042 	 * We do not handle CREATE TABLE here.  It is possible that
1043 	 * pool_extract_table_oids() is called before CREATE TABLE gets executed.
1044 	 */
1045 	else if (IsA(node, CreateStmt))
1046 	{
1047 		CreateStmt *stmt = (CreateStmt *) node;
1048 
1049 		table = make_table_name_from_rangevar(stmt->relation);
1050 	}
1051 #endif
1052 
1053 	else if (IsA(node, AlterTableStmt))
1054 	{
1055 		AlterTableStmt *stmt = (AlterTableStmt *) node;
1056 
1057 		table = make_table_name_from_rangevar(stmt->relation);
1058 	}
1059 
1060 	else if (IsA(node, CopyStmt))
1061 	{
1062 		CopyStmt   *stmt = (CopyStmt *) node;
1063 
1064 		if (stmt->is_from)		/* COPY FROM? */
1065 		{
1066 			table = make_table_name_from_rangevar(stmt->relation);
1067 		}
1068 		else
1069 		{
1070 			return 0;
1071 		}
1072 	}
1073 
1074 	else if (IsA(node, DropStmt))
1075 	{
1076 		ListCell   *cell;
1077 
1078 		DropStmt   *stmt = (DropStmt *) node;
1079 
1080 		if (stmt->removeType != OBJECT_TABLE)
1081 		{
1082 			return 0;
1083 		}
1084 
1085 		/*
1086 		 * Here, stmt->objects is list of target relation info.  The first
1087 		 * cell of target relation info is a list (possibly) consists of
1088 		 * database, schema and relation.  We need to call
1089 		 * makeRangeVarFromNameList() before passing to
1090 		 * make_table_name_from_rangevar. Otherwise we get weird excessively
1091 		 * decorated relation name (''table_name'').
1092 		 */
1093 		foreach(cell, stmt->objects)
1094 		{
1095 			if (num_oids >= POOL_MAX_DML_OIDS)
1096 			{
1097 				ereport(LOG,
1098 						(errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1099 				return 0;
1100 			}
1101 
1102 			table = make_table_name_from_rangevar(makeRangeVarFromNameList(lfirst(cell)));
1103 			oid = pool_table_name_to_oid(table);
1104 			if (oid > 0)
1105 			{
1106 				oids[num_oids++] = pool_table_name_to_oid(table);
1107 				ereport(DEBUG1,
1108 						(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids - 1])));
1109 			}
1110 		}
1111 		return num_oids;
1112 	}
1113 	else if (IsA(node, TruncateStmt))
1114 	{
1115 		ListCell   *cell;
1116 
1117 		TruncateStmt *stmt = (TruncateStmt *) node;
1118 
1119 		foreach(cell, stmt->relations)
1120 		{
1121 			if (num_oids >= POOL_MAX_DML_OIDS)
1122 			{
1123 				ereport(LOG,
1124 						(errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1125 				return 0;
1126 			}
1127 
1128 			table = make_table_name_from_rangevar(lfirst(cell));
1129 			oid = pool_table_name_to_oid(table);
1130 			if (oid > 0)
1131 			{
1132 				oids[num_oids++] = pool_table_name_to_oid(table);
1133 				ereport(DEBUG1,
1134 						(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids - 1])));
1135 			}
1136 		}
1137 		return num_oids;
1138 	}
1139 	else if (IsA(node, ExplainStmt))
1140 	{
1141 		ListCell	*cell;
1142 		DefElem		*def;
1143 		ExplainStmt *stmt = (ExplainStmt *) node;
1144 
1145 		foreach(cell, stmt->options)
1146 		{
1147 			def = lfirst(cell);
1148 			if (strncmp("analyze", def->defname, 7) == 0)
1149 			{
1150 				return pool_extract_table_oids(stmt->query, oidsp);
1151 			}
1152 		}
1153 
1154 		table = NULL;
1155 	}
1156 	else
1157 	{
1158 		ereport(DEBUG1,
1159 				(errmsg("memcache: extracting table oids: statment is different from INSERT/UPDATE/DELETE/TRUNCATE/DROP TABLE/ALTER TABLE")));
1160 		return 0;
1161 	}
1162 
1163 	oid = pool_table_name_to_oid(table);
1164 	if (oid > 0)
1165 	{
1166 		if (num_oids >= POOL_MAX_DML_OIDS)
1167 		{
1168 			ereport(LOG,
1169 					(errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1170 			return 0;
1171 		}
1172 
1173 		oids[num_oids++] = pool_table_name_to_oid(table);
1174 		ereport(DEBUG1,
1175 				(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oid)));
1176 	}
1177 	return num_oids;
1178 }
1179 
1180 /*
1181  * Extract table oid from INSERT/UPDATE/DELETE
1182  * FROM statement in WITH clause.
1183  * Returns number of oids.
1184  * oids buffer (oidsp) will be discarded by subsequent call.
1185  */
1186 int
pool_extract_withclause_oids(Node * node,int * oidsp)1187 pool_extract_withclause_oids(Node *node, int *oidsp)
1188 {
1189 	int			num_oids = 0;
1190 	int			oid;
1191 	char	   *table;
1192 	ListCell   *lc;
1193 	WithClause *with;
1194 
1195 	if(oidsp == NULL)
1196 	{
1197 		return 0;
1198 	}
1199 
1200 	if(!node || !IsA(node, WithClause))
1201 	{
1202 		return 0;
1203 	}
1204 
1205 	with = (WithClause *) node;
1206 	foreach(lc, with->ctes)
1207 	{
1208 		CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
1209 		if(IsA(cte->ctequery, InsertStmt))
1210 		{
1211 			InsertStmt *stmt = (InsertStmt *) cte->ctequery;
1212 			table = make_table_name_from_rangevar(stmt->relation);
1213 		}
1214 		else if(IsA(cte->ctequery, DeleteStmt))
1215 		{
1216 			DeleteStmt *stmt = (DeleteStmt *) cte->ctequery;
1217 			table = make_table_name_from_rangevar(stmt->relation);
1218 		}
1219 		else if(IsA(cte->ctequery, UpdateStmt))
1220 		{
1221 			UpdateStmt *stmt = (UpdateStmt *) cte->ctequery;
1222 			table = make_table_name_from_rangevar(stmt->relation);
1223 		}
1224 		else
1225 		{
1226 			/* only check INSERT/DELETE/UPDATE in WITH clause */
1227 			table = NULL;
1228 		}
1229 
1230 		oid = pool_table_name_to_oid(table);
1231 		if (oid > 0)
1232 		{
1233 			if (num_oids >= POOL_MAX_DML_OIDS)
1234 			{
1235 				break;
1236 			}
1237 
1238 			oidsp[num_oids++] = pool_table_name_to_oid(table);
1239 			ereport(DEBUG1,
1240 					(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oidsp[num_oids - 1])));
1241 		}
1242 	}
1243 
1244 	return num_oids;
1245 }
1246 
/* Number of entries by which the table oid buffers in this file grow */
#define POOL_OIDBUF_SIZE 1024
static int *oidbuf;				/* table oids collected by
								 * pool_add_dml_table_oid() */
static int	oidbufp;			/* number of entries of oidbuf in use */
static int	oidbuf_size;		/* capacity of oidbuf, in entries */
1251 
1252 /*
1253  * Add table oid to internal buffer
1254  */
1255 void
pool_add_dml_table_oid(int oid)1256 pool_add_dml_table_oid(int oid)
1257 {
1258 	int			i;
1259 	int		   *tmp;
1260 
1261 	if (oid == 0)
1262 		return;
1263 
1264 	if (oidbufp >= oidbuf_size)
1265 	{
1266 		MemoryContext oldcxt;
1267 
1268 		oidbuf_size += POOL_OIDBUF_SIZE;
1269 
1270 		/*
1271 		 * This need to live throughout the life of child so home it in
1272 		 * TopMemoryContext
1273 		 */
1274 		oldcxt = MemoryContextSwitchTo(TopMemoryContext);
1275 		tmp = repalloc(oidbuf, sizeof(int) * oidbuf_size);
1276 		MemoryContextSwitchTo(oldcxt);
1277 		if (tmp == NULL)
1278 			return;
1279 
1280 		oidbuf = tmp;
1281 	}
1282 
1283 	for (i = 0; i < oidbufp; i++)
1284 	{
1285 		if (oidbuf[i] == oid)
1286 			/* Already same oid exists */
1287 			return;
1288 	}
1289 	oidbuf[oidbufp++] = oid;
1290 }
1291 
1292 
1293 /*
1294  * Get table oid buffer
1295  */
1296 static int
pool_get_dml_table_oid(int ** oid)1297 pool_get_dml_table_oid(int **oid)
1298 {
1299 	*oid = oidbuf;
1300 	return oidbufp;
1301 }
1302 
1303 static int
pool_get_dropdb_table_oids(int ** oids,int dboid)1304 pool_get_dropdb_table_oids(int **oids, int dboid)
1305 {
1306 	int		   *rtn = 0;
1307 	int			oids_size = 0;
1308 	int		   *tmp;
1309 
1310 	int			num_oids = 0;
1311 	DIR		   *dir;
1312 	struct dirent *dp;
1313 	char		path[1024];
1314 
1315 	snprintf(path, sizeof(path), "%s/%d", pool_config->memqcache_oiddir, dboid);
1316 	if ((dir = opendir(path)) == NULL)
1317 	{
1318 		ereport(DEBUG1,
1319 				(errmsg("memcache: getting drop table oids"),
1320 				 errdetail("Failed to open dir: %s", path)));
1321 		return 0;
1322 	}
1323 
1324 	while ((dp = readdir(dir)) != NULL)
1325 	{
1326 		if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0)
1327 			continue;
1328 
1329 		if (num_oids >= oids_size)
1330 		{
1331 			oids_size += POOL_OIDBUF_SIZE;
1332 			tmp = repalloc(rtn, sizeof(int) * oids_size);
1333 			if (tmp == NULL)
1334 			{
1335 				closedir(dir);
1336 				return 0;
1337 			}
1338 			rtn = tmp;
1339 		}
1340 
1341 		rtn[num_oids] = atol(dp->d_name);
1342 		num_oids++;
1343 	}
1344 
1345 	closedir(dir);
1346 	*oids = rtn;
1347 
1348 	return num_oids;
1349 }
1350 
1351 /* Discard oid internal buffer */
1352 static void
pool_discard_dml_table_oid(void)1353 pool_discard_dml_table_oid(void)
1354 {
1355 	oidbufp = 0;
1356 }
1357 
1358 /*
1359  * Management modules for oid map.  When caching SELECT results, we
1360  * record table oids to file, which has following structure.
1361  *
1362  * memqcache_oiddir -+- database_oid -+-table_oid_file1
1363  *                                    |
1364  *                                    +-table_oid_file2
1365  *                                    |
1366  *                                    +-table_oid_file3...
1367  *
1368  * table_oid_file's name is table oid, which was used by the SELECT
1369  * statement. The file has 1 or more cacheid(s). When SELECT result is
1370  * cached, the file is created and cache id is appended. Later SELECT
1371  * using same table oid will add to the same file. If the SELECT uses
1372  * multiple tables, multiple table_oid_file will be created. When
1373  * INSERT/UPDATE/DELETE is executed, corresponding caches must be
1374  * deleted (cache invalidation) (when DROP TABLE, ALTER TABLE is
1375  * executed, the caches must be deleted as well). When database is
1376  * dropped, all caches belonging to the database must be deleted.
1377  */
1378 
1379 /*
1380  * Get oid of current database
1381  */
1382 static int
pool_get_database_oid(void)1383 pool_get_database_oid(void)
1384 {
1385 /*
1386  * Query to convert table name to oid
1387  */
1388 	int			oid = 0;
1389 	static POOL_RELCACHE * relcache;
1390 	POOL_CONNECTION_POOL *backend;
1391 
1392 	backend = pool_get_session_context(false)->backend;
1393 
1394 	/*
1395 	 * If relcache does not exist, create it.
1396 	 */
1397 	if (!relcache)
1398 	{
1399 		relcache = pool_create_relcache(pool_config->relcache_size, DATABASE_TO_OID_QUERY,
1400 										int_register_func, int_unregister_func,
1401 										false);
1402 		if (relcache == NULL)
1403 		{
1404 			ereport(LOG,
1405 					(errmsg("memcache: error creating relcache while getting database OID")));
1406 			return oid;
1407 		}
1408 	}
1409 
1410 	/*
1411 	 * Search relcache.
1412 	 */
1413 	oid = (int) (intptr_t) pool_search_relcache(relcache, backend,
1414 												MASTER_CONNECTION(backend)->sp->database);
1415 	return oid;
1416 }
1417 
1418 /*
1419  * Get oid of current database for discarding cache files
1420  * after executing DROP DATABASE
1421  */
1422 int
pool_get_database_oid_from_dbname(char * dbname)1423 pool_get_database_oid_from_dbname(char *dbname)
1424 {
1425 	int			dboid = 0;
1426 	POOL_SELECT_RESULT *res;
1427 	char		query[1024];
1428 
1429 	POOL_CONNECTION_POOL *backend;
1430 
1431 	backend = pool_get_session_context(false)->backend;
1432 
1433 	snprintf(query, sizeof(query), DATABASE_TO_OID_QUERY, dbname);
1434 	do_query(MASTER(backend), query, &res, MAJOR(backend));
1435 
1436 	if (res->numrows != 1)
1437 	{
1438 		ereport(DEBUG1,
1439 				(errmsg("memcache: getting oid of current database"),
1440 				 errdetail("received %d rows", res->numrows)));
1441 		free_select_result(res);
1442 		return 0;
1443 	}
1444 
1445 	dboid = atol(res->data[0]);
1446 	free_select_result(res);
1447 
1448 	return dboid;
1449 }
1450 
1451 /*
1452  * Add cache id (shmem case) or hash key (memcached case) to table oid
1453  * map file.  Caller must hold shmem lock before calling this function
1454  * to avoid file extension conflict among different pgpool child
1455  * process.
1456  * As of pgpool-II 3.2, pool_handle_query_cache is responsible for that.
1457  * (pool_handle_query_cache -> pool_commit_cache -> pool_add_table_oid_map)
1458  */
1459 static void
pool_add_table_oid_map(POOL_CACHEKEY * cachekey,int num_table_oids,int * table_oids)1460 pool_add_table_oid_map(POOL_CACHEKEY * cachekey, int num_table_oids, int *table_oids)
1461 {
1462 	char	   *dir;
1463 	int			dboid;
1464 	char		path[1024];
1465 	int			i;
1466 	int			len;
1467 
1468 	/*
1469 	 * Create memqcache_oiddir
1470 	 */
1471 	dir = pool_config->memqcache_oiddir;
1472 
1473 	if (mkdir(dir, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1474 	{
1475 		if (errno != EEXIST)
1476 		{
1477 			ereport(WARNING,
1478 					(errmsg("memcache: adding table oid maps, failed to create directory:\"%s\"", dir),
1479 					 errdetail("%m")));
1480 			return;
1481 		}
1482 	}
1483 
1484 	/*
1485 	 * Create memqcache_oiddir/database_oid
1486 	 */
1487 	dboid = pool_get_database_oid();
1488 	ereport(DEBUG1,
1489 			(errmsg("memcache: adding table oid maps"),
1490 			 errdetail("dboid %d", dboid)));
1491 
1492 	if (dboid <= 0)
1493 	{
1494 		ereport(WARNING,
1495 				(errmsg("memcache: adding table oid maps, failed to get database OID")));
1496 		return;
1497 	}
1498 
1499 	snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1500 	if (mkdir(path, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1501 	{
1502 		if (errno != EEXIST)
1503 		{
1504 			ereport(WARNING,
1505 					(errmsg("memcache: adding table oid maps, failed to create directory:\"%s\"", path),
1506 					 errdetail("%m")));
1507 			return;
1508 		}
1509 	}
1510 
1511 	if (pool_is_shmem_cache())
1512 	{
1513 		len = sizeof(cachekey->cacheid);
1514 	}
1515 	else
1516 	{
1517 		len = sizeof(cachekey->hashkey);
1518 	}
1519 
1520 	for (i = 0; i < num_table_oids; i++)
1521 	{
1522 		int			fd;
1523 		int			oid = table_oids[i];
1524 		int			sts;
1525 		struct flock fl;
1526 
1527 		/*
1528 		 * Create or open each memqcache_oiddir/database_oid/table_oid
1529 		 */
1530 		snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1531 		if ((fd = open(path, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR)) == -1)
1532 		{
1533 			ereport(WARNING,
1534 					(errmsg("memcache: adding table oid maps, failed to open file:\"%s\"", path),
1535 					 errdetail("%m")));
1536 			return;
1537 		}
1538 
1539 		fl.l_type = F_WRLCK;
1540 		fl.l_whence = SEEK_SET;
1541 		fl.l_start = 0;			/* Offset from l_whence         */
1542 		fl.l_len = 0;			/* length, 0 = to EOF           */
1543 
1544 		sts = fcntl(fd, F_SETLKW, &fl);
1545 		if (sts == -1)
1546 		{
1547 			ereport(WARNING,
1548 					(errmsg("memcache: adding table oid maps, failed to lock file:\"%s\"", path),
1549 					 errdetail("%m")));
1550 
1551 			close(fd);
1552 			return;
1553 		}
1554 
1555 		/*
1556 		 * Below was ifdef-out because of a performance reason. Looking for
1557 		 * duplicate cache entries in a file needed unacceptably high cost. So
1558 		 * we gave up this and decided not to care about duplicate entries in
1559 		 * the file.
1560 		 */
1561 #ifdef NOT_USED
1562 		for (;;)
1563 		{
1564 			sts = read(fd, (char *) &buf, len);
1565 			if (sts == -1)
1566 			{
1567 				ereport(WARNING,
1568 						(errmsg("memcache: adding table oid maps, failed to read file:\"%s\"", path),
1569 						 errdetail("%m")));
1570 				close(fd);
1571 				return;
1572 			}
1573 			else if (sts == len)
1574 			{
1575 				if (memcmp(cachekey, &buf, len) == 0)
1576 				{
1577 					/* Same key found. Skip this */
1578 					close(fd);
1579 					return;
1580 				}
1581 				continue;
1582 			}
1583 
1584 			/*
1585 			 * Must be EOF
1586 			 */
1587 			if (sts != 0)
1588 			{
1589 				ereport(WARNING,
1590 						(errmsg("memcache: adding table oid maps, invalid data length:%d in file:\"%s\". error:\"%s\"", sts, path)));
1591 				close(fd);
1592 				return;
1593 			}
1594 			break;
1595 		}
1596 #endif
1597 
1598 		if (lseek(fd, 0, SEEK_END) == -1)
1599 		{
1600 			ereport(WARNING,
1601 					(errmsg("memcache: adding table oid maps, failed seek on file:\"%s\"", path),
1602 					 errdetail("%m")));
1603 			close(fd);
1604 			return;
1605 		}
1606 
1607 		/*
1608 		 * Write cache_id or cache key at the end of file
1609 		 */
1610 		sts = write(fd, (char *) cachekey, len);
1611 		if (sts == -1 || sts != len)
1612 		{
1613 			ereport(WARNING,
1614 					(errmsg("memcache: adding table oid maps, failed to write file:\"%s\"", path),
1615 					 errdetail("%m")));
1616 			close(fd);
1617 			return;
1618 		}
1619 		close(fd);
1620 	}
1621 }
1622 
1623 /*
1624  * Discard all oid maps at pgpool-II startup.
1625  * This is necessary for shmem case.
1626  */
1627 void
pool_discard_oid_maps(void)1628 pool_discard_oid_maps(void)
1629 {
1630 	char		command[1024];
1631 
1632 	snprintf(command, sizeof(command), "/bin/rm -fr %s/[0-9]*",
1633 			 pool_config->memqcache_oiddir);
1634 	if (system(command) == -1)
1635 		ereport(WARNING,
1636 				(errmsg("unable to execute command \"%s\"", command),
1637 				 errdetail("system() command failed with error \"%m\"")));
1638 
1639 
1640 }
1641 
1642 void
pool_discard_oid_maps_by_db(int dboid)1643 pool_discard_oid_maps_by_db(int dboid)
1644 {
1645 	char		command[1024];
1646 
1647 	if (pool_is_shmem_cache())
1648 	{
1649 		snprintf(command, sizeof(command), "/bin/rm -fr %s/%d/",
1650 				 pool_config->memqcache_oiddir, dboid);
1651 
1652 		ereport(DEBUG1,
1653 				(errmsg("memcache: discarding oid maps by db"),
1654 				 errdetail("command: '%s\'", command)));
1655 
1656 		if (system(command) == -1)
1657 			ereport(WARNING,
1658 					(errmsg("unable to execute command \"%s\"", command),
1659 					 errdetail("system() command failed with error \"%m\"")));
1660 	}
1661 }
1662 
/*
 * Read cache ids (shmem case) or hash keys (memcached case) from the
 * table oid map files of the given tables and discard the cache entries
 * they refer to.
 *
 * num_table_oids: number of entries in table_oid.
 * table_oid:      oids of the tables whose map files are processed.
 * unlinkp:        if true, each map file is unlinked once it has been
 *                 read up to EOF.
 * dboid:          database oid selecting the map directory. 0 means the
 *                 oid is obtained from pool_get_database_oid().
 *
 * A map file which cannot be opened is skipped. A lock failure, a read
 * failure or a trailing partial record ends processing: the remaining
 * table oids are not visited.
 */
static void
pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlinkp, int dboid)
{
	char	   *dir;
	char		path[1024];
	int			i;
	int			len;
	POOL_CACHEKEY buf;

	/*
	 * Create memqcache_oiddir
	 */
	dir = pool_config->memqcache_oiddir;
	if (mkdir(dir, S_IREAD | S_IWRITE | S_IEXEC) == -1)
	{
		if (errno != EEXIST)
		{
			ereport(WARNING,
					(errmsg("memcache: invalidating query cache, failed to create directory:\"%s\"", dir),
					 errdetail("%m")));
			return;
		}
	}

	/*
	 * Create memqcache_oiddir/database_oid
	 */
	if (dboid == 0)
	{
		/* Caller did not name a database: use the current one */
		dboid = pool_get_database_oid();
		ereport(DEBUG1,
				(errmsg("memcache invalidating query cache"),
				 errdetail("dboid %d", dboid)));

		if (dboid <= 0)
		{
			ereport(WARNING,
					(errmsg("memcache: invalidating query cache, could not get database OID")));
			return;
		}
	}

	snprintf(path, sizeof(path), "%s/%d", dir, dboid);
	if (mkdir(path, S_IREAD | S_IWRITE | S_IEXEC) == -1)
	{
		if (errno != EEXIST)
		{
			ereport(WARNING,
					(errmsg("memcache: invalidating query cache, failed to create directory:\"%s\"", path),
					 errdetail("%m")));
			return;
		}
	}

	/* Size of one record in the map file */
	if (pool_is_shmem_cache())
	{
		len = sizeof(buf.cacheid);
	}
	else
	{
		len = sizeof(buf.hashkey);
	}

	for (i = 0; i < num_table_oids; i++)
	{
		int			fd;
		int			oid = table_oid[i];
		int			sts;
		struct flock fl;

		/*
		 * Open each memqcache_oiddir/database_oid/table_oid
		 */
		snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
		if ((fd = open(path, O_RDONLY)) == -1)
		{
			/*
			 * This may be normal. It is possible that no SELECT has been
			 * issued since the table has been created or since pgpool-II
			 * started up.
			 */
			ereport(DEBUG1,
					(errmsg("memcache invalidating query cache"),
					 errdetail("failed to open \"%s\". reason:\"%m\"", path)));
			continue;
		}

		/* Take a read lock on the whole file; close() drops it again */
		fl.l_type = F_RDLCK;
		fl.l_whence = SEEK_SET;
		fl.l_start = 0;			/* Offset from l_whence         */
		fl.l_len = 0;			/* length, 0 = to EOF           */

		sts = fcntl(fd, F_SETLKW, &fl);
		if (sts == -1)
		{
			ereport(WARNING,
					(errmsg("memcache: invalidating query cache, failed to lock file:\"%s\"", path),
					 errdetail("%m")));
			close(fd);
			return;
		}

		/* Read the file record by record, deleting one cache entry each */
		for (;;)
		{
			sts = read(fd, (char *) &buf, len);
			if (sts == -1)
			{
				ereport(WARNING,
						(errmsg("memcache: invalidating query cache, failed to read file:\"%s\"", path),
						 errdetail("%m")));

				close(fd);
				return;
			}
			else if (sts == len)
			{
				/* Got a complete record */
				if (pool_is_shmem_cache())
				{
					ereport(DEBUG1,
							(errmsg("memcache invalidating query cache"),
							 errdetail("deleting cacheid:%d itemid:%d",
									   buf.cacheid.blockid, buf.cacheid.itemid)));
					pool_delete_item_shmem_cache(&buf.cacheid);
				}
#ifdef USE_MEMCACHED
				else
				{
					char		delbuf[33];

					/* Copy the first 32 bytes of the key and terminate it */
					memcpy(delbuf, buf.hashkey, 32);
					delbuf[32] = 0;
					ereport(DEBUG1,
							(errmsg("memcache invalidating query cache"),
							 errdetail("deleting %s", delbuf)));

					delete_cache_on_memcached(delbuf);
				}
#endif
				continue;
			}

			/*
			 * Must be EOF
			 */
			if (sts != 0)
			{
				/* Neither a full record nor EOF: trailing partial record */
				ereport(WARNING,
						(errmsg("memcache: invalidating query cache, invalid data length:%d in file:\"%s\"", sts, path)));
				close(fd);
				return;
			}
			break;
		}

		/* The file is removed while fd (and so the lock) is still held */
		if (unlinkp)
		{
			unlink(path);
		}
		close(fd);
	}
#ifdef SHMEMCACHE_DEBUG
	dump_shmem_cache(0);
#endif
}
1832 
1833 /*
1834  * Reset SELECT data buffers.  If reset_dml_oids is true, call
1835  * pool_discard_dml_table_oid() to reset table oids used in DML statements.
1836  */
1837 static void
pool_reset_memqcache_buffer(bool reset_dml_oids)1838 pool_reset_memqcache_buffer(bool reset_dml_oids)
1839 {
1840 	POOL_SESSION_CONTEXT *session_context;
1841 
1842 	session_context = pool_get_session_context(true);
1843 
1844 	if (session_context && session_context->query_cache_array)
1845 	{
1846 		POOL_TEMP_QUERY_CACHE *cache;
1847 
1848 		ereport(DEBUG1,
1849 				(errmsg("memcache reset buffer"),
1850 				 errdetail("discard: %p", session_context->query_cache_array)));
1851 
1852 		pool_discard_query_cache_array(session_context->query_cache_array);
1853 		session_context->query_cache_array = pool_create_query_cache_array();
1854 
1855 		ereport(DEBUG1,
1856 				(errmsg("memcache reset buffer"),
1857 				 errdetail("create: %p", session_context->query_cache_array)));
1858 
1859 		/*
1860 		 * if the query context is still under use, we cannot discard
1861 		 * temporary cache.
1862 		 */
1863 		if ((SL_MODE && pool_is_doing_extended_query_message()) ||
1864 			can_query_context_destroy(session_context->query_context))
1865 		{
1866 			ereport(DEBUG1,
1867 					(errmsg("memcache reset buffer"),
1868 					 errdetail("discard temp buffer of %p (%s)",
1869 							   session_context->query_context, session_context->query_context->original_query)));
1870 
1871 			cache = pool_get_current_cache();
1872 			pool_discard_temp_query_cache(cache);
1873 
1874 			/*
1875 			 * Reset temp_cache pointer in the current query context so that
1876 			 * we don't double free memory.
1877 			 */
1878 			session_context->query_context->temp_cache = NULL;
1879 		}
1880 	}
1881 
1882 	if (reset_dml_oids)
1883 		pool_discard_dml_table_oid();
1884 
1885 	pool_tmp_stats_reset_num_selects();
1886 }
1887 
1888 /*
1889  * Return true if memory cache method is "shmem".  The purpose of this
1890  * function is to cache the result of stcmp and to save a few cycle.
1891  */
1892 bool
pool_is_shmem_cache(void)1893 pool_is_shmem_cache(void)
1894 {
1895 	return pool_config->memqcache_method == SHMEM_CACHE;
1896 }
1897 
/*
 * Number of blocks of the memory cache, as last stored by
 * pool_set_memqcache_blocks().
 */
static int	memqcache_num_blocks;

/*
 * Store the number of blocks of the memory cache.
 */
static void
pool_set_memqcache_blocks(int num_blocks)
{
	memqcache_num_blocks = num_blocks;
}
1907 
1908 /*
1909  * Return memory cache number of blocks.
1910  */
1911 static int
pool_get_memqcache_blocks(void)1912 pool_get_memqcache_blocks(void)
1913 {
1914 	return memqcache_num_blocks;
1915 }
1916 
1917 /*
1918  * Query cache on shared memory management modules.
1919  */
1920 
1921 /*
1922  * Calculate necessary shared memory size.
1923  */
1924 size_t
pool_shared_memory_cache_size(void)1925 pool_shared_memory_cache_size(void)
1926 {
1927 	int64		num_blocks;
1928 	size_t		size;
1929 
1930 	if (pool_config->memqcache_maxcache > pool_config->memqcache_cache_block_size)
1931 		ereport(FATAL,
1932 				(errmsg("invalid memory cache configuration"),
1933 				 errdetail("memqcache_cache_block_size %d should be greater or equal to memqcache_maxcache %d",
1934 						   pool_config->memqcache_cache_block_size,
1935 						   pool_config->memqcache_maxcache)));
1936 
1937 
1938 	num_blocks = pool_config->memqcache_total_size /
1939 		pool_config->memqcache_cache_block_size;
1940 	if (num_blocks == 0)
1941 		ereport(FATAL,
1942 				(errmsg("invalid memory cache configuration"),
1943 				 errdetail("memqcache_total_size %ld should be greater or equal to memqcache_cache_block_size %d",
1944 						   pool_config->memqcache_total_size,
1945 						   pool_config->memqcache_cache_block_size)));
1946 
1947 	ereport(LOG,
1948 			(errmsg("memory cache initialized"),
1949 			 errdetail("memcache blocks :%ld", num_blocks)));
1950 	/* Remember # of blocks */
1951 	pool_set_memqcache_blocks(num_blocks);
1952 	size = pool_config->memqcache_cache_block_size * num_blocks;
1953 	return size;
1954 }
1955 
1956 /*
1957  * Acquire and initialize shared memory cache. This should be called
1958  * only once from pgpool main process at the process staring up time.
1959  */
1960 static void *shmem;
1961 int
pool_init_memory_cache(size_t size)1962 pool_init_memory_cache(size_t size)
1963 {
1964 	ereport(DEBUG1,
1965 			(errmsg("memory cache request size : %zd", size)));
1966 
1967 	shmem = pool_shared_memory_create(size);
1968 	return 0;
1969 }
1970 
1971 /*
1972  * Clear all the shared memory cache and reset FSMM and hash table.
1973  */
1974 void
pool_clear_memory_cache(void)1975 pool_clear_memory_cache(void)
1976 {
1977 	size_t		size;
1978 	pool_sigset_t oldmask;
1979 
1980 	POOL_SETMASK2(&BlockSig, &oldmask);
1981 	pool_shmem_lock();
1982 
1983 	PG_TRY();
1984 	{
1985 		size = pool_shared_memory_cache_size();
1986 		memset(shmem, 0, size);
1987 
1988 		size = pool_shared_memory_fsmm_size();
1989 		pool_reset_fsmm(size);
1990 
1991 		pool_discard_oid_maps();
1992 
1993 		pool_hash_reset(pool_config->memqcache_max_num_cache);
1994 	}
1995 	PG_CATCH();
1996 	{
1997 		pool_shmem_unlock();
1998 		POOL_SETMASK(&oldmask);
1999 		PG_RE_THROW();
2000 	}
2001 	PG_END_TRY();
2002 
2003 	pool_shmem_unlock();
2004 	POOL_SETMASK(&oldmask);
2005 }
2006 
2007 /*
2008  * Return shared memory cache address
2009  */
2010 static void *
pool_memory_cache_address(void)2011 pool_memory_cache_address(void)
2012 {
2013 	return shmem;
2014 }
2015 
2016 /*
2017  * Initialize new block
2018  */
2019 
2020 /*
2021  * Free space management map
2022  *
 * Free space management map (FSMM) consists of bytes. Each byte
 * corresponds to a block id. For example, if you have a 1GB cache and
 * the block size is 8KB, the number of blocks is 131,072, thus the total
 * size of FSMM is 128KB.  Each FSMM entry has a value from 0 to 255,
 * which describes the total free space in the corresponding block.
 * For example, if the value is 2, the free space can be between 64
 * bytes and 95 bytes.
2030  *
2031  * value free space(in bytes)
2032  * 0     0-31
2033  * 1     32-63
2034  * 2     64-95
2035  * 3     96-127
2036  * :
 * 255   8160-8191
2038  */
2039 
/*
 * Work out how much shared memory FSMM needs: one char per cache block.
 * The block count comes from pool_get_memqcache_blocks(), so this must be
 * called after pool_shared_memory_cache_size.
 */
size_t
pool_shared_memory_fsmm_size(void)
{
	return pool_get_memqcache_blocks() * sizeof(char);
}
2052 
2053 /*
2054  * Acquire and initialize shared memory cache for FSMM. This should be
2055  * called after pool_shared_memory_cache_size only once from pgpool
2056  * main process at the process staring up time.
2057  */
2058 static void *fsmm;
2059 int
pool_init_fsmm(size_t size)2060 pool_init_fsmm(size_t size)
2061 {
2062 	int			maxblock = pool_get_memqcache_blocks();
2063 	int			encode_value;
2064 
2065 	fsmm = pool_shared_memory_create(size);
2066 	encode_value = POOL_MAX_FREE_SPACE / POOL_FSMM_RATIO;
2067 	memset(fsmm, encode_value, maxblock);
2068 	return 0;
2069 }
2070 
2071 /*
2072  * Return shared memory fsmm address
2073  */
2074 static void *
pool_fsmm_address(void)2075 pool_fsmm_address(void)
2076 {
2077 	return fsmm;
2078 }
2079 
2080 /*
2081  * Clock algorithm shared query cache management modules.
2082  */
2083 
/*
 * The clock hand: block id of the next victim block
 */
static int *pool_fsmm_clock_hand;

/*
 * Obtain room for the clock hand from pool_shared_memory_create() and let
 * it start at block 0.
 */
void
pool_allocate_fsmm_clock_hand(void)
{
	int		   *hand = pool_shared_memory_create(sizeof(int));

	*hand = 0;
	pool_fsmm_clock_hand = hand;
}
2098 
2099 /*
2100  * Reset FSMM.
2101  */
2102 static void
pool_reset_fsmm(size_t size)2103 pool_reset_fsmm(size_t size)
2104 {
2105 	int			encode_value;
2106 
2107 	encode_value = POOL_MAX_FREE_SPACE / POOL_FSMM_RATIO;
2108 	memset(fsmm, encode_value, size);
2109 
2110 	*pool_fsmm_clock_hand = 0;
2111 }
2112 
2113 /*
2114  * Find victim block using clock algorithm and make it free.
2115  * Returns new free block id.
2116  */
pool_reuse_block(void)2117 static POOL_CACHE_BLOCKID pool_reuse_block(void)
2118 {
2119 	int			maxblock = pool_get_memqcache_blocks();
2120 	char	   *block = block_address(*pool_fsmm_clock_hand);
2121 	POOL_CACHE_BLOCK_HEADER *bh = (POOL_CACHE_BLOCK_HEADER *) block;
2122 	POOL_CACHE_BLOCKID reused_block;
2123 	POOL_CACHE_ITEM_POINTER *cip;
2124 	char	   *p;
2125 	int			i;
2126 
2127 	bh->flags = 0;
2128 	reused_block = *pool_fsmm_clock_hand;
2129 	p = block_address(reused_block);
2130 
2131 	for (i = 0; i < bh->num_items; i++)
2132 	{
2133 		cip = item_pointer(p, i);
2134 
2135 		if (!(POOL_ITEM_DELETED & cip->flags))
2136 		{
2137 			pool_hash_delete(&cip->query_hash);
2138 			ereport(DEBUG1,
2139 					(errmsg("pool_reuse_block: blockid: %d item: %d", reused_block, i)));
2140 		}
2141 	}
2142 
2143 	pool_init_cache_block(reused_block);
2144 	pool_update_fsmm(reused_block, POOL_MAX_FREE_SPACE);
2145 
2146 	(*pool_fsmm_clock_hand)++;
2147 	if (*pool_fsmm_clock_hand >= maxblock)
2148 		*pool_fsmm_clock_hand = 0;
2149 
2150 	ereport(LOG,
2151 			(errmsg("pool_reuse_block: blockid: %d", reused_block)));
2152 
2153 	return reused_block;
2154 }
2155 
2156 /*
2157  * Get block id which has enough space
2158  */
pool_get_block(size_t free_space)2159 static POOL_CACHE_BLOCKID pool_get_block(size_t free_space)
2160 {
2161 	int			encode_value;
2162 	unsigned char *p = pool_fsmm_address();
2163 	int			i;
2164 	int			maxblock = pool_get_memqcache_blocks();
2165 	POOL_CACHE_BLOCK_HEADER *bh;
2166 
2167 	if (p == NULL)
2168 	{
2169 		ereport(WARNING,
2170 				(errmsg("memcache: getting block: FSMM is not initialized")));
2171 		return -1;
2172 	}
2173 
2174 	if (free_space > POOL_MAX_FREE_SPACE)
2175 	{
2176 		ereport(WARNING,
2177 				(errmsg("memcache: getting block: invalid free space:%zd", free_space),
2178 				 errdetail("requested free space: %zd is more than maximum allowed space:%lu", free_space, POOL_MAX_FREE_SPACE)));
2179 		return -1;
2180 	}
2181 
2182 	encode_value = free_space / POOL_FSMM_RATIO;
2183 
2184 	for (i = 0; i < maxblock; i++)
2185 	{
2186 		if (p[i] >= encode_value)
2187 		{
2188 			/*
2189 			 * This block *may" have enough space. We need to make sure it
2190 			 * actually has enough space.
2191 			 */
2192 			bh = (POOL_CACHE_BLOCK_HEADER *) block_address(i);
2193 			if (bh->free_bytes >= free_space)
2194 			{
2195 				return (POOL_CACHE_BLOCKID) i;
2196 			}
2197 		}
2198 	}
2199 
2200 	/*
2201 	 * No enough space found. Reuse victim block
2202 	 */
2203 	return pool_reuse_block();
2204 }
2205 
2206 /*
2207  * Update free space info for specified block
2208  */
2209 static void
pool_update_fsmm(POOL_CACHE_BLOCKID blockid,size_t free_space)2210 pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space)
2211 {
2212 	int			encode_value;
2213 	char	   *p = pool_fsmm_address();
2214 
2215 	if (p == NULL)
2216 	{
2217 		ereport(WARNING,
2218 				(errmsg("memcache: updating free space in block: FSMM is not initialized")));
2219 		return;
2220 	}
2221 
2222 	if (blockid >= pool_get_memqcache_blocks())
2223 	{
2224 		ereport(WARNING,
2225 				(errmsg("memcache: updating free space in block: block id:%d in invalid", blockid)));
2226 		return;
2227 	}
2228 
2229 	if (free_space > POOL_MAX_FREE_SPACE)
2230 	{
2231 		ereport(WARNING,
2232 				(errmsg("memcache: updating free space in block: invalid free space:%zd", free_space),
2233 				 errdetail("requested free space: %zd is more than maximum allowed space:%lu", free_space, POOL_MAX_FREE_SPACE)));
2234 
2235 		return;
2236 	}
2237 
2238 	encode_value = free_space / POOL_FSMM_RATIO;
2239 
2240 	p[blockid] = encode_value;
2241 
2242 	return;
2243 }
2244 
2245 /*
2246  * Add item data to shared memory cache.
2247  * On successful registration, returns cache id.
2248  * The cache id is overwritten by the subsequent call to this function.
2249  * On error returns NULL.
2250  */
pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash,char * data,int size)2251 static POOL_CACHEID * pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash, char *data, int size)
2252 {
2253 	static POOL_CACHEID cacheid;
2254 	POOL_CACHE_BLOCKID blockid;
2255 	POOL_CACHE_BLOCK_HEADER *bh;
2256 	POOL_CACHE_ITEM_POINTER *cip;
2257 
2258 	POOL_CACHE_ITEM ci;
2259 	POOL_CACHE_ITEM_POINTER cip_body;
2260 	char	   *item;
2261 
2262 	int			request_size;
2263 	char	   *p;
2264 	int			i;
2265 	char	   *src;
2266 	char	   *dst;
2267 	int			num_deleted;
2268 	char	   *dcip;
2269 	char	   *dci;
2270 	bool		need_pack;
2271 	char	   *work_buffer;
2272 	int			index;
2273 
2274 	if (query_hash == NULL)
2275 	{
2276 		ereport(LOG,
2277 				(errmsg("error while adding item to shmem cache, query hash is NULL")));
2278 		return NULL;
2279 	}
2280 
2281 	if (data == NULL)
2282 	{
2283 		ereport(LOG,
2284 				(errmsg("error while adding item to shmem cache, data is NULL")));
2285 		return NULL;
2286 	}
2287 
2288 	if (size <= 0)
2289 	{
2290 		ereport(LOG,
2291 				(errmsg("error while adding item to shmem cache, invalid request size: %d", size)));
2292 		return NULL;
2293 	}
2294 
2295 	/* Add overhead */
2296 	request_size = size + sizeof(POOL_CACHE_ITEM_POINTER) + sizeof(POOL_CACHE_ITEM_HEADER);
2297 
2298 	/* Get cache block which has enough space */
2299 	blockid = pool_get_block(request_size);
2300 
2301 	if (blockid == -1)
2302 	{
2303 		return NULL;
2304 	}
2305 
2306 	/*
2307 	 * Initialize the block if necessary. If no live items are remained, we
2308 	 * also initialize the block. If there's contiguous deleted items, we turn
2309 	 * them into free space as well.
2310 	 */
2311 	pool_init_cache_block(blockid);
2312 
2313 	/*
2314 	 * Make sure that we have at least one free hash element.
2315 	 */
2316 	while (!is_free_hash_element())
2317 	{
2318 		/* If not, reuse next victim block */
2319 		blockid = pool_reuse_block();
2320 		pool_init_cache_block(blockid);
2321 	}
2322 
2323 	/* Get block address on shmem */
2324 	p = block_address(blockid);
2325 	bh = (POOL_CACHE_BLOCK_HEADER *) p;
2326 
2327 	/*
2328 	 * Create contiguous free space. We assume that item bodies are ordered
2329 	 * from bottom to top of the block, and corresponding item pointers are
2330 	 * ordered from the youngest to the oldest in the beginning of the block.
2331 	 */
2332 
2333 	/*
2334 	 * Optimization. If there's no deleted items, we don't need to pack it to
2335 	 * create contiguous free space.
2336 	 */
2337 	need_pack = false;
2338 	for (i = 0; i < bh->num_items; i++)
2339 	{
2340 		cip = item_pointer(p, i);
2341 
2342 		if (POOL_ITEM_DELETED & cip->flags) /* Deleted item? */
2343 		{
2344 			need_pack = true;
2345 			ereport(DEBUG1,
2346 					(errmsg("memcache adding item"),
2347 					 errdetail("start creating contiguous space")));
2348 			break;
2349 		}
2350 	}
2351 
2352 	/*
2353 	 * We disable packing for now. Revisit and remove following code fragment
2354 	 * later.
2355 	 */
2356 	need_pack = false;
2357 
2358 	if (need_pack)
2359 	{
2360 		/*
2361 		 * Pack and create contiguous free space.
2362 		 */
2363 		dcip = calloc(1, pool_config->memqcache_cache_block_size);
2364 		if (!dcip)
2365 		{
2366 			ereport(WARNING,
2367 					(errmsg("memcache: adding item to cache: calloc failed")));
2368 			return NULL;
2369 		}
2370 
2371 		work_buffer = dcip;
2372 		dci = dcip + pool_config->memqcache_cache_block_size;
2373 		num_deleted = 0;
2374 		index = 0;
2375 
2376 		for (i = 0; i < bh->num_items; i++)
2377 		{
2378 			int			total_length;
2379 			POOL_CACHEID cid;
2380 
2381 			cip = item_pointer(p, i);
2382 
2383 			if (POOL_ITEM_DELETED & cip->flags) /* Deleted item? */
2384 			{
2385 				num_deleted++;
2386 				continue;
2387 			}
2388 
2389 			/* Copy item body */
2390 			src = p + cip->offset;
2391 			total_length = item_header(p, i)->total_length;
2392 			dst = dci - total_length;
2393 			cip->offset = dst - work_buffer;
2394 			memcpy(dst, src, total_length);
2395 
2396 			dci -= total_length;
2397 
2398 			/* Copy item pointer */
2399 			src = (char *) cip;
2400 			dst = (char *) item_pointer(dcip, index);
2401 			memcpy(dst, src, sizeof(POOL_CACHE_ITEM_POINTER));
2402 
2403 			/* Update hash index */
2404 			cid.blockid = blockid;
2405 			cid.itemid = index;
2406 			pool_hash_insert(&cip->query_hash, &cid, true);
2407 			ereport(DEBUG1,
2408 					(errmsg("memcache adding item"),
2409 					 errdetail("item cid updated. old:%d %d new:%d %d",
2410 							   blockid, i, blockid, index)));
2411 			index++;
2412 		}
2413 
2414 		/* All items deleted? */
2415 		if (num_deleted > 0 && num_deleted == bh->num_items)
2416 		{
2417 			ereport(DEBUG1,
2418 					(errmsg("memcache adding item"),
2419 					 errdetail("all items deleted, total deleted:%d", num_deleted)));
2420 			bh->flags = 0;
2421 			pool_init_cache_block(blockid);
2422 			pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
2423 		}
2424 		else
2425 		{
2426 			/* Update number of items */
2427 			bh->num_items -= num_deleted;
2428 
2429 			/* Copy back the packed block except block header */
2430 			memcpy(p + sizeof(POOL_CACHE_BLOCK_HEADER),
2431 				   work_buffer + sizeof(POOL_CACHE_BLOCK_HEADER),
2432 				   pool_config->memqcache_cache_block_size - sizeof(POOL_CACHE_BLOCK_HEADER));
2433 		}
2434 		free(work_buffer);
2435 	}
2436 
2437 	/*
2438 	 * Make sure that we have enough free space
2439 	 */
2440 	if (bh->free_bytes < request_size)
2441 	{
2442 		/* This should not happen */
2443 		ereport(WARNING,
2444 				(errmsg("memcache: adding item to cache: not enough space"),
2445 				 errdetail("free space: %d required: %d block id:%d",
2446 						   bh->free_bytes, request_size, blockid)));
2447 		return NULL;
2448 	}
2449 
2450 	/*
2451 	 * At this point, new item can be located at block_header->num_items
2452 	 */
2453 
2454 	/* Fill in cache item header */
2455 	ci.header.timestamp = time(NULL);
2456 	ci.header.total_length = sizeof(POOL_CACHE_ITEM_HEADER) + size;
2457 
2458 	/* Calculate item body address */
2459 	if (bh->num_items == 0)
2460 	{
2461 		/*
2462 		 * This is the #0 item. So address is block_bottom - data_length
2463 		 */
2464 		item = p + pool_config->memqcache_cache_block_size - ci.header.total_length;
2465 
2466 		/* Mark this block used */
2467 		bh->flags = POOL_BLOCK_USED;
2468 	}
2469 	else
2470 	{
2471 		cip = item_pointer(p, bh->num_items - 1);
2472 		item = p + cip->offset - ci.header.total_length;
2473 	}
2474 
2475 	/* Copy item header */
2476 	memcpy(item, &ci, sizeof(POOL_CACHE_ITEM_HEADER));
2477 	bh->free_bytes -= sizeof(POOL_CACHE_ITEM_HEADER);
2478 
2479 	/* Copy item body */
2480 	memcpy(item + sizeof(POOL_CACHE_ITEM_HEADER), data, size);
2481 	bh->free_bytes -= size;
2482 
2483 	/* Copy cache item pointer */
2484 	memcpy(&cip_body.query_hash, query_hash, sizeof(POOL_QUERY_HASH));
2485 	memset(&cip_body.next, 0, sizeof(POOL_CACHEID));
2486 	cip_body.offset = item - p;
2487 	cip_body.flags = POOL_ITEM_USED;
2488 	memcpy(item_pointer(p, bh->num_items), &cip_body, sizeof(POOL_CACHE_ITEM_POINTER));
2489 	bh->free_bytes -= sizeof(POOL_CACHE_ITEM_POINTER);
2490 
2491 	/* Update FSMM */
2492 	pool_update_fsmm(blockid, bh->free_bytes);
2493 
2494 	cacheid.blockid = blockid;
2495 	cacheid.itemid = bh->num_items;
2496 	ereport(DEBUG1,
2497 			(errmsg("memcache adding item"),
2498 			 errdetail("new item inserted. blockid: %d itemid:%d",
2499 					   cacheid.blockid, cacheid.itemid)));
2500 
2501 	/* Add up number of items */
2502 	bh->num_items++;
2503 
2504 	/* Update hash table */
2505 	if (pool_hash_insert(query_hash, &cacheid, false) < 0)
2506 	{
2507 		ereport(LOG,
2508 				(errmsg("error while adding item to shmem cache, hash insert failed")));
2509 
2510 		/*
2511 		 * Since we have failed to insert hash index entry, we need to undo
2512 		 * the addition of cache entry.
2513 		 */
2514 		pool_delete_item_shmem_cache(&cacheid);
2515 		return NULL;
2516 	}
2517 	ereport(DEBUG1,
2518 			(errmsg("memcache adding item"),
2519 			 errdetail("block: %d item: %d", cacheid.blockid, cacheid.itemid)));
2520 
2521 #ifdef SHMEMCACHE_DEBUG
2522 	dump_shmem_cache(blockid);
2523 #endif
2524 	return &cacheid;
2525 }
2526 
2527 /*
2528  * Returns item data address on shared memory cache specified by query hash.
2529  * Also data length is set to *size.
2530  * On error or data not found case returns NULL.
2531  * Detail is set to *sts. (0: success, 1: not found, -1: error)
2532  */
2533 static char *
pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash,int * size,int * sts)2534 pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash, int *size, int *sts)
2535 {
2536 	POOL_CACHEID *cacheid;
2537 	POOL_CACHE_ITEM_HEADER *cih;
2538 
2539 	if (sts == NULL)
2540 	{
2541 		ereport(LOG,
2542 				(errmsg("error while getting item from shmem cache, sts is NULL")));
2543 		return NULL;
2544 	}
2545 
2546 	*sts = -1;
2547 
2548 	if (query_hash == NULL)
2549 	{
2550 		ereport(LOG,
2551 				(errmsg("error while getting item from shmem cache, query hash is NULL")));
2552 		return NULL;
2553 	}
2554 
2555 	if (size == NULL)
2556 	{
2557 		ereport(LOG,
2558 				(errmsg("error while getting item from shmem cache, size is NULL")));
2559 		return NULL;
2560 	}
2561 
2562 	/*
2563 	 * Find cache header by using hash table
2564 	 */
2565 	cacheid = pool_find_item_on_shmem_cache(query_hash);
2566 	if (cacheid == NULL)
2567 	{
2568 		/* Not found */
2569 		*sts = 1;
2570 		return NULL;
2571 	}
2572 
2573 	cih = pool_cache_item_header(cacheid);
2574 
2575 	*size = cih->total_length - sizeof(POOL_CACHE_ITEM_HEADER);
2576 	return (char *) cih + sizeof(POOL_CACHE_ITEM_HEADER);
2577 }
2578 
2579 /*
2580  * Find data on shared memory cache specified query hash.
2581  * On success returns cache id.
2582  * The cache id is overwritten by the subsequent call to this function.
2583  */
pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash)2584 static POOL_CACHEID * pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash)
2585 {
2586 	static POOL_CACHEID cacheid;
2587 	POOL_CACHEID *c;
2588 	POOL_CACHE_ITEM_HEADER *cih;
2589 	time_t		now;
2590 
2591 	c = pool_hash_search(query_hash);
2592 	if (!c)
2593 	{
2594 		return NULL;
2595 	}
2596 
2597 	cih = item_header(block_address(c->blockid), c->itemid);
2598 
2599 	/* Check cache expiration */
2600 	if (pool_config->memqcache_expire > 0)
2601 	{
2602 		now = time(NULL);
2603 		if (now > (cih->timestamp + pool_config->memqcache_expire))
2604 		{
2605 			ereport(DEBUG1,
2606 					(errmsg("memcache finding item"),
2607 					 errdetail("cache expired: now: %ld timestamp: %ld",
2608 							   now, cih->timestamp + pool_config->memqcache_expire)));
2609 			pool_delete_item_shmem_cache(c);
2610 			return NULL;
2611 		}
2612 	}
2613 
2614 	cacheid.blockid = c->blockid;
2615 	cacheid.itemid = c->itemid;
2616 	return &cacheid;
2617 }
2618 
2619 /*
2620  * Delete item data specified cache id from shmem.
2621  * On successful deletion, returns 0.
2622  * Other wise return -1.
2623  * FSMM is also updated.
2624  */
2625 static int
pool_delete_item_shmem_cache(POOL_CACHEID * cacheid)2626 pool_delete_item_shmem_cache(POOL_CACHEID * cacheid)
2627 {
2628 	POOL_CACHE_BLOCK_HEADER *bh;
2629 	POOL_CACHE_ITEM_POINTER *cip;
2630 	POOL_CACHE_ITEM_HEADER *cih;
2631 	POOL_QUERY_HASH key;
2632 	int			size;
2633 
2634 	ereport(DEBUG1,
2635 			(errmsg("memcache deleting item data"),
2636 			 errdetail("cacheid:%d itemid:%d", cacheid->blockid, cacheid->itemid)));
2637 
2638 	if (cacheid->blockid >= pool_get_memqcache_blocks())
2639 	{
2640 		ereport(LOG,
2641 				(errmsg("error while deleting item from shmem cache, invalid block: %d",
2642 						cacheid->blockid)));
2643 		return -1;
2644 	}
2645 
2646 	bh = (POOL_CACHE_BLOCK_HEADER *) block_address(cacheid->blockid);
2647 	if (!(bh->flags & POOL_BLOCK_USED))
2648 	{
2649 		ereport(LOG,
2650 				(errmsg("error while deleting item from shmem cache, block: %d is not used",
2651 						cacheid->blockid)));
2652 		return -1;
2653 	}
2654 
2655 	if (cacheid->itemid >= bh->num_items)
2656 	{
2657 		/*
2658 		 * This could happen if the block is reused.  Since contents of oid
2659 		 * map file is not updated when the block is reused.
2660 		 */
2661 		ereport(DEBUG1,
2662 				(errmsg("memcache error deleting item data"),
2663 				 errdetail("invalid item id %d in block:%d",
2664 						   cacheid->itemid, cacheid->blockid)));
2665 
2666 		return -1;
2667 	}
2668 
2669 	cip = item_pointer(block_address(cacheid->blockid), cacheid->itemid);
2670 	if (!(cip->flags & POOL_ITEM_USED))
2671 	{
2672 		ereport(LOG,
2673 				(errmsg("error while deleting item from shmem cache, item: %d was not used",
2674 						cacheid->itemid)));
2675 		return -1;
2676 	}
2677 
2678 	if (cip->flags & POOL_ITEM_DELETED)
2679 	{
2680 		ereport(LOG,
2681 				(errmsg("error while deleting item from shmem cache, item: %d was already deleted",
2682 						cacheid->itemid)));
2683 		return -1;
2684 	}
2685 
2686 	/* Save cache key */
2687 	memcpy(&key, &cip->query_hash, sizeof(POOL_QUERY_HASH));
2688 
2689 	cih = pool_cache_item_header(cacheid);
2690 	size = cih->total_length + sizeof(POOL_CACHE_ITEM_POINTER);
2691 
2692 	/* Delete item pointer */
2693 	cip->flags |= POOL_ITEM_DELETED;
2694 
2695 	/*
2696 	 * We do NOT count down bh->num_items here. The deleted space will be
2697 	 * recycled by pool_add_item_shmem_cache(). However, if this is the last
2698 	 * item, we can recycle whole block.
2699 	 *
2700 	 * 2012/4/1: Now we do not pack data in pool_add_item_shmem_cache() for
2701 	 * performance reason. Also we count down num_items if it is the last one.
2702 	 */
2703 	if ((bh->num_items - 1) == 0)
2704 	{
2705 		ereport(DEBUG1,
2706 				(errmsg("memcache deleting item data"),
2707 				 errdetail("no item remains. initialize block")));
2708 		bh->flags = 0;
2709 		pool_init_cache_block(cacheid->blockid);
2710 	}
2711 
2712 	/* Remove hash index */
2713 	pool_hash_delete(&key);
2714 
2715 	/*
2716 	 * If the deleted item is last one in the block, we add it to the free
2717 	 * space.
2718 	 */
2719 	if (cacheid->itemid == (bh->num_items - 1))
2720 	{
2721 		bh->free_bytes += size;
2722 		ereport(DEBUG1,
2723 				(errmsg("memcache deleting item data"),
2724 				 errdetail("deleted %d bytes, freebytes is = %d",
2725 						   size, bh->free_bytes)));
2726 
2727 		bh->num_items--;
2728 	}
2729 
2730 	/* Update FSMM */
2731 	pool_update_fsmm(cacheid->blockid, bh->free_bytes);
2732 
2733 	return 0;
2734 }
2735 
2736 /*
2737  * Returns item header specified by cache id.
2738  */
pool_cache_item_header(POOL_CACHEID * cacheid)2739 static POOL_CACHE_ITEM_HEADER * pool_cache_item_header(POOL_CACHEID * cacheid)
2740 {
2741 	POOL_CACHE_BLOCK_HEADER *bh;
2742 
2743 	if (cacheid->blockid >= pool_get_memqcache_blocks())
2744 	{
2745 		ereport(WARNING,
2746 				(errmsg("error while getting cache item header, invalid block id: %d", cacheid->blockid)));
2747 		return NULL;
2748 	}
2749 
2750 	bh = (POOL_CACHE_BLOCK_HEADER *) block_address(cacheid->blockid);
2751 	if (cacheid->itemid >= bh->num_items)
2752 	{
2753 		ereport(WARNING,
2754 				(errmsg("error while getting cache item header, invalid item id: %d", cacheid->itemid)));
2755 		return NULL;
2756 	}
2757 
2758 	return item_header((char *) bh, cacheid->itemid);
2759 }
2760 
2761 /*
2762  * Initialize specified block.
2763  */
2764 static int
pool_init_cache_block(POOL_CACHE_BLOCKID blockid)2765 pool_init_cache_block(POOL_CACHE_BLOCKID blockid)
2766 {
2767 	char	   *p;
2768 	POOL_CACHE_BLOCK_HEADER *bh;
2769 
2770 	if (blockid >= pool_get_memqcache_blocks())
2771 	{
2772 		ereport(WARNING,
2773 				(errmsg("error while initializing cache block, invalid block id: %d", blockid)));
2774 		return -1;
2775 	}
2776 
2777 	p = block_address(blockid);
2778 	bh = (POOL_CACHE_BLOCK_HEADER *) p;
2779 
2780 	/* Is this block used? */
2781 	if (!(bh->flags & POOL_BLOCK_USED))
2782 	{
2783 		/* Initialize empty block */
2784 		memset(p, 0, pool_config->memqcache_cache_block_size);
2785 		bh->free_bytes = pool_config->memqcache_cache_block_size -
2786 			sizeof(POOL_CACHE_BLOCK_HEADER);
2787 	}
2788 	return 0;
2789 }
2790 
#if NOT_USED
/*
 * Remove every item from the given block.
 *
 * Each item not yet flagged as deleted is removed through
 * pool_delete_item_shmem_cache().  Afterwards the block is marked unused,
 * re-initialized, and its FSMM entry is set to POOL_MAX_FREE_SPACE.
 */
static void
pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid)
{
	char	   *block = block_address(blockid);
	POOL_CACHE_BLOCK_HEADER *hdr = (POOL_CACHE_BLOCK_HEADER *) block;
	POOL_CACHE_ITEM_POINTER *slot;
	POOL_CACHEID victim;
	int			n;

	victim.blockid = blockid;

	/* num_items is re-read each round; deleting may change it */
	for (n = 0; n < hdr->num_items; n++)
	{
		slot = item_pointer(block, n);

		if (slot->flags & POOL_ITEM_DELETED)
			continue;			/* already gone */

		victim.itemid = n;
		pool_delete_item_shmem_cache(&victim);
	}

	/* Clear the used flag so that the init call really resets the block */
	hdr->flags = 0;
	pool_init_cache_block(blockid);
	pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
}
#endif
2825 
2826 /*
2827  * Acquire lock: XXX giant lock
2828  */
2829 void
pool_shmem_lock(void)2830 pool_shmem_lock(void)
2831 {
2832 	if (pool_is_shmem_cache())
2833 	{
2834 		pool_semaphore_lock(SHM_CACHE_SEM);
2835 
2836 	}
2837 }
2838 
2839 /*
2840  * Release lock
2841  */
2842 void
pool_shmem_unlock(void)2843 pool_shmem_unlock(void)
2844 {
2845 	if (pool_is_shmem_cache())
2846 	{
2847 		pool_semaphore_unlock(SHM_CACHE_SEM);
2848 	}
2849 }
2850 
2851 /*
2852  * Returns cache block address specified by block id
2853  */
2854 static char *
block_address(int blockid)2855 block_address(int blockid)
2856 {
2857 	char	   *p;
2858 
2859 	p = pool_memory_cache_address() +
2860 		blockid * pool_config->memqcache_cache_block_size;
2861 	return p;
2862 }
2863 
2864 /*
2865  * Returns i th item pointer in block address block
2866  */
item_pointer(char * block,int i)2867 static POOL_CACHE_ITEM_POINTER * item_pointer(char *block, int i)
2868 {
2869 	return (POOL_CACHE_ITEM_POINTER *) (block + sizeof(POOL_CACHE_BLOCK_HEADER) +
2870 										sizeof(POOL_CACHE_ITEM_POINTER) * i);
2871 }
2872 
2873 /*
2874  * Returns i th item header in block address block
2875  */
item_header(char * block,int i)2876 static POOL_CACHE_ITEM_HEADER * item_header(char *block, int i)
2877 {
2878 	POOL_CACHE_ITEM_POINTER *cip;
2879 
2880 	cip = item_pointer(block, i);
2881 	return (POOL_CACHE_ITEM_HEADER *) (block + cip->offset);
2882 }
2883 
#ifdef SHMEMCACHE_DEBUG
/*
 * Debugging aid: print the block header of the given cache block to stderr,
 * followed by the item pointer and item header of each of its items.
 */
static void
dump_shmem_cache(POOL_CACHE_BLOCKID blockid)
{
	POOL_CACHE_BLOCK_HEADER *bh;
	POOL_CACHE_ITEM_POINTER *cip;
	POOL_CACHE_ITEM_HEADER *cih;
	int			i;

	bh = (POOL_CACHE_BLOCK_HEADER *) block_address(blockid);

	/* sizeof yields size_t, which must be printed with %zu */
	fprintf(stderr, "shmem: block header(%zu bytes): flags:%x num_items:%d free_bytes:%d\n",
			sizeof(*bh), bh->flags, bh->num_items, bh->free_bytes);
	for (i = 0; i < bh->num_items; i++)
	{
		cip = item_pointer((char *) bh, i);
		fprintf(stderr, "shmem: block: %d %d th item pointer(%zu bytes): offset:%d flags:%x\n",
				blockid, i, sizeof(*cip), cip->offset, cip->flags);
		cih = item_header((char *) bh, i);

		/* time_t is not necessarily long; cast to match %ld */
		fprintf(stderr, "shmem: block: %d %d th item header(%zu bytes): timestamp:%ld length:%d\n",
				blockid, i, sizeof(*cih), (long) cih->timestamp, cih->total_length);
	}
}
#endif
2910 
2911 /*
2912  * SELECT query result array modules
2913  */
2914 POOL_QUERY_CACHE_ARRAY *
pool_create_query_cache_array(void)2915 pool_create_query_cache_array(void)
2916 {
2917 #define POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM 128
2918 #define POOL_QUERY_CACHE_ARRAY_HEADER_SIZE (sizeof(int)+sizeof(int))
2919 
2920 	size_t		size;
2921 	POOL_QUERY_CACHE_ARRAY *p;
2922 	POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
2923 	MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
2924 
2925 	size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM *
2926 		sizeof(POOL_TEMP_QUERY_CACHE *);
2927 	p = palloc(size);
2928 	p->num_caches = 0;
2929 	p->array_size = POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
2930 	MemoryContextSwitchTo(old_context);
2931 	return p;
2932 }
2933 
2934 /*
2935  * Discard query cache array
2936  */
2937 void
pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array)2938 pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array)
2939 {
2940 	int			i;
2941 
2942 	if (!cache_array)
2943 		return;
2944 
2945 	ereport(DEBUG1,
2946 			(errmsg("memcache discarding query cache array"),
2947 			 errdetail("num_caches: %d", cache_array->num_caches)));
2948 
2949 	for (i = 0; i < cache_array->num_caches; i++)
2950 	{
2951 		ereport(DEBUG2,
2952 				(errmsg("memcache discarding query cache array"),
2953 				 errdetail("cache no: %d cache: %p", i, cache_array->caches[i])));
2954 		pool_discard_temp_query_cache(cache_array->caches[i]);
2955 	}
2956 	pfree(cache_array);
2957 }
2958 
2959 /*
2960  * Add query cache array
2961  */
pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array,POOL_TEMP_QUERY_CACHE * cache)2962 static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array, POOL_TEMP_QUERY_CACHE * cache)
2963 {
2964 	size_t		size;
2965 	POOL_QUERY_CACHE_ARRAY *cp = cache_array;
2966 
2967 	if (!cache_array)
2968 		return cp;
2969 
2970 	ereport(DEBUG2,
2971 			(errmsg("memcache adding query cache array"),
2972 			 errdetail("num_caches: %d cache: %p", cache_array->num_caches, cache)));
2973 	if (cache_array->num_caches >= cache_array->array_size)
2974 	{
2975 		cache_array->array_size += POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
2976 		size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + cache_array->array_size *
2977 			sizeof(POOL_TEMP_QUERY_CACHE *);
2978 		cache_array = repalloc(cache_array, size);
2979 	}
2980 	cache_array->caches[cache_array->num_caches++] = cache;
2981 	return cache_array;
2982 }
2983 
2984 /*
2985  * SELECT query result temporary cache modules
2986  */
2987 
2988 /*
2989  * Create SELECT result temporary cache
2990  */
2991 POOL_TEMP_QUERY_CACHE *
pool_create_temp_query_cache(char * query)2992 pool_create_temp_query_cache(char *query)
2993 {
2994 	POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
2995 	MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
2996 	POOL_TEMP_QUERY_CACHE *p;
2997 
2998 	p = palloc(sizeof(*p));
2999 	p->query = pstrdup(query);
3000 
3001 	p->buffer = pool_create_buffer();
3002 	p->oids = pool_create_buffer();
3003 	p->num_oids = 0;
3004 	p->is_exceeded = false;
3005 	p->is_discarded = false;
3006 
3007 	MemoryContextSwitchTo(old_context);
3008 
3009 	ereport(DEBUG1,
3010 			(errmsg("pool_create_temp_query_cache: cache created: %p", p)));
3011 
3012 	return p;
3013 }
3014 
3015 /*
3016  * Discard temp query cache
3017  */
3018 void
pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache)3019 pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache)
3020 {
3021 	if (!temp_cache)
3022 		return;
3023 
3024 	if (temp_cache->query)
3025 		pfree(temp_cache->query);
3026 	if (temp_cache->buffer)
3027 		pool_discard_buffer(temp_cache->buffer);
3028 	if (temp_cache->oids)
3029 		pool_discard_buffer(temp_cache->oids);
3030 
3031 	ereport(DEBUG1,
3032 			(errmsg("pool_discard_temp_query_cache: cache discarded: %p", temp_cache)));
3033 
3034 	pfree(temp_cache);
3035 }
3036 
3037 /*
3038  * Add data to temp query cache.
3039  * Data must be FE/BE protocol packet.
3040  */
3041 static void
pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,char kind,char * data,int data_len)3042 pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, char kind, char *data, int data_len)
3043 {
3044 	POOL_INTERNAL_BUFFER *buffer;
3045 	size_t		buflen;
3046 	int			send_len;
3047 
3048 	if (temp_cache == NULL)
3049 	{
3050 		/*
3051 		 * This could happen if cache exceeded in previous query execution in
3052 		 * the same unnamed portal.
3053 		 */
3054 		ereport(DEBUG1,
3055 				(errmsg("memcache adding temporary query cache"),
3056 				 errdetail("POOL_TEMP_QUERY_CACHE is NULL")));
3057 		return;
3058 	}
3059 
3060 	if (temp_cache->is_exceeded)
3061 	{
3062 		ereport(DEBUG1,
3063 				(errmsg("memcache adding temporary query cache"),
3064 				 errdetail("memqcache_maxcache exceeds")));
3065 		return;
3066 	}
3067 
3068 	/*
3069 	 * We only store T(Table Description), D(Data row), C(Command Complete),
3070 	 * 1(ParseComplete), 2(BindComplete)
3071 	 */
3072 	if (kind != 'T' && kind != 'D' && kind != 'C' && kind != '1' && kind != '2')
3073 	{
3074 		return;
3075 	}
3076 
3077 	/* Check data limit */
3078 	buffer = temp_cache->buffer;
3079 	buflen = pool_get_buffer_length(buffer);
3080 
3081 	if ((buflen + data_len + sizeof(int) + 1) > pool_config->memqcache_maxcache)
3082 	{
3083 		ereport(DEBUG1,
3084 				(errmsg("memcache adding temporary query cache"),
3085 				 errdetail("data size exceeds memqcache_maxcache. current:%zd requested:%zd memq_maxcache:%d",
3086 						   buflen, data_len + sizeof(int) + 1, pool_config->memqcache_maxcache)));
3087 		temp_cache->is_exceeded = true;
3088 		return;
3089 	}
3090 
3091 	pool_add_buffer(buffer, &kind, 1);
3092 	send_len = htonl(data_len + sizeof(int));
3093 	pool_add_buffer(buffer, (char *) &send_len, sizeof(int));
3094 	pool_add_buffer(buffer, data, data_len);
3095 
3096 	return;
3097 }
3098 
3099 /*
3100  * Add table oids used by SELECT to temp query cache.
3101  */
3102 static void
pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,int num_oids,int * oids)3103 pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, int num_oids, int *oids)
3104 {
3105 	POOL_INTERNAL_BUFFER *buffer;
3106 
3107 	if (!temp_cache || num_oids <= 0)
3108 		return;
3109 
3110 	buffer = temp_cache->oids;
3111 	pool_add_buffer(buffer, oids, num_oids * sizeof(int));
3112 	temp_cache->num_oids = num_oids;
3113 }
3114 
3115 /*
3116  * Internal buffer management modules.
3117  * Usage:
3118  * 1) Create buffer using pool_create_buffer().
3119  * 2) Add data to buffer using pool_add_buffer().
3120  * 3) Extract (copied) data from buffer using pool_get_buffer().
3121  * 4) Optionally you can:
3122  *		Obtain buffer length by using pool_get_buffer_length().
3123  *		Obtain buffer pointer by using pool_get_buffer_pointer().
3124  * 5) Discard buffer using pool_discard_buffer().
3125  */
3126 
3127 /*
3128  * Create and return internal buffer
3129  */
pool_create_buffer(void)3130 static POOL_INTERNAL_BUFFER * pool_create_buffer(void)
3131 {
3132 	POOL_INTERNAL_BUFFER *p;
3133 
3134 	p = palloc0(sizeof(*p));
3135 	return p;
3136 }
3137 
3138 /*
3139  * Discard internal buffer
3140  */
3141 static void
pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer)3142 pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer)
3143 {
3144 	if (buffer)
3145 	{
3146 		if (buffer->buf)
3147 			pfree(buffer->buf);
3148 		pfree(buffer);
3149 	}
3150 }
3151 
3152 /*
3153  * Add data to internal buffer
3154  */
3155 static void
pool_add_buffer(POOL_INTERNAL_BUFFER * buffer,void * data,size_t len)3156 pool_add_buffer(POOL_INTERNAL_BUFFER * buffer, void *data, size_t len)
3157 {
3158 #define POOL_ALLOCATE_UNIT 8192
3159 
3160 	/* Sanity check */
3161 	if (!buffer || !data || len == 0)
3162 		return;
3163 	POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
3164 	MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
3165 
3166 	/* Check if we need to increase the buffer size */
3167 	if ((buffer->buflen + len) > buffer->bufsize)
3168 	{
3169 		size_t		allocate_size = ((buffer->buflen + len) / POOL_ALLOCATE_UNIT + 1) * POOL_ALLOCATE_UNIT;
3170 
3171 		ereport(DEBUG2,
3172 				(errmsg("memcache adding data to internal buffer"),
3173 				 errdetail("realloc old size:%zd new size:%zd",
3174 						   buffer->bufsize, allocate_size)));
3175 		buffer->bufsize = allocate_size;
3176 		buffer->buf = (char *) repalloc(buffer->buf, buffer->bufsize);
3177 	}
3178 	/* Add data to buffer */
3179 	memcpy(buffer->buf + buffer->buflen, data, len);
3180 	buffer->buflen += len;
3181 	ereport(DEBUG2,
3182 			(errmsg("memcache adding data to internal buffer"),
3183 			 errdetail("len:%zd, total:%zd bufsize:%zd",
3184 					   len, buffer->buflen, buffer->bufsize)));
3185 	MemoryContextSwitchTo(old_context);
3186 	return;
3187 }
3188 
3189 /*
3190  * Get data from internal buffer.
3191  * Data is stored in newly malloc memory.
3192  * Data length is returned to len.
3193  */
3194 static void *
pool_get_buffer(POOL_INTERNAL_BUFFER * buffer,size_t * len)3195 pool_get_buffer(POOL_INTERNAL_BUFFER * buffer, size_t *len)
3196 {
3197 	void	   *p;
3198 
3199 	if (buffer->bufsize == 0 || buffer->buflen == 0 ||
3200 		buffer->buf == NULL)
3201 	{
3202 		*len = 0;
3203 		return NULL;
3204 	}
3205 
3206 	p = palloc(buffer->buflen);
3207 	memcpy(p, buffer->buf, buffer->buflen);
3208 	*len = buffer->buflen;
3209 	return p;
3210 }
3211 
3212 /*
3213  * Get internal buffer length.
3214  */
3215 static size_t
pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer)3216 pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer)
3217 {
3218 	if (buffer == NULL)
3219 		return 0;
3220 
3221 	return buffer->buflen;
3222 }
3223 
#ifdef NOT_USED
/*
 * Return the address of the data area of an internal buffer (not a copy),
 * or NULL if buffer is NULL.
 */
static char *
pool_get_buffer_pointer(POOL_INTERNAL_BUFFER * buffer)
{
	return (buffer != NULL) ? buffer->buf : NULL;
}
#endif
3236 /*
3237  * Get query cache buffer struct of current query context
3238  */
3239 POOL_TEMP_QUERY_CACHE *
pool_get_current_cache(void)3240 pool_get_current_cache(void)
3241 {
3242 	POOL_SESSION_CONTEXT *session_context;
3243 	POOL_QUERY_CONTEXT *query_context;
3244 	POOL_TEMP_QUERY_CACHE *p = NULL;
3245 
3246 	session_context = pool_get_session_context(true);
3247 	if (session_context)
3248 	{
3249 		query_context = session_context->query_context;
3250 		if (query_context)
3251 		{
3252 			p = query_context->temp_cache;
3253 		}
3254 	}
3255 	return p;
3256 }
3257 
3258 /*
3259  * Get query cache buffer of current query context
3260  */
3261 static char *
pool_get_current_cache_buffer(size_t * len)3262 pool_get_current_cache_buffer(size_t *len)
3263 {
3264 	char	   *p = NULL;
3265 
3266 	*len = 0;
3267 	POOL_TEMP_QUERY_CACHE *cache;
3268 
3269 	cache = pool_get_current_cache();
3270 	if (cache)
3271 	{
3272 		p = pool_get_buffer(cache->buffer, len);
3273 	}
3274 	return p;
3275 }
3276 
3277 /*
3278  * Mark this temporary query cache buffer discarded if the SELECT
3279  * uses the table oid specified by oids.
3280  */
3281 static void
pool_check_and_discard_cache_buffer(int num_oids,int * oids)3282 pool_check_and_discard_cache_buffer(int num_oids, int *oids)
3283 {
3284 	POOL_SESSION_CONTEXT *session_context;
3285 	POOL_TEMP_QUERY_CACHE *cache;
3286 	int			num_caches;
3287 	size_t		len;
3288 	int		   *soids;
3289 	int			i,
3290 				j,
3291 				k;
3292 
3293 	session_context = pool_get_session_context(true);
3294 
3295 	if (!session_context || !session_context->query_cache_array)
3296 		return;
3297 
3298 	num_caches = session_context->query_cache_array->num_caches;
3299 
3300 	for (i = 0; i < num_caches; i++)
3301 	{
3302 		cache = session_context->query_cache_array->caches[i];
3303 		if (!cache || cache->is_discarded)
3304 			continue;
3305 
3306 		soids = (int *) pool_get_buffer(cache->oids, &len);
3307 		if (!soids || !len)
3308 			continue;
3309 
3310 		for (j = 0; j < cache->num_oids; j++)
3311 		{
3312 			if (cache->is_discarded)
3313 				break;
3314 
3315 			for (k = 0; k < num_oids; k++)
3316 			{
3317 				if (soids[j] == oids[k])
3318 				{
3319 					ereport(DEBUG1,
3320 							(errmsg("discard cache for \"%s\"", cache->query)));
3321 					cache->is_discarded = true;
3322 					break;
3323 				}
3324 			}
3325 		}
3326 		pfree(soids);
3327 	}
3328 }
3329 
3330 /*
3331  * At Ready for Query or Comand Complete handle query cache.  For streaming
3332  * replication mode and extended query at Comand Complete handle query cache.
3333  * For other case At Ready for Query handle query cache.
3334  */
3335 void
pool_handle_query_cache(POOL_CONNECTION_POOL * backend,char * query,Node * node,char state)3336 pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state)
3337 {
3338 	POOL_SESSION_CONTEXT *session_context;
3339 	pool_sigset_t oldmask;
3340 	char	   *cache_buffer;
3341 	size_t		len;
3342 	int			num_oids;
3343 	int		   *oids;
3344 	int			i;
3345 
3346 	session_context = pool_get_session_context(true);
3347 
3348 	/* Ok to cache SELECT result? */
3349 	if (pool_is_cache_safe())
3350 	{
3351 		SelectContext ctx;
3352 		MemoryContext old_context;
3353 
3354 		old_context = MemoryContextSwitchTo(session_context->memory_context);
3355 		num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
3356 		MemoryContextSwitchTo(old_context);
3357 		oids = ctx.table_oids;
3358 		ereport(DEBUG2,
3359 				(errmsg("query cache handler for ReadyForQuery"),
3360 				 errdetail("num_oids: %d oid: %d", num_oids, *oids)));
3361 
3362 		if (state == 'I')		/* Not inside a transaction? */
3363 		{
3364 			/*
3365 			 * Make sure that temporary cache is not exceeded.
3366 			 */
3367 			if (!pool_is_cache_exceeded())
3368 			{
3369 				POOL_TEMP_QUERY_CACHE *cache;
3370 
3371 				/*
3372 				 * If we are not inside a transaction, we can immediately
3373 				 * register to cache storage.
3374 				 */
3375 				/* Register to memcached or shmem */
3376 				POOL_SETMASK2(&BlockSig, &oldmask);
3377 				pool_shmem_lock();
3378 
3379 				cache_buffer = pool_get_current_cache_buffer(&len);
3380 				if (cache_buffer)
3381 				{
3382 					if (session_context->query_context->skip_cache_commit == false)
3383 					{
3384 						if (pool_commit_cache(backend, query, cache_buffer, len, num_oids, oids) != 0)
3385 						{
3386 							ereport(WARNING,
3387 									(errmsg("ReadyForQuery: pool_commit_cache failed")));
3388 						}
3389 					}
3390 
3391 					/*
3392 					 * Reset temporary query cache buffer. This is necessary
3393 					 * if extended query protocol is used and a bind/execute
3394 					 * message arrives which uses a statement created by prior
3395 					 * parse message. In this case since the temp_cache is not
3396 					 * initialized by a parse message, messages are added to
3397 					 * pre existing temp cache buffer. The problem was found
3398 					 * in bug#152.
3399 					 * http://www.pgpool.net/mantisbt/view.php?id=152
3400 					 */
3401 					cache = pool_get_current_cache();
3402 					ereport(DEBUG1,
3403 							(errmsg("pool_handle_query_cache: temp_cache: %p", cache)));
3404 					pool_discard_temp_query_cache(cache);
3405 
3406 					if (SL_MODE && pool_is_doing_extended_query_message())
3407 						session_context->query_context->temp_cache = NULL;
3408 					else
3409 						session_context->query_context->temp_cache = pool_create_temp_query_cache(query);
3410 					pfree(cache_buffer);
3411 				}
3412 				pool_shmem_unlock();
3413 				POOL_SETMASK(&oldmask);
3414 			}
3415 
3416 			/* Count up SELECT stats */
3417 			pool_stats_count_up_num_selects(1);
3418 
3419 			/* Reset temp buffer */
3420 			pool_reset_memqcache_buffer(true);
3421 		}
3422 		else
3423 		{
3424 			POOL_TEMP_QUERY_CACHE *cache = pool_get_current_cache();
3425 
3426 			/* In transaction. Keep to temp query cache array */
3427 			pool_add_oids_temp_query_cache(cache, num_oids, oids);
3428 
3429 			/*
3430 			 * If temp cache has been overflowed, just trash the half baked
3431 			 * temp cache.
3432 			 */
3433 			if (pool_is_cache_exceeded())
3434 			{
3435 				POOL_TEMP_QUERY_CACHE *cache;
3436 
3437 				cache = pool_get_current_cache();
3438 				pool_discard_temp_query_cache(cache);
3439 
3440 				/*
3441 				 * Reset temp_cache pointer in the current query context so
3442 				 * that we don't double free memory.
3443 				 */
3444 				session_context->query_context->temp_cache = NULL;
3445 
3446 			}
3447 
3448 			/*
3449 			 * Otherwise add to the temp cache array.
3450 			 */
3451 			else
3452 			{
3453 				session_context->query_cache_array =
3454 					pool_add_query_cache_array(session_context->query_cache_array, cache);
3455 
3456 				/*
3457 				 * Reset temp_cache pointer in the current query context so
3458 				 * that we don't add the same temp cache to the cache array.
3459 				 * This is necessary such that case when next query is just a
3460 				 * "bind message", without "parse message". In the case the
3461 				 * query context is reused and same cache pointer will be
3462 				 * added to the query_cache_array which we do not want.
3463 				 */
3464 				session_context->query_context->temp_cache = NULL;
3465 			}
3466 
3467 			/* Count up temporary SELECT stats */
3468 			pool_tmp_stats_count_up_num_selects();
3469 		}
3470 	}
3471 	else if (is_rollback_query(node))	/* Rollback? */
3472 	{
3473 		/* Discard buffered data */
3474 		pool_reset_memqcache_buffer(true);
3475 	}
3476 	else if (is_commit_query(node)) /* Commit? */
3477 	{
3478 		int			num_caches;
3479 
3480 		POOL_SETMASK2(&BlockSig, &oldmask);
3481 		pool_shmem_lock();
3482 
3483 		/* Invalidate query cache */
3484 		if (pool_config->memqcache_auto_cache_invalidation)
3485 		{
3486 			num_oids = pool_get_dml_table_oid(&oids);
3487 			pool_invalidate_query_cache(num_oids, oids, true, 0);
3488 		}
3489 
3490 		/*--------------------------------------------------------------------
3491 		 * If we have something in the query cache buffer, that means either:
3492 		 * - We only had SELECTs in the transaction
3493 		 * - We had only SELECTs after the last DML
3494 		 * Thus we can register SELECT results to cache storage.
3495 		 *--------------------------------------------------------------------
3496 		 */
3497 		num_caches = session_context->query_cache_array->num_caches;
3498 		for (i = 0; i < num_caches; i++)
3499 		{
3500 			POOL_TEMP_QUERY_CACHE *cache;
3501 
3502 			cache = session_context->query_cache_array->caches[i];
3503 			if (!cache || cache->is_discarded)
3504 				continue;
3505 
3506 			num_oids = cache->num_oids;
3507 			oids = pool_get_buffer(cache->oids, &len);
3508 			cache_buffer = pool_get_buffer(cache->buffer, &len);
3509 
3510 			if (pool_commit_cache(backend, cache->query, cache_buffer, len, num_oids, oids) != 0)
3511 			{
3512 				ereport(WARNING,
3513 						(errmsg("ReadyForQuery: pool_commit_cache failed")));
3514 			}
3515 			if (oids)
3516 				pfree(oids);
3517 			if (cache_buffer)
3518 				pfree(cache_buffer);
3519 		}
3520 		pool_shmem_unlock();
3521 		POOL_SETMASK(&oldmask);
3522 
3523 		/* Count up number of SELECT stats */
3524 		pool_stats_count_up_num_selects(pool_tmp_stats_get_num_selects());
3525 
3526 		pool_reset_memqcache_buffer(true);
3527 	}
3528 	else						/* Non cache safe queries */
3529 	{
3530 		/* Non cachable SELECT */
3531 		if (node && IsA(node, SelectStmt))
3532 		{
3533 			/* Extract table oids from buffer */
3534 			num_oids = pool_get_dml_table_oid(&oids);
3535 
3536 			if (state == 'I')
3537 			{
3538 				/*
3539 				 * If Data-modifying statements in SELECT's WITH clause,
3540 				 * invalidate query cache.
3541 				 */
3542 				if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3543 				{
3544 					POOL_SETMASK2(&BlockSig, &oldmask);
3545 					pool_shmem_lock();
3546 					pool_invalidate_query_cache(num_oids, oids, true, 0);
3547 					pool_shmem_unlock();
3548 					POOL_SETMASK(&oldmask);
3549 				}
3550 
3551 				/* Count up SELECT stats */
3552 				pool_stats_count_up_num_selects(1);
3553 				pool_reset_memqcache_buffer(true);
3554 			}
3555 			else
3556 			{
3557 				/*
3558 				 * If we are inside a transaction, we cannot invalidate
3559 				 * query cache yet. However we can clear cache buffer, if
3560 				 * DML/DDL modifies the TABLE which SELECT uses.
3561 				 */
3562 				if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3563 				{
3564 					pool_check_and_discard_cache_buffer(num_oids, oids);
3565 					pool_reset_memqcache_buffer(false);
3566 				}
3567 
3568 				/* Count up temporary SELECT stats */
3569 				pool_tmp_stats_count_up_num_selects();
3570 			}
3571 		}
3572 
3573 		/*
3574 		 * If the query is DROP DATABASE, discard both of caches in
3575 		 * shmem/memcached and oidmap in memqcache_oiddir.
3576 		 */
3577 		else if (is_drop_database(node) && session_context->query_context->dboid != 0)
3578 		{
3579 			int			dboid = session_context->query_context->dboid;
3580 
3581 			num_oids = pool_get_dropdb_table_oids(&oids, dboid);
3582 
3583 			if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3584 			{
3585 				pool_shmem_lock();
3586 				pool_invalidate_query_cache(num_oids, oids, true, dboid);
3587 				pool_discard_oid_maps_by_db(dboid);
3588 				pool_shmem_unlock();
3589 				pool_reset_memqcache_buffer(true);
3590 
3591 				pfree(oids);
3592 				ereport(DEBUG2,
3593 						(errmsg("query cache handler for ReadyForQuery"),
3594 						 errdetail("deleted all cache files for the DROPped DB")));
3595 			}
3596 		}
3597 		else
3598 		{
3599 			/*
3600 			 * DML/DCL/DDL case
3601 			 */
3602 
3603 			/* Extract table oids from buffer */
3604 			num_oids = pool_get_dml_table_oid(&oids);
3605 			if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3606 			{
3607 				/*
3608 				 * If we are not inside a transaction, we can immediately
3609 				 * invalidate query cache.
3610 				 */
3611 				if (state == 'I')
3612 				{
3613 					POOL_SETMASK2(&BlockSig, &oldmask);
3614 					pool_shmem_lock();
3615 					pool_invalidate_query_cache(num_oids, oids, true, 0);
3616 					pool_shmem_unlock();
3617 					POOL_SETMASK(&oldmask);
3618 					pool_reset_memqcache_buffer(true);
3619 				}
3620 				else
3621 				{
3622 					/*
3623 					 * If we are inside a transaction, we cannot invalidate
3624 					 * query cache yet. However we can clear cache buffer, if
3625 					 * DML/DDL modifies the TABLE which SELECT uses.
3626 					 */
3627 					pool_check_and_discard_cache_buffer(num_oids, oids);
3628 					pool_reset_memqcache_buffer(false);
3629 				}
3630 			}
3631 			else if (num_oids == 0)
3632 			{
3633 				/*
3634 				 * It is also necessary to clear cache buffers in case of no
3635 				 * oid queries (like BEGIN, CHECKPOINT, VACUUM, etc) too.
3636 				 */
3637 				pool_reset_memqcache_buffer(true);
3638 			}
3639 		}
3640 	}
3641 }
3642 
3643 /*
3644  * Create and initialize query cache stats
3645  */
3646 static POOL_QUERY_CACHE_STATS * stats;
3647 int
pool_init_memqcache_stats(void)3648 pool_init_memqcache_stats(void)
3649 {
3650 	stats = pool_shared_memory_create(sizeof(POOL_QUERY_CACHE_STATS));
3651 	pool_reset_memqcache_stats();
3652 	return 0;
3653 }
3654 
3655 /*
3656  * Returns copy of stats area. The copy is in static area and will be
3657  * overwritten by next call to this function.
3658  */
3659 POOL_QUERY_CACHE_STATS *
pool_get_memqcache_stats(void)3660 pool_get_memqcache_stats(void)
3661 {
3662 	static POOL_QUERY_CACHE_STATS mystats;
3663 	pool_sigset_t oldmask;
3664 
3665 	memset(&mystats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3666 
3667 	if (stats)
3668 	{
3669 		POOL_SETMASK2(&BlockSig, &oldmask);
3670 		pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3671 		memcpy(&mystats, stats, sizeof(POOL_QUERY_CACHE_STATS));
3672 		pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3673 		POOL_SETMASK(&oldmask);
3674 	}
3675 
3676 	return &mystats;
3677 }
3678 
3679 /*
3680  * Reset query cache stats. Caller must lock QUERY_CACHE_STATS_SEM if
3681  * necessary.
3682  */
3683 void
pool_reset_memqcache_stats(void)3684 pool_reset_memqcache_stats(void)
3685 {
3686 	memset(stats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3687 	stats->start_time = time(NULL);
3688 }
3689 
3690 /*
3691  * Count up number of successful SELECTs and returns the number.
3692  * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3693  */
3694 long long int
pool_stats_count_up_num_selects(long long int num)3695 pool_stats_count_up_num_selects(long long int num)
3696 {
3697 	pool_sigset_t oldmask;
3698 
3699 	POOL_SETMASK2(&BlockSig, &oldmask);
3700 	pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3701 	stats->num_selects += num;
3702 	pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3703 	POOL_SETMASK(&oldmask);
3704 	return stats->num_selects;
3705 }
3706 
3707 /*
3708  * Count up number of successful SELECTs in temporary area and returns
3709  * the number.
3710  */
3711 long long int
pool_tmp_stats_count_up_num_selects(void)3712 pool_tmp_stats_count_up_num_selects(void)
3713 {
3714 	POOL_SESSION_CONTEXT *session_context;
3715 
3716 	session_context = pool_get_session_context(false);
3717 	session_context->num_selects++;
3718 	return session_context->num_selects;
3719 }
3720 
3721 /*
3722  * Return number of successful SELECTs in temporary area.
3723  */
3724 long long int
pool_tmp_stats_get_num_selects(void)3725 pool_tmp_stats_get_num_selects(void)
3726 {
3727 	POOL_SESSION_CONTEXT *session_context;
3728 
3729 	session_context = pool_get_session_context(false);
3730 	return session_context->num_selects;
3731 }
3732 
3733 /*
3734  * Reset number of successful SELECTs in temporary area.
3735  */
3736 void
pool_tmp_stats_reset_num_selects(void)3737 pool_tmp_stats_reset_num_selects(void)
3738 {
3739 	POOL_SESSION_CONTEXT *session_context;
3740 
3741 	session_context = pool_get_session_context(false);
3742 	session_context->num_selects = 0;
3743 }
3744 
3745 /*
3746  * Count up number of SELECTs extracted from cache returns the number.
3747  * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3748  */
3749 long long int
pool_stats_count_up_num_cache_hits(void)3750 pool_stats_count_up_num_cache_hits(void)
3751 {
3752 	pool_sigset_t oldmask;
3753 
3754 	POOL_SETMASK2(&BlockSig, &oldmask);
3755 	pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3756 	stats->num_cache_hits++;
3757 	pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3758 	POOL_SETMASK(&oldmask);
3759 	return stats->num_cache_hits;
3760 }
3761 
/*
 * Hash table implementation on shared memory.  A sub part of the md5 hash
 * key is used as the hash function.  Experiments have shown that hash_any()
 * of PostgreSQL is a little bit better than the method using part of the
 * md5 hash value, but spending extra cpu cycles to call hash_any() does not
 * seem to be worth the trouble.
 */

/* Bucket table: number of buckets (nhash), bucket mask and chain heads */
static volatile POOL_HASH_HEADER *hash_header;
/* Array of hash elements that the bucket chains and the free list link */
static volatile POOL_HASH_ELEMENT *hash_elements;
/* Head of the free list; free elements are taken from hash_free->next */
static volatile POOL_HASH_ELEMENT *hash_free;
3773 
3774 /*
3775  * Initialize hash table on shared memory "nelements" is max number of
3776  * hash keys. The actual number of hash key is rounded up to power of
3777  * 2.
3778  */
3779 #undef POOL_HASH_DEBUG
3780 
3781 int
pool_hash_init(int nelements)3782 pool_hash_init(int nelements)
3783 {
3784 	size_t		size;
3785 	int			nelements2;		/* number of rounded up hash keys */
3786 	int			shift;
3787 	uint32		mask;
3788 	POOL_HASH_HEADER hh;
3789 	int			i;
3790 
3791 	if (nelements <= 0)
3792 		ereport(ERROR,
3793 				(errmsg("initializing hash table on shared memory, invalid number of elements: %d", nelements)));
3794 
3795 	/* Round up to power of 2 */
3796 	shift = 32;
3797 	nelements2 = 1;
3798 	do
3799 	{
3800 		nelements2 <<= 1;
3801 		shift--;
3802 	} while (nelements2 < nelements);
3803 
3804 	mask = ~0;
3805 	mask >>= shift;
3806 	size = (char *) &hh.elements - (char *) &hh + sizeof(POOL_HEADER_ELEMENT) * nelements2;
3807 	hash_header = pool_shared_memory_create(size);
3808 	hash_header->nhash = nelements2;
3809 	hash_header->mask = mask;
3810 
3811 #ifdef POOL_HASH_DEBUG
3812 	ereport(LOG,
3813 			(errmsg("initializing hash table on shared memory"),
3814 			 errdetail("size:%zd nelements2:%d", size, nelements2)));
3815 
3816 #endif
3817 
3818 	size = sizeof(POOL_HASH_ELEMENT) * nelements2;
3819 	hash_elements = pool_shared_memory_create(size);
3820 
3821 #ifdef POOL_HASH_DEBUG
3822 	ereport(LOG,
3823 			(errmsg("initializing hash table on shared memory"),
3824 			 errdetail("size:%zd nelements2:%d", size, nelements2)));
3825 #endif
3826 
3827 	for (i = 0; i < nelements2 - 1; i++)
3828 	{
3829 		hash_elements[i].next = (POOL_HASH_ELEMENT *) & hash_elements[i + 1];
3830 	}
3831 	hash_elements[nelements2 - 1].next = NULL;
3832 	hash_free = hash_elements;
3833 
3834 	return 0;
3835 }
3836 
3837 /*
3838  * Reset hash table on shared memory "nelements" is max number of
3839  * hash keys. The actual number of hash key is rounded up to power of
3840  * 2.
3841  */
3842 static int
pool_hash_reset(int nelements)3843 pool_hash_reset(int nelements)
3844 {
3845 	size_t		size;
3846 	int			nelements2;		/* number of rounded up hash keys */
3847 	int			shift;
3848 	uint32		mask;
3849 	POOL_HASH_HEADER hh;
3850 	int			i;
3851 
3852 	if (nelements <= 0)
3853 		ereport(ERROR,
3854 				(errmsg("clearing hash table on shared memory, invalid number of elements: %d", nelements)));
3855 
3856 	/* Round up to power of 2 */
3857 	shift = 32;
3858 	nelements2 = 1;
3859 	do
3860 	{
3861 		nelements2 <<= 1;
3862 		shift--;
3863 	} while (nelements2 < nelements);
3864 
3865 	mask = ~0;
3866 	mask >>= shift;
3867 
3868 	size = (char *) &hh.elements - (char *) &hh + sizeof(POOL_HEADER_ELEMENT) * nelements2;
3869 	memset((void *) hash_header, 0, size);
3870 
3871 	hash_header->nhash = nelements2;
3872 	hash_header->mask = mask;
3873 
3874 	size = sizeof(POOL_HASH_ELEMENT) * nelements2;
3875 	memset((void *) hash_elements, 0, size);
3876 
3877 	for (i = 0; i < nelements2 - 1; i++)
3878 	{
3879 		hash_elements[i].next = (POOL_HASH_ELEMENT *) & hash_elements[i + 1];
3880 	}
3881 	hash_elements[nelements2 - 1].next = NULL;
3882 	hash_free = hash_elements;
3883 
3884 	return 0;
3885 }
3886 
3887 /*
3888  * Search cacheid by MD5 hash key string
3889  * If found, returns cache id, otherwise NULL.
3890  */
3891 POOL_CACHEID *
pool_hash_search(POOL_QUERY_HASH * key)3892 pool_hash_search(POOL_QUERY_HASH * key)
3893 {
3894 	volatile	POOL_HASH_ELEMENT *element;
3895 
3896 	uint32		hash_key = create_hash_key(key);
3897 
3898 	if (hash_key >= hash_header->nhash)
3899 	{
3900 		ereport(WARNING,
3901 				(errmsg("memcache: searching cacheid from hash. invalid hash key"),
3902 				 errdetail("invalid hash key: %uld nhash: %ld",
3903 						   hash_key, hash_header->nhash)));
3904 		return NULL;
3905 	}
3906 
3907 	{
3908 		char		md5[POOL_MD5_HASHKEYLEN + 1];
3909 
3910 		memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3911 		md5[POOL_MD5_HASHKEYLEN] = '\0';
3912 #ifdef POOL_HASH_DEBUG
3913 		ereport(LOG,
3914 				(errmsg("searching hash table"),
3915 				 errdetail("hash_key:%d md5:%s", hash_key, md5)));
3916 #endif
3917 	}
3918 
3919 	element = hash_header->elements[hash_key].element;
3920 	while (element)
3921 	{
3922 		{
3923 			char		md5[POOL_MD5_HASHKEYLEN + 1];
3924 
3925 			memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3926 			md5[POOL_MD5_HASHKEYLEN] = '\0';
3927 #ifdef POOL_HASH_DEBUG
3928 			ereport(LOG,
3929 					(errmsg("searching hash table"),
3930 					 errdetail("element md5:%s", md5)));
3931 #endif
3932 		}
3933 
3934 		if (memcmp((const void *) element->hashkey.query_hash,
3935 				   (const void *) key->query_hash, sizeof(key->query_hash)) == 0)
3936 		{
3937 			return (POOL_CACHEID *) & element->cacheid;
3938 		}
3939 		element = element->next;
3940 	}
3941 	return NULL;
3942 }
3943 
3944 /*
3945  * Insert MD5 key and associated cache id into shmem hash table.  If
3946  * "update" is true, replace cacheid associated with the MD5 key,
3947  * rather than throw an error.
3948  */
3949 static int
pool_hash_insert(POOL_QUERY_HASH * key,POOL_CACHEID * cacheid,bool update)3950 pool_hash_insert(POOL_QUERY_HASH * key, POOL_CACHEID * cacheid, bool update)
3951 {
3952 	POOL_HASH_ELEMENT *element;
3953 	POOL_HASH_ELEMENT *new_element;
3954 
3955 	uint32		hash_key = create_hash_key(key);
3956 
3957 	if (hash_key >= hash_header->nhash)
3958 	{
3959 		ereport(WARNING,
3960 				(errmsg("memcache: adding cacheid to hash. invalid hash key"),
3961 				 errdetail("invalid hash key: %uld nhash: %ld",
3962 						   hash_key, hash_header->nhash)));
3963 		return -1;
3964 	}
3965 
3966 	{
3967 		char		md5[POOL_MD5_HASHKEYLEN + 1];
3968 
3969 		memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3970 		md5[POOL_MD5_HASHKEYLEN] = '\0';
3971 #ifdef POOL_HASH_DEBUG
3972 		ereport(LOG,
3973 				(errmsg("searching hash table"),
3974 				 errdetail("hash_key:%d md5:%s block:%d item:%d", hash_key, md5, cacheid->blockid, cacheid->itemid)));
3975 #endif
3976 	}
3977 
3978 	/*
3979 	 * Look for hash key.
3980 	 */
3981 	element = hash_header->elements[hash_key].element;
3982 
3983 	while (element)
3984 	{
3985 		if (memcmp((const void *) element->hashkey.query_hash,
3986 				   (const void *) key->query_hash, sizeof(key->query_hash)) == 0)
3987 		{
3988 			/* Hash key found. If "update" is false, just throw an error. */
3989 			char		md5[POOL_MD5_HASHKEYLEN + 1];
3990 
3991 			if (!update)
3992 			{
3993 				memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3994 				md5[POOL_MD5_HASHKEYLEN] = '\0';
3995 				ereport(LOG,
3996 						(errmsg("memcache: adding cacheid to hash. hash key:\"%s\" already exists", md5)));
3997 				return -1;
3998 			}
3999 			else
4000 			{
4001 				/* Update cache id */
4002 				memcpy((void *) &element->cacheid, cacheid, sizeof(POOL_CACHEID));
4003 				return 0;
4004 			}
4005 		}
4006 		element = element->next;
4007 	}
4008 
4009 	/*
4010 	 * Ok, same key did not exist. Just insert new hash key.
4011 	 */
4012 	new_element = (POOL_HASH_ELEMENT *) get_new_hash_element();
4013 	if (!new_element)
4014 	{
4015 		ereport(LOG,
4016 				(errmsg("memcache: adding cacheid to hash. failed to get new element")));
4017 		return -1;
4018 	}
4019 
4020 	element = hash_header->elements[hash_key].element;
4021 
4022 	hash_header->elements[hash_key].element = new_element;
4023 	new_element->next = element;
4024 
4025 	memcpy((void *) new_element->hashkey.query_hash, key->query_hash, POOL_MD5_HASHKEYLEN);
4026 	memcpy((void *) &new_element->cacheid, cacheid, sizeof(POOL_CACHEID));
4027 
4028 	return 0;
4029 }
4030 
4031 /*
4032  * Delete MD5 key and associated cache id from shmem hash table.
4033  */
4034 int
pool_hash_delete(POOL_QUERY_HASH * key)4035 pool_hash_delete(POOL_QUERY_HASH * key)
4036 {
4037 	POOL_HASH_ELEMENT *element;
4038 	POOL_HASH_ELEMENT **delete_point;
4039 	bool		found;
4040 
4041 	uint32		hash_key = create_hash_key(key);
4042 
4043 	if (hash_key >= hash_header->nhash)
4044 	{
4045 		ereport(LOG,
4046 				(errmsg("memcache: deleting key from hash. invalid key"),
4047 				 errdetail("invalid hash key: %uld nhash: %ld",
4048 						   hash_key, hash_header->nhash)));
4049 		return -1;
4050 	}
4051 
4052 	/*
4053 	 * Look for delete location
4054 	 */
4055 	found = false;
4056 	delete_point = (POOL_HASH_ELEMENT * *) & (hash_header->elements[hash_key].element);
4057 	element = hash_header->elements[hash_key].element;
4058 
4059 	while (element)
4060 	{
4061 		if (memcmp(element->hashkey.query_hash, key->query_hash, sizeof(key->query_hash)) == 0)
4062 		{
4063 			found = true;
4064 			break;
4065 		}
4066 		delete_point = &element->next;
4067 		element = element->next;
4068 	}
4069 
4070 	if (!found)
4071 	{
4072 		char		md5[POOL_MD5_HASHKEYLEN + 1];
4073 
4074 		memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
4075 		md5[POOL_MD5_HASHKEYLEN] = '\0';
4076 		ereport(LOG,
4077 				(errmsg("memcache: deleting key from hash. key:\"%s\" not found", md5)));
4078 		return -1;
4079 	}
4080 
4081 	/*
4082 	 * Put back the element to free list
4083 	 */
4084 	*delete_point = element->next;
4085 	put_back_hash_element(element);
4086 
4087 	return 0;
4088 }
4089 
4090 /*
4091  * Calculate 32bit binary hash key (i.e. location in hash header) from MD5
4092  * string. We use top most 8 characters of MD5 string for calculation.
4093 */
4094 static uint32
create_hash_key(POOL_QUERY_HASH * key)4095 create_hash_key(POOL_QUERY_HASH * key)
4096 {
4097 #define POOL_HASH_NCHARS 8
4098 
4099 	char		md5[POOL_HASH_NCHARS + 1];
4100 	uint32		mask;
4101 
4102 	memcpy(md5, key->query_hash, POOL_HASH_NCHARS);
4103 	md5[POOL_HASH_NCHARS] = '\0';
4104 	mask = strtoul(md5, NULL, 16);
4105 	mask &= hash_header->mask;
4106 	return mask;
4107 }
4108 
4109 /*
4110  * Get new free hash element from free list.
4111  */
4112 static volatile POOL_HASH_ELEMENT *
get_new_hash_element(void)4113 get_new_hash_element(void)
4114 {
4115 	volatile	POOL_HASH_ELEMENT *elm;
4116 
4117 	if (!hash_free->next)
4118 	{
4119 		/* No free element */
4120 		return NULL;
4121 	}
4122 
4123 #ifdef POOL_HASH_DEBUG
4124 	ereport(LOG,
4125 			(errmsg("getting new hash element"),
4126 			 errdetail("hash_free->next:%p hash_free->next->next:%p",
4127 					   hash_free->next, hash_free->next->next)));
4128 #endif
4129 
4130 	elm = hash_free->next;
4131 	hash_free->next = elm->next;
4132 
4133 	return elm;
4134 }
4135 
4136 /*
4137  * Put back hash element to free list.
4138  */
4139 static void
put_back_hash_element(volatile POOL_HASH_ELEMENT * element)4140 put_back_hash_element(volatile POOL_HASH_ELEMENT * element)
4141 {
4142 	POOL_HASH_ELEMENT *elm;
4143 
4144 #ifdef POOL_HASH_DEBUG
4145 	ereport(LOG,
4146 			(errmsg("getting new hash element"),
4147 			 errdetail("hash_free->next:%p hash_free->next->next:%p",
4148 					   hash_free->next, hash_free->next->next)));
4149 #endif
4150 
4151 	elm = hash_free->next;
4152 	hash_free->next = (POOL_HASH_ELEMENT *) element;
4153 	element->next = elm;
4154 }
4155 
4156 /*
4157  * Return true if there's a free hash element.
4158  */
4159 static bool
is_free_hash_element(void)4160 is_free_hash_element(void)
4161 {
4162 	return hash_free->next != NULL;
4163 }
4164 
4165 /*
4166  * Returns shared memory cache stats.
4167  * Subsequent call to this function will break return value
4168  * because its in static memory.
4169  * Caller must hold shmem_lock before calling this function.
4170  * If in memory query cache is not enabled, all stats are 0.
4171  */
4172 POOL_SHMEM_STATS *
pool_get_shmem_storage_stats(void)4173 pool_get_shmem_storage_stats(void)
4174 {
4175 	static POOL_SHMEM_STATS mystats;
4176 	POOL_HASH_ELEMENT *element;
4177 	int			nblocks;
4178 	int			i;
4179 
4180 	memset(&mystats, 0, sizeof(POOL_SHMEM_STATS));
4181 
4182 	if (!pool_config->memory_cache_enabled)
4183 		return &mystats;
4184 
4185 	/*
4186 	 * Copy cache hit data
4187 	 */
4188 	mystats.cache_stats.num_selects = stats->num_selects;
4189 	mystats.cache_stats.num_cache_hits = stats->num_cache_hits;
4190 
4191 	if (pool_config->memqcache_method != SHMEM_CACHE)
4192 		return &mystats;
4193 
4194 	/* number of total hash entries */
4195 	mystats.num_hash_entries = hash_header->nhash;
4196 
4197 	/* number of used hash entries */
4198 	for (i = 0; i < hash_header->nhash; i++)
4199 	{
4200 		element = hash_header->elements[i].element;
4201 		while (element)
4202 		{
4203 			mystats.used_hash_entries++;
4204 			element = element->next;
4205 		}
4206 	}
4207 
4208 	nblocks = pool_get_memqcache_blocks();
4209 
4210 	for (i = 0; i < nblocks; i++)
4211 	{
4212 		POOL_CACHE_BLOCK_HEADER *bh;
4213 		POOL_CACHE_ITEM_POINTER *cip;
4214 		char	   *p = block_address(i);
4215 
4216 		bh = (POOL_CACHE_BLOCK_HEADER *) p;
4217 		int			j;
4218 
4219 		if (bh->flags & POOL_BLOCK_USED)
4220 		{
4221 			for (j = 0; j < bh->num_items; j++)
4222 			{
4223 				cip = item_pointer(p, j);
4224 				if (POOL_ITEM_DELETED & cip->flags)
4225 				{
4226 					mystats.fragment_cache_entries_size += item_header(p, j)->total_length;
4227 				}
4228 				else
4229 				{
4230 					/* number of used cache entries */
4231 					mystats.num_cache_entries++;
4232 					/* total size of used cache entries */
4233 					mystats.used_cache_entries_size += (item_header(p, j)->total_length + sizeof(POOL_CACHE_ITEM_POINTER));
4234 				}
4235 			}
4236 			mystats.used_cache_entries_size += sizeof(POOL_CACHE_BLOCK_HEADER);
4237 			/* total size of free(usable) cache entries */
4238 			mystats.free_cache_entries_size += bh->free_bytes;
4239 		}
4240 		else
4241 		{
4242 			mystats.free_cache_entries_size += pool_config->memqcache_cache_block_size;
4243 		}
4244 	}
4245 
4246 	/*
4247 	 * Copy POOL_QUERY_CACHE_STATS
4248 	 */
4249 	memcpy(&mystats.cache_stats, stats, sizeof(mystats.cache_stats));
4250 
4251 	return &mystats;
4252 }
4253 
4254 /*
4255  * Inject cached message to the target backend buffer to pretend as if backend
4256  * actually replies with Data row and Command Complete message.
4257  */
4258 static void
inject_cached_message(POOL_CONNECTION * backend,char * qcache,int qcachelen)4259 inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen)
4260 {
4261 	char		kind;
4262 	int			len;
4263 	char	   *buf;
4264 	int			timeout;
4265 	int			i = 0;
4266 	bool		is_prepared_stmt = false;
4267 	POOL_SESSION_CONTEXT *session_context;
4268 	POOL_QUERY_CONTEXT *query_context;
4269 	POOL_PENDING_MESSAGE *msg;
4270 
4271 	session_context = pool_get_session_context(false);
4272 	query_context = session_context->query_context;
4273 	msg = pool_pending_message_find_lastest_by_query_context(query_context);
4274 
4275 	if (msg)
4276 	{
4277 		/*
4278 		 * If pending message found, we should extract target backend from it
4279 		 */
4280 		int			backend_id;
4281 
4282 		backend_id = pool_pending_message_get_target_backend_id(msg);
4283 		backend = CONNECTION(session_context->backend, backend_id);
4284 		timeout = -1;
4285 	}
4286 	else
4287 		timeout = 0;
4288 
4289 	/* Send flush messsage to backend to retrieve response of backend */
4290 	pool_write(backend, "H", 1);
4291 	len = htonl(sizeof(len));
4292 	pool_write_and_flush(backend, &len, sizeof(len));
4293 
4294 	/*
4295 	 * Push any response from backend
4296 	 */
4297 	for (;;)
4298 	{
4299 		/* check if there's any pending data */
4300 		if (!pool_ssl_pending(backend) && pool_read_buffer_is_empty(backend))
4301 		{
4302 			pool_set_timeout(timeout);
4303 			if (pool_check_fd(backend) != 0)
4304 			{
4305 				ereport(DEBUG1,
4306 						(errmsg("inject_cached_message: select shows no pending data")));
4307 				pool_set_timeout(-1);
4308 				break;
4309 			}
4310 			pool_set_timeout(-1);
4311 		}
4312 
4313 		pool_read(backend, &kind, 1);
4314 		ereport(DEBUG1,
4315 				(errmsg("inject_cached_message: push message kind: '%c'", kind)));
4316 		if (msg &&
4317 			((kind == 'T' && msg->type == POOL_DESCRIBE) ||
4318 			 (kind == '2' && msg->type == POOL_BIND)))
4319 		{
4320 			/* Pending message seen. Now it is likely to end of pending data */
4321 			timeout = 0;
4322 		}
4323 		pool_push(backend, &kind, sizeof(kind));
4324 		pool_read(backend, &len, sizeof(len));
4325 		pool_push(backend, &len, sizeof(len));
4326 		if ((ntohl(len) - sizeof(len)) > 0)
4327 		{
4328 			buf = pool_read2(backend, ntohl(len) - sizeof(len));
4329 			pool_push(backend, buf, ntohl(len) - sizeof(len));
4330 		}
4331 	}
4332 
4333 	/*
4334 	 * Inject row data and command complete
4335 	 */
4336 	while (i < qcachelen)
4337 	{
4338 		char		tmpkind;
4339 		int			tmplen;
4340 		char	   *p;
4341 
4342 		tmpkind = qcache[i];
4343 		i++;
4344 
4345 		memcpy(&tmplen, qcache + i, sizeof(tmplen));
4346 		i += sizeof(tmplen);
4347 		len = ntohl(tmplen);
4348 		p = qcache + i;
4349 		i += len - sizeof(tmplen);
4350 
4351 		/* No need to cache PARSE and BIND responses */
4352 		if (tmpkind == '1' || tmpkind == '2')
4353 		{
4354 			is_prepared_stmt = true;
4355 			continue;
4356 		}
4357 
4358 		/*
4359 		 * In the prepared statement execution, there is no need to send 'T'
4360 		 * response to the frontend.
4361 		 */
4362 		if (is_prepared_stmt && tmpkind == 'T')
4363 		{
4364 			continue;
4365 		}
4366 
4367 		/* push message */
4368 		ereport(DEBUG1,
4369 				(errmsg("inject_cached_message: push cached messages: '%c' len: %d", tmpkind, len)));
4370 		pool_push(backend, &tmpkind, 1);
4371 		pool_push(backend, &tmplen, sizeof(tmplen));
4372 		if (len > 0)
4373 			pool_push(backend, p, len - sizeof(tmplen));
4374 	}
4375 
4376 	/*
4377 	 * Pop data.
4378 	 */
4379 	pool_pop(backend, &len);
4380 }
4381