1 /* -*-pgsql-c-*- */
2 /*
3  * pgpool: a language independent connection pool server for PostgreSQL
4  * written by Tatsuo Ishii
5  *
6  * Copyright (c) 2003-2020	PgPool Global Development Group
7  *
8  * Permission to use, copy, modify, and distribute this software and
9  * its documentation for any purpose and without fee is hereby
10  * granted, provided that the above copyright notice appear in all
11  * copies and that both that copyright notice and this permission
12  * notice appear in supporting documentation, and that the name of the
13  * author not be used in advertising or publicity pertaining to
14  * distribution of the software without specific, written prior
15  * permission. The author makes no representations about the
16  * suitability of this software for any purpose.  It is provided "as
17  * is" without express or implied warranty.
18  *
19  * pool_memqcache.c: query cache on shmem or memcached
20  *
21  */
22 #define DATABASE_TO_OID_QUERY "SELECT oid FROM pg_database WHERE datname = '%s'"
23 
24 #include "pool.h"
25 
26 #include <stdio.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <sys/file.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <fcntl.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <stdlib.h>
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <dirent.h>
39 
40 #ifdef USE_MEMCACHED
41 #include <libmemcached/memcached.h>
42 #endif
43 
44 #include "auth/md5.h"
45 #include "pool_config.h"
46 #include "protocol/pool_proto_modules.h"
47 #include "parser/parsenodes.h"
48 #include "context/pool_session_context.h"
49 #include "query_cache/pool_memqcache.h"
50 #include "utils/pool_relcache.h"
51 #include "utils/pool_select_walker.h"
52 #include "utils/pool_stream.h"
53 #include "utils/pool_stream.h"
54 #include "utils/elog.h"
55 #include "utils/palloc.h"
56 #include "utils/memutils.h"
57 
#ifdef USE_MEMCACHED
/* Global libmemcached handle; created in memcached_connect(). */
memcached_st *memc;
#endif

/*
 * Forward declarations of the file-local (static) helpers.  Some of them
 * only exist in particular build configurations (USE_MEMCACHED, DEBUG,
 * SHMEMCACHE_DEBUG, NOT_USED).
 */
static char *encode_key(const char *s, char *buf, POOL_CONNECTION_POOL * backend);
#ifdef DEBUG
static void dump_cache_data(const char *data, size_t len);
#endif
static int	pool_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen, int num_oids, int *oids);
static int	send_cached_messages(POOL_CONNECTION * frontend, const char *qcache, int qcachelen);
static void send_message(POOL_CONNECTION * conn, char kind, int len, const char *data);
#ifdef USE_MEMCACHED
static int	delete_cache_on_memcached(const char *key);
#endif
static int	pool_get_dml_table_oid(int **oid);
static int	pool_get_dropdb_table_oids(int **oids, int dboid);
static void pool_discard_dml_table_oid(void);
static void pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlink, int dboid);
static int	pool_get_database_oid(void);
static void pool_add_table_oid_map(POOL_CACHEKEY * cachkey, int num_table_oids, int *table_oids);
static void pool_reset_memqcache_buffer(bool reset_dml_oids);
static POOL_CACHEID * pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash, char *data, int size, time_t expire);
static POOL_CACHEID * pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash);
static char *pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash, int *size, int *sts);
static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array, POOL_TEMP_QUERY_CACHE * cache);
static void pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, char kind, char *data, int data_len);
static void pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, int num_oids, int *oids);
static POOL_INTERNAL_BUFFER * pool_create_buffer(void);
static void pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer);
static void pool_add_buffer(POOL_INTERNAL_BUFFER * buffer, void *data, size_t len);
static void *pool_get_buffer(POOL_INTERNAL_BUFFER * buffer, size_t *len);
#ifdef NOT_USED
static char *pool_get_buffer_pointer(POOL_INTERNAL_BUFFER * buffer);
#endif
static char *pool_get_current_cache_buffer(size_t *len);
static size_t pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer);
static void pool_check_and_discard_cache_buffer(int num_oids, int *oids);

static void pool_set_memqcache_blocks(int num_blocks);
static int	pool_get_memqcache_blocks(void);
static void *pool_memory_cache_address(void);
static void pool_reset_fsmm(size_t size);
static void *pool_fsmm_address(void);
static void pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space);
static POOL_CACHE_BLOCKID pool_get_block(size_t free_space);
static POOL_CACHE_ITEM_HEADER * pool_cache_item_header(POOL_CACHEID * cacheid);
static int	pool_init_cache_block(POOL_CACHE_BLOCKID blockid);
/* NOTE(review): this guard uses "#if" while the one above uses "#ifdef". */
#if NOT_USED
static void pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid);
#endif
static int	pool_delete_item_shmem_cache(POOL_CACHEID * cacheid);
static char *block_address(int blockid);
static POOL_CACHE_ITEM_POINTER * item_pointer(char *block, int i);
static POOL_CACHE_ITEM_HEADER * item_header(char *block, int i);
static POOL_CACHE_BLOCKID pool_reuse_block(void);
#ifdef SHMEMCACHE_DEBUG
static void dump_shmem_cache(POOL_CACHE_BLOCKID blockid);
#endif

static int	pool_hash_reset(int nelements);
static int	pool_hash_insert(POOL_QUERY_HASH * key, POOL_CACHEID * cacheid, bool update);
static uint32 create_hash_key(POOL_QUERY_HASH * key);
static volatile POOL_HASH_ELEMENT *get_new_hash_element(void);
static void put_back_hash_element(volatile POOL_HASH_ELEMENT * element);
static bool is_free_hash_element(void);
static void inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen);

/*
 * Non-zero while the shared memory is locked by this process.
 */
static int is_shmem_locked;
129 
130 /*
131  * Connect to Memcached
132  */
133 int
memcached_connect(void)134 memcached_connect(void)
135 {
136 	char	   *memqcache_memcached_host;
137 	int			memqcache_memcached_port;
138 #ifdef USE_MEMCACHED
139 	memcached_server_st *servers;
140 	memcached_return rc;
141 
142 	/* Already connected? */
143 	if (memc)
144 	{
145 		return 0;
146 	}
147 #endif
148 
149 	memqcache_memcached_host = pool_config->memqcache_memcached_host;
150 	memqcache_memcached_port = pool_config->memqcache_memcached_port;
151 
152 	ereport(DEBUG1,
153 			(errmsg("connecting to memcached on Host:\"%s:%d\"", memqcache_memcached_host, memqcache_memcached_port)));
154 
155 #ifdef USE_MEMCACHED
156 	memc = memcached_create(NULL);
157 	servers = memcached_server_list_append(NULL,
158 										   memqcache_memcached_host,
159 										   memqcache_memcached_port,
160 										   &rc);
161 
162 	rc = memcached_server_push(memc, servers);
163 	if (rc != MEMCACHED_SUCCESS)
164 	{
165 		ereport(WARNING,
166 				(errmsg("failed to connect to memcached, server push error:\"%s\"\n", memcached_strerror(memc, rc))));
167 		memc = (memcached_st *) - 1;
168 		return -1;
169 	}
170 	memcached_server_list_free(servers);
171 #else
172 	ereport(WARNING,
173 			(errmsg("failed to connect to memcached, memcached support is not enabled")));
174 	return -1;
175 #endif
176 	return 0;
177 }
178 
179 /*
180  * Disconnect to Memcached
181  */
182 void
memcached_disconnect(void)183 memcached_disconnect(void)
184 {
185 #ifdef USE_MEMCACHED
186 	if (!memc)
187 	{
188 		return;
189 	}
190 	memcached_free(memc);
191 #else
192 	ereport(WARNING,
193 			(errmsg("failed to disconnect from memcached, memcached support is not enabled")));
194 #endif
195 }
196 
197 /*
198  * Register buffer data for query cache in memory cache
199  */
200 void
memqcache_register(char kind,POOL_CONNECTION * frontend,char * data,int data_len)201 memqcache_register(char kind,
202 				   POOL_CONNECTION * frontend,
203 				   char *data,
204 				   int data_len)
205 {
206 	POOL_TEMP_QUERY_CACHE *cache;
207 	POOL_SESSION_CONTEXT *session_context;
208 	POOL_QUERY_CONTEXT *query_context;
209 
210 	cache = pool_get_current_cache();
211 
212 	if (cache == NULL)
213 	{
214 		session_context = pool_get_session_context(true);
215 
216 		if (session_context && pool_is_query_in_progress())
217 		{
218 			char	   *query = pool_get_query_string();
219 
220 			query_context = session_context->query_context;
221 
222 			if (query)
223 				query_context->temp_cache = pool_create_temp_query_cache(query);
224 		}
225 	}
226 
227 	pool_add_temp_query_cache(cache, kind, data, data_len);
228 }
229 
230 /*
231  * Commit SELECT results to cache storage.
232  */
233 static int
pool_commit_cache(POOL_CONNECTION_POOL * backend,char * query,char * data,size_t datalen,int num_oids,int * oids)234 pool_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen, int num_oids, int *oids)
235 {
236 #ifdef USE_MEMCACHED
237 	memcached_return rc;
238 #endif
239 	POOL_CACHEKEY cachekey;
240 	char		tmpkey[MAX_KEY];
241 	time_t		memqcache_expire;
242 
243 	/*
244 	 * get_buflen() will return -1 if query result exceeds memqcache_maxcache
245 	 */
246 	if (datalen == -1)
247 	{
248 		return -1;
249 	}
250 
251 	/* query disabled */
252 	if (strlen(query) <= 0)
253 	{
254 		return -1;
255 	}
256 	ereport(DEBUG1,
257 			(errmsg("commiting SELECT results to cache storage"),
258 			 errdetail("Query=\"%s\"", query)));
259 #ifdef DEBUG
260 	dump_cache_data(data, datalen);
261 #endif
262 
263 
264 	/* encode md5key for memcached */
265 	encode_key(query, tmpkey, backend);
266 	ereport(DEBUG2,
267 			(errmsg("commiting SELECT results to cache storage"),
268 			 errdetail("search key : \"%s\"", tmpkey)));
269 
270 	memcpy(cachekey.hashkey, tmpkey, 32);
271 
272 	memqcache_expire = pool_config->memqcache_expire;
273 	ereport(DEBUG1,
274 			(errmsg("commiting SELECT results to cache storage"),
275 			 errdetail("memqcache_expire = %ld", memqcache_expire)));
276 
277 	if (pool_is_shmem_cache())
278 	{
279 		POOL_CACHEID *cacheid;
280 		POOL_QUERY_HASH query_hash;
281 
282 		memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
283 
284 		cacheid = pool_hash_search(&query_hash);
285 
286 		if (cacheid != NULL)
287 		{
288 			ereport(DEBUG1,
289 					(errmsg("commiting SELECT results to cache storage"),
290 					 errdetail("item already exists")));
291 
292 			return 0;
293 		}
294 		else
295 		{
296 			cacheid = pool_add_item_shmem_cache(&query_hash, data, datalen,memqcache_expire);
297 			if (cacheid == NULL)
298 			{
299 				ereport(LOG,
300 						(errmsg("failed to add item to shmem cache")));
301 				return -1;
302 			}
303 			else
304 			{
305 				ereport(DEBUG2,
306 						(errmsg("commiting SELECT results to cache storage"),
307 						 errdetail("blockid: %d itemid: %d",
308 								   cacheid->blockid, cacheid->itemid)));
309 			}
310 			cachekey.cacheid.blockid = cacheid->blockid;
311 			cachekey.cacheid.itemid = cacheid->itemid;
312 		}
313 	}
314 
315 #ifdef USE_MEMCACHED
316 	else
317 	{
318 		rc = memcached_set(memc, tmpkey, 32,
319 						   data, datalen, (time_t) memqcache_expire, 0);
320 		if (rc != MEMCACHED_SUCCESS)
321 		{
322 			ereport(WARNING,
323 					(errmsg("cache commit failed with error:\"%s\"", memcached_strerror(memc, rc))));
324 			return -1;
325 		}
326 		ereport(DEBUG1,
327 				(errmsg("commiting SELECT results to cache storage"),
328 				 errdetail("set cache succeeded")));
329 	}
330 #endif
331 
332 /*
333  * Register cache id to oid map
334  */
335 	pool_add_table_oid_map(&cachekey, num_oids, oids);
336 
337 	return 0;
338 }
339 
340 /*
341  * Commit SELECT system catalog results to cache storage.
342  */
343 int
pool_catalog_commit_cache(POOL_CONNECTION_POOL * backend,char * query,char * data,size_t datalen)344 pool_catalog_commit_cache(POOL_CONNECTION_POOL * backend, char *query, char *data, size_t datalen)
345 {
346 #ifdef USE_MEMCACHED
347 	memcached_return rc;
348 #endif
349 	POOL_CACHEKEY cachekey;
350 	char		tmpkey[MAX_KEY];
351 	time_t		memqcache_expire;
352 
353 	/*
354 	 * get_buflen() will return -1 if query result exceeds memqcache_maxcache
355 	 */
356 	if (datalen == -1)
357 	{
358 		return -1;
359 	}
360 
361 	/* query disabled */
362 	if (strlen(query) <= 0)
363 	{
364 		return -1;
365 	}
366 	ereport(DEBUG1,
367 			(errmsg("commiting relation cache to cache storage"),
368 			 errdetail("Query=\"%s\"", query)));
369 
370 #ifdef DEBUG
371 	dump_cache_data(data, datalen);
372 #endif
373 
374 	/* encode md5key for memcached */
375 	encode_key(query, tmpkey, backend);
376 	ereport(DEBUG2,
377 			(errmsg("commiting relation cache to cache storage"),
378 			 errdetail("search key : \"%s\"", tmpkey)));
379 
380 	memcpy(cachekey.hashkey, tmpkey, 32);
381 
382 	memqcache_expire = pool_config->relcache_expire;
383 	ereport(DEBUG1,
384 			(errmsg("commiting relation cache to cache storage"),
385 			 errdetail("memqcache_expire = %ld", memqcache_expire)));
386 
387 	if (pool_is_shmem_cache())
388 	{
389 		POOL_CACHEID *cacheid;
390 		POOL_QUERY_HASH query_hash;
391 
392 		memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
393 
394 		cacheid = pool_hash_search(&query_hash);
395 
396 		if (cacheid != NULL)
397 		{
398 			ereport(DEBUG1,
399 					(errmsg("commiting relation cache to cache storage"),
400 					 errdetail("item already exists")));
401 
402 			return 0;
403 		}
404 		else
405 		{
406 			cacheid = pool_add_item_shmem_cache(&query_hash, data, datalen, memqcache_expire);
407 			if (cacheid == NULL)
408 			{
409 				ereport(LOG,
410 						(errmsg("failed to add item to shmem cache")));
411 				return -1;
412 			}
413 			else
414 			{
415 				ereport(DEBUG2,
416 						(errmsg("commiting relation cache to cache storage"),
417 						 errdetail("blockid: %d itemid: %d",
418 								   cacheid->blockid, cacheid->itemid)));
419 			}
420 			cachekey.cacheid.blockid = cacheid->blockid;
421 			cachekey.cacheid.itemid = cacheid->itemid;
422 		}
423 	}
424 
425 #ifdef USE_MEMCACHED
426 	else
427 	{
428 		rc = memcached_set(memc, tmpkey, 32,
429 						   data, datalen, (time_t) memqcache_expire, 0);
430 		if (rc != MEMCACHED_SUCCESS)
431 		{
432 			ereport(WARNING,
433 					(errmsg("cache commit failed with error:\"%s\"", memcached_strerror(memc, rc))));
434 			return -1;
435 		}
436 		ereport(DEBUG1,
437 				(errmsg("commiting relation cache to cache storage"),
438 				 errdetail("set cache succeeded")));
439 	}
440 #endif
441 	return 0;
442 }
443 
444 
445 /*
446  * Fetch from memory cache.
447  * Return:
448  * 0: fetch success,
449  * 1: not found
450  */
451 int
pool_fetch_cache(POOL_CONNECTION_POOL * backend,const char * query,char ** buf,size_t * len)452 pool_fetch_cache(POOL_CONNECTION_POOL * backend, const char *query, char **buf, size_t *len)
453 {
454 	char	   *ptr;
455 	char		tmpkey[MAX_KEY];
456 	int			sts;
457 	char	   *p;
458 
459 	if (strlen(query) <= 0)
460 		ereport(ERROR,
461 				(errmsg("fetching from cache storage, no query")));
462 
463 	/* encode md5key for memcached */
464 	encode_key(query, tmpkey, backend);
465 	ereport(DEBUG1,
466 			(errmsg("fetching from cache storage"),
467 			 errdetail("search key \"%s\"", tmpkey)));
468 
469 
470 	if (pool_is_shmem_cache())
471 	{
472 		POOL_QUERY_HASH query_hash;
473 		int			mylen;
474 
475 		memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
476 
477 		ptr = pool_get_item_shmem_cache(&query_hash, &mylen, &sts);
478 		if (ptr == NULL)
479 		{
480 			ereport(DEBUG1,
481 					(errmsg("fetching from cache storage"),
482 					 errdetail("cache not found on shared memory")));
483 
484 			return 1;
485 		}
486 		*len = mylen;
487 	}
488 #ifdef USE_MEMCACHED
489 	else
490 	{
491 		memcached_return rc;
492 		unsigned int flags;
493 
494 		ptr = memcached_get(memc, tmpkey, strlen(tmpkey), len, &flags, &rc);
495 
496 		if (rc != MEMCACHED_SUCCESS)
497 		{
498 			if (rc != MEMCACHED_NOTFOUND)
499 			{
500 				ereport(LOG,
501 						(errmsg("fetching from cache storage, memcached_get failed with error: \"%s\"", memcached_strerror(memc, rc))));
502 
503 				/*
504 				 * Turn off memory cache support to prevent future errors.
505 				 */
506 				pool_config->memory_cache_enabled = 0;
507 				/* Behave as if cache not found */
508 				return 1;
509 			}
510 			else
511 			{
512 				/* Not found */
513 				ereport(DEBUG1,
514 						(errmsg("fetching from cache storage"),
515 						 errdetail("cache item not found for key: \"%s\" and query:\"%s\"", tmpkey, query)));
516 				return 1;
517 			}
518 		}
519 	}
520 #else
521 	else
522 	{
523 		ereport(ERROR,
524 				(errmsg("memcached support is not enabled")));
525 	}
526 #endif
527 
528 	p = palloc(*len);
529 
530 	memcpy(p, ptr, *len);
531 
532 	if (!pool_is_shmem_cache())
533 	{
534 		free(ptr);
535 	}
536 
537 	ereport(DEBUG1,
538 			(errmsg("fetching from cache storage"),
539 			 errdetail("query=\"%s\" len:%zd", query, *len)));
540 #ifdef DEBUG
541 	dump_cache_data(p, *len);
542 #endif
543 
544 	*buf = p;
545 
546 	return 0;
547 }
548 
549 /*
550  * encode key.
551  * create cache key as md5(username + query string + database name)
552  */
553 static char *
encode_key(const char * s,char * buf,POOL_CONNECTION_POOL * backend)554 encode_key(const char *s, char *buf, POOL_CONNECTION_POOL * backend)
555 {
556 	char	   *strkey;
557 	int			u_length;
558 	int			d_length;
559 	int			q_length;
560 	int			length;
561 
562 	u_length = strlen(backend->info->user);
563 	ereport(DEBUG1,
564 			(errmsg("memcache encode key"),
565 			 errdetail("username: \"%s\" database_name: \"%s\"", backend->info->user, backend->info->database)));
566 
567 	d_length = strlen(backend->info->database);
568 
569 	q_length = strlen(s);
570 	ereport(DEBUG1,
571 			(errmsg("memcache encode key"),
572 			 errdetail("query: \"%s\"", s)));
573 
574 	length = u_length + d_length + q_length + 1;
575 
576 	strkey = (char *) palloc(sizeof(char) * length);
577 
578 	snprintf(strkey, length, "%s%s%s", backend->info->user, s, backend->info->database);
579 
580 	pool_md5_hash(strkey, strlen(strkey), buf);
581 	ereport(DEBUG1,
582 			(errmsg("memcache encode key"),
583 			 errdetail("`%s' -> `%s'", strkey, buf)));
584 	pfree(strkey);
585 	return buf;
586 }
587 
588 #ifdef DEBUG
/*
 * Dump cache data to stderr (debugging aid).
 *
 * "data" is read as a sequence of messages.  Each message consists of a one
 * byte kind, a four byte length in network byte order (which counts itself
 * but not the kind byte) and the payload, which is printed in hex.
 *
 * Dumping stops with a diagnostic as soon as a message does not fit into the
 * remaining "len" bytes or carries an impossible length, so a truncated or
 * corrupted buffer is never read past its end.
 */
static void
dump_cache_data(const char *data, size_t len)
{
	fprintf(stderr, "shmem: len = %zu\n", len);

	while (len > 0)
	{
		int			plen;
		size_t		payload;
		size_t		i;

		/* kind byte + length word must be present */
		if (len < 1 + sizeof(plen))
		{
			fprintf(stderr, "shmem: truncated message header, %zu byte(s) left\n", len);
			return;
		}

		fprintf(stderr, "shmem: kind:%c\n", *data++);
		len--;

		memcpy(&plen, data, sizeof(plen));
		data += sizeof(plen);
		len -= sizeof(plen);
		plen = ntohl(plen);
		fprintf(stderr, "shmem: len:%d\n", plen);

		/* the length counts itself, and the payload must fit in the rest */
		if (plen < (int) sizeof(plen) || (size_t) plen - sizeof(plen) > len)
		{
			fprintf(stderr, "shmem: invalid message length\n");
			return;
		}
		payload = (size_t) plen - sizeof(plen);

		fprintf(stderr, "shmem: ");
		for (i = 0; i < payload; i++)
		{
			fprintf(stderr, "%02x ", (unsigned char) data[i]);
		}
		fprintf(stderr, "\n");

		data += payload;
		len -= payload;
	}
}
621 #endif
622 
623 /*
624  * send cached messages
625  */
626 static int
send_cached_messages(POOL_CONNECTION * frontend,const char * qcache,int qcachelen)627 send_cached_messages(POOL_CONNECTION * frontend, const char *qcache, int qcachelen)
628 {
629 	int			msg = 0;
630 	int			i = 0;
631 	int			is_prepared_stmt = 0;
632 	int			len;
633 	const char *p;
634 
635 	while (i < qcachelen)
636 	{
637 		char		tmpkind;
638 		int			tmplen;
639 
640 		tmpkind = qcache[i];
641 		i++;
642 
643 		memcpy(&tmplen, qcache + i, sizeof(tmplen));
644 		i += sizeof(tmplen);
645 		len = ntohl(tmplen);
646 		p = qcache + i;
647 		i += len - sizeof(tmplen);
648 
649 		/* No need to cache PARSE and BIND responses */
650 		if (tmpkind == '1' || tmpkind == '2')
651 		{
652 			is_prepared_stmt = 1;
653 			continue;
654 		}
655 
656 		/*
657 		 * In the prepared statement execution, there is no need to send 'T'
658 		 * response to the frontend.
659 		 */
660 		if (is_prepared_stmt && tmpkind == 'T')
661 		{
662 			continue;
663 		}
664 
665 		/* send message to frontend */
666 		ereport(DEBUG1,
667 				(errmsg("memcache: sending cached messages: '%c' len: %d", tmpkind, len)));
668 		send_message(frontend, tmpkind, len, p);
669 
670 		msg++;
671 	}
672 
673 	return msg;
674 }
675 
676 /*
677  * send message to frontend
678  */
679 static void
send_message(POOL_CONNECTION * conn,char kind,int len,const char * data)680 send_message(POOL_CONNECTION * conn, char kind, int len, const char *data)
681 {
682 	ereport(DEBUG2,
683 			(errmsg("memcache: sending messages: kind '%c', len=%d, data=%p", kind, len, data)));
684 
685 	pool_write(conn, &kind, 1);
686 
687 	len = htonl(len);
688 	pool_write(conn, &len, sizeof(len));
689 
690 	len = ntohl(len);
691 	pool_write(conn, (void *) data, len - sizeof(len));
692 }
693 
694 #ifdef USE_MEMCACHED
695 /*
696  * delete query cache on memcached
697  */
698 static int
delete_cache_on_memcached(const char * key)699 delete_cache_on_memcached(const char *key)
700 {
701 
702 	memcached_return rc;
703 
704 	ereport(DEBUG2,
705 			(errmsg("memcache: deleteing cache on memcached with key: \"%s\"", key)));
706 
707 
708 	/* delete cache data on memcached. key is md5 hash query */
709 	rc = memcached_delete(memc, key, 32, (time_t) 0);
710 
711 	/* delete cache data on memcached is failed */
712 	if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_BUFFERED)
713 	{
714 		ereport(LOG,
715 				(errmsg("failed to delete cache on memcached, error:\"%s\"", memcached_strerror(memc, rc))));
716 		return 0;
717 	}
718 	return 1;
719 
720 }
721 #endif
722 
/*
 * Fetch SELECT data from cache if possible.
 *
 * Looks up "contents" (the query string) with pool_fetch_cache().  If a
 * cached result is found it is delivered (see below) and *foundp is set to
 * true; otherwise *foundp is false.
 *
 * Returns POOL_CONTINUE both when nothing was found and when the cached
 * result was delivered, POOL_END if flushing the frontend failed.
 */
POOL_STATUS
pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,
							 POOL_CONNECTION_POOL * backend,
							 char *contents, bool *foundp)
{
	char	   *qcache;
	size_t		qcachelen;
	int			sts;
	pool_sigset_t oldmask;

	ereport(DEBUG1,
			(errmsg("pool_fetch_from_memory_cache called")));

	*foundp = false;

	/*
	 * The lookup runs with the BlockSig signal mask applied and the shared
	 * memory lock held; both are undone on the normal path as well as on the
	 * error path below.
	 */
	POOL_SETMASK2(&BlockSig, &oldmask);
	pool_shmem_lock();

	PG_TRY();
	{
		sts = pool_fetch_cache(backend, contents, &qcache, &qcachelen);
	}
	PG_CATCH();
	{
		/* release the lock and restore the signal mask before re-throwing */
		pool_shmem_unlock();
		POOL_SETMASK(&oldmask);
		PG_RE_THROW();
	}
	PG_END_TRY();

	pool_shmem_unlock();
	POOL_SETMASK(&oldmask);

	if (sts != 0)
		/* Cache not found */
		return POOL_CONTINUE;

	/*
	 * Cache found. If we are doing extended query and in streaming
	 * replication mode, we need to retrieve any responses from backend and
	 * forward them to frontend.
	 */
	if (pool_is_doing_extended_query_message() && SL_MODE)
	{
		POOL_SESSION_CONTEXT *session_context;
		POOL_CONNECTION *target_backend;

		ereport(DEBUG1,
				(errmsg("memcache: injecting cache data")));

		/* the cached data is injected via the load balance node's connection */
		session_context = pool_get_session_context(true);
		target_backend = CONNECTION(backend, session_context->load_balance_node_id);
		inject_cached_message(target_backend, qcache, qcachelen);
	}
	else
	{
		/*
		 * Send each messages to frontend
		 */
		send_cached_messages(frontend, qcache, qcachelen);
	}

	/* qcache is the copy handed out by pool_fetch_cache(); done with it */
	pfree(qcache);

	/*
	 * Send a "READY FOR QUERY" if not in extended query.
	 */
	if (!pool_is_doing_extended_query_message() && MAJOR(backend) == PROTO_MAJOR_V3)
	{
		signed char state;

		/*
		 * We keep previous transaction state.
		 */
		state = MASTER(backend)->tstate;
		/* length 5 = four byte length word + one byte state */
		send_message(frontend, 'Z', 5, (char *) &state);
	}

	/* Flush to the frontend unless the data was injected above. */
	if (!pool_is_doing_extended_query_message() || !SL_MODE)
	{
		if (pool_flush(frontend))
		{
			return POOL_END;
		}
	}

	*foundp = true;

	if (pool_config->log_per_node_statement)
		ereport(LOG,
				(errmsg("fetch from memory cache"),
				 errdetail("query result fetched from cache. statement: %s", contents)));

	ereport(DEBUG1,
			(errmsg("fetch from memory cache"),
			 errdetail("query result found in the query cache, %s", contents)));

	return POOL_CONTINUE;
}
825 
826 /*
827  * Simple and rough (thus unreliable) check if the query is likely
828  * SELECT. Just check if the query starts with SELECT or WITH. This
829  * can be used before parse tree is available.
830  */
831 bool
pool_is_likely_select(char * query)832 pool_is_likely_select(char *query)
833 {
834 	bool		do_continue = false;
835 
836 	if (query == NULL)
837 		return false;
838 
839 	if (pool_config->ignore_leading_white_space)
840 	{
841 		/* Ignore leading white spaces */
842 		while (*query && isspace(*query))
843 			query++;
844 	}
845 	if (!*query)
846 	{
847 		return false;
848 	}
849 
850 	/*
851 	 * Get rid of head comment. It is sure that the query is in correct
852 	 * format, because the parser has rejected bad queries such as the one
853 	 * with not-ended comment.
854 	 */
855 	while (*query)
856 	{
857 		/* Ignore spaces and return marks */
858 		do_continue = false;
859 		while (*query && isspace(*query))
860 		{
861 			query++;
862 			do_continue = true;
863 		}
864 		if (do_continue)
865 		{
866 			continue;
867 		}
868 
869 		while (*query && !strncmp(query, "\n", 2))
870 		{
871 			query++;
872 			do_continue = true;
873 		}
874 		if (do_continue)
875 		{
876 			query += 2;
877 			continue;
878 		}
879 
880 		/* Ignore comments like C */
881 		if (!strncmp(query, "/*", 2))
882 		{
883 			while (*query && strncmp(query, "*/", 2))
884 				query++;
885 
886 			query += 2;
887 			continue;
888 		}
889 
890 		/* Ignore SQL comments */
891 		if (!strncmp(query, "--", 2))
892 		{
893 			while (*query && strncmp(query, "\n", 2))
894 				query++;
895 
896 			query += 2;
897 			continue;
898 		}
899 
900 		if (!strncasecmp(query, "SELECT", 6) || !strncasecmp(query, "WITH", 4))
901 		{
902 			return true;
903 		}
904 
905 		query++;
906 	}
907 
908 	return false;
909 }
910 
911 /*
912  * Return true if SELECT can be cached.  "node" is the parse tree for
913  * the query and "query" is the query string.
914  * The query must be SELECT or WITH.
915  */
916 bool
pool_is_allow_to_cache(Node * node,char * query)917 pool_is_allow_to_cache(Node *node, char *query)
918 {
919 	int			i = 0;
920 	int			num_oids = -1;
921 	SelectContext ctx;
922 
923 	/*
924 	 * If NO QUERY CACHE comment exists, do not cache.
925 	 */
926 	if (!strncasecmp(query, NO_QUERY_CACHE, NO_QUERY_CACHE_COMMENT_SZ))
927 		return false;
928 
929 	/*
930 	 * Check black table list first.
931 	 */
932 	if (pool_config->num_black_memqcache_table_list > 0)
933 	{
934 		/*
935 		 * Extract oids in from clause of SELECT, and check if SELECT to them
936 		 * could be cached.
937 		 */
938 		num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
939 		if (num_oids > 0)
940 		{
941 			for (i = 0; i < num_oids; i++)
942 			{
943 				ereport(DEBUG1,
944 						(errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, ctx.table_names[i])));
945 				if (pool_is_table_in_black_list(ctx.table_names[i]) == true)
946 				{
947 					ereport(DEBUG1,
948 							(errmsg("memcache: node is not allowed to cache")));
949 					return false;
950 				}
951 			}
952 		}
953 	}
954 
955 	/* SELECT INTO or SELECT FOR SHARE or UPDATE cannot be cached */
956 	if (pool_has_insertinto_or_locking_clause(node))
957 		return false;
958 
959 	/*
960 	 * If SELECT uses non immutable functions, it's not allowed to cache.
961 	 */
962 	if (pool_has_non_immutable_function_call(node))
963 		return false;
964 
965 	/*
966 	 * If SELECT uses temporary tables it's not allowed to cache.
967 	 */
968 	if (pool_config->check_temp_table && pool_has_temp_table(node))
969 		return false;
970 
971 	/*
972 	 * If SELECT uses system catalogs, it's not allowed to cache.
973 	 */
974 	if (pool_has_system_catalog(node))
975 		return false;
976 
977 	/*
978 	 * TABLESAMPLE is not allowed to cache.
979 	 */
980 	if (IsA(node, SelectStmt) &&((SelectStmt *) node)->fromClause)
981 	{
982 		List	   *tbl_list = ((SelectStmt *) node)->fromClause;
983 		ListCell   *tbl;
984 
985 		foreach(tbl, tbl_list)
986 		{
987 			if (IsA(lfirst(tbl), RangeTableSample))
988 				return false;
989 		}
990 	}
991 
992 
993 	/*
994 	 * If the table is in the while list, allow to cache even if it is VIEW or
995 	 * unlogged table.
996 	 */
997 	if (pool_config->num_white_memqcache_table_list > 0)
998 	{
999 		if (num_oids < 0)
1000 			num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
1001 
1002 		if (num_oids > 0)
1003 		{
1004 			for (i = 0; i < num_oids; i++)
1005 			{
1006 				char	   *table = ctx.table_names[i];
1007 
1008 				ereport(DEBUG1,
1009 						(errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, table)));
1010 				if (is_view(table) || is_unlogged_table(table))
1011 				{
1012 					if (pool_is_table_in_white_list(table) == false)
1013 					{
1014 						ereport(DEBUG1,
1015 								(errmsg("memcache: node is not allowed to cache")));
1016 						return false;
1017 					}
1018 				}
1019 			}
1020 		}
1021 	}
1022 	else
1023 	{
1024 		/*
1025 		 * If SELECT uses views, it's not allowed to cache.
1026 		 */
1027 		if (pool_has_view(node))
1028 			return false;
1029 
1030 		/*
1031 		 * If SELECT uses unlogged tables, it's not allowed to cache.
1032 		 */
1033 		if (pool_has_unlogged_table(node))
1034 			return false;
1035 	}
1036 
1037 	/*
1038 	 * If Data-modifying statements in WITH clause, it's not allowed to cache.
1039 	 */
1040 	if(IsA(node, SelectStmt) && ((SelectStmt *) node)->withClause)
1041 	{
1042 		ListCell	*lc;
1043 		WithClause	*withClause = ((SelectStmt *) node)->withClause;
1044 
1045 		foreach(lc, withClause->ctes)
1046 		{
1047 			CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
1048 			if(IsA(cte->ctequery, InsertStmt) ||
1049 			   IsA(cte->ctequery, DeleteStmt) ||
1050 			   IsA(cte->ctequery, UpdateStmt))
1051 			{
1052 				return false;
1053 			}
1054 		}
1055 	}
1056 
1057 	return true;
1058 }
1059 
1060 
1061 /*
1062  * Return true If the SELECTed table is in back list.
1063  */
1064 bool
pool_is_table_in_black_list(const char * table_name)1065 pool_is_table_in_black_list(const char *table_name)
1066 {
1067 
1068 	if (pool_config->num_black_memqcache_table_list > 0 &&
1069 		pattern_compare((char *) table_name, BLACKLIST, "black_memqcache_table_list") == 1)
1070 	{
1071 		return true;
1072 	}
1073 
1074 	return false;
1075 }
1076 
1077 /*
1078  * Return true If the SELECTed table is in white list.
1079  */
1080 bool
pool_is_table_in_white_list(const char * table_name)1081 pool_is_table_in_white_list(const char *table_name)
1082 {
1083 	if (pool_config->num_white_memqcache_table_list > 0 &&
1084 		pattern_compare((char *) table_name, WHITELIST, "white_memqcache_table_list") == 1)
1085 	{
1086 		return true;
1087 	}
1088 
1089 	return false;
1090 }
1091 
1092 /*
1093  * Extract table oid from INSERT/UPDATE/DELETE/TRUNCATE/
1094  * DROP TABLE/ALTER TABLE/COPY FROM statement.
1095  * For SELECT, if Data-modifying statements in its WITH clause,
1096  * extract table oid from Data-modifying statements too.
1097  * Returns number of oids.
1098  * In case of error, returns 0 (InvalidOid).
1099  * oids buffer (oidsp) will be discarded by subsequent call.
1100  */
1101 int
pool_extract_table_oids(Node * node,int ** oidsp)1102 pool_extract_table_oids(Node *node, int **oidsp)
1103 {
1104 #define POOL_MAX_DML_OIDS 128
1105 	char	   *table;
1106 	static int	oids[POOL_MAX_DML_OIDS];
1107 	int			num_oids;
1108 	int			oid;
1109 
1110 	if (node == NULL)
1111 	{
1112 		ereport(LOG,
1113 				(errmsg("memcache: error while extracting table oids. statement is NULL")));
1114 		return 0;
1115 	}
1116 
1117 	num_oids = 0;
1118 	*oidsp = oids;
1119 
1120 	if (IsA(node, InsertStmt))
1121 	{
1122 		InsertStmt *stmt = (InsertStmt *) node;
1123 
1124 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1125 		table = make_table_name_from_rangevar(stmt->relation);
1126 	}
1127 	else if (IsA(node, UpdateStmt))
1128 	{
1129 		UpdateStmt *stmt = (UpdateStmt *) node;
1130 
1131 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1132 		table = make_table_name_from_rangevar(stmt->relation);
1133 	}
1134 	else if (IsA(node, DeleteStmt))
1135 	{
1136 		DeleteStmt *stmt = (DeleteStmt *) node;
1137 
1138 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1139 		table = make_table_name_from_rangevar(stmt->relation);
1140 	}
1141 	else if(IsA(node, SelectStmt))
1142 	{
1143 		SelectStmt *stmt = (SelectStmt *) node;
1144 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1145 		table = NULL;
1146 	}
1147 #ifdef NOT_USED
1148 
1149 	/*
1150 	 * We do not handle CREATE TABLE here.  It is possible that
1151 	 * pool_extract_table_oids() is called before CREATE TABLE gets executed.
1152 	 */
1153 	else if (IsA(node, CreateStmt))
1154 	{
1155 		CreateStmt *stmt = (CreateStmt *) node;
1156 
1157 		table = make_table_name_from_rangevar(stmt->relation);
1158 	}
1159 #endif
1160 
1161 	else if (IsA(node, AlterTableStmt))
1162 	{
1163 		AlterTableStmt *stmt = (AlterTableStmt *) node;
1164 
1165 		table = make_table_name_from_rangevar(stmt->relation);
1166 	}
1167 
1168 	else if (IsA(node, CopyStmt))
1169 	{
1170 		CopyStmt   *stmt = (CopyStmt *) node;
1171 
1172 		if (stmt->is_from)		/* COPY FROM? */
1173 		{
1174 			table = make_table_name_from_rangevar(stmt->relation);
1175 		}
1176 		else
1177 		{
1178 			return 0;
1179 		}
1180 	}
1181 
1182 	else if (IsA(node, DropStmt))
1183 	{
1184 		ListCell   *cell;
1185 
1186 		DropStmt   *stmt = (DropStmt *) node;
1187 
1188 		if (stmt->removeType != OBJECT_TABLE)
1189 		{
1190 			return 0;
1191 		}
1192 
1193 		/*
1194 		 * Here, stmt->objects is list of target relation info.  The first
1195 		 * cell of target relation info is a list (possibly) consists of
1196 		 * database, schema and relation.  We need to call
1197 		 * makeRangeVarFromNameList() before passing to
1198 		 * make_table_name_from_rangevar. Otherwise we get weird excessively
1199 		 * decorated relation name (''table_name'').
1200 		 */
1201 		foreach(cell, stmt->objects)
1202 		{
1203 			if (num_oids >= POOL_MAX_DML_OIDS)
1204 			{
1205 				ereport(LOG,
1206 						(errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1207 				return 0;
1208 			}
1209 
1210 			table = make_table_name_from_rangevar(makeRangeVarFromNameList(lfirst(cell)));
1211 			oid = pool_table_name_to_oid(table);
1212 			if (oid > 0)
1213 			{
1214 				oids[num_oids++] = pool_table_name_to_oid(table);
1215 				ereport(DEBUG1,
1216 						(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids - 1])));
1217 			}
1218 		}
1219 		return num_oids;
1220 	}
1221 	else if (IsA(node, TruncateStmt))
1222 	{
1223 		ListCell   *cell;
1224 
1225 		TruncateStmt *stmt = (TruncateStmt *) node;
1226 
1227 		foreach(cell, stmt->relations)
1228 		{
1229 			if (num_oids >= POOL_MAX_DML_OIDS)
1230 			{
1231 				ereport(LOG,
1232 						(errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1233 				return 0;
1234 			}
1235 
1236 			table = make_table_name_from_rangevar(lfirst(cell));
1237 			oid = pool_table_name_to_oid(table);
1238 			if (oid > 0)
1239 			{
1240 				oids[num_oids++] = pool_table_name_to_oid(table);
1241 				ereport(DEBUG1,
1242 						(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids - 1])));
1243 			}
1244 		}
1245 		return num_oids;
1246 	}
1247 	else if (IsA(node, ExplainStmt))
1248 	{
1249 		ListCell	*cell;
1250 		DefElem		*def;
1251 		ExplainStmt *stmt = (ExplainStmt *) node;
1252 
1253 		foreach(cell, stmt->options)
1254 		{
1255 			def = lfirst(cell);
1256 			if (strncmp("analyze", def->defname, 7) == 0)
1257 			{
1258 				return pool_extract_table_oids(stmt->query, oidsp);
1259 			}
1260 		}
1261 
1262 		table = NULL;
1263 	}
1264 	else
1265 	{
1266 		ereport(DEBUG1,
1267 				(errmsg("memcache: extracting table oids: statment is different from INSERT/UPDATE/DELETE/TRUNCATE/DROP TABLE/ALTER TABLE")));
1268 		return 0;
1269 	}
1270 
1271 	oid = pool_table_name_to_oid(table);
1272 	if (oid > 0)
1273 	{
1274 		if (num_oids >= POOL_MAX_DML_OIDS)
1275 		{
1276 			ereport(LOG,
1277 					(errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1278 			return 0;
1279 		}
1280 
1281 		oids[num_oids++] = pool_table_name_to_oid(table);
1282 		ereport(DEBUG1,
1283 				(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oid)));
1284 	}
1285 	return num_oids;
1286 }
1287 
1288 /*
1289  * Extract table oid from INSERT/UPDATE/DELETE
1290  * FROM statement in WITH clause.
1291  * Returns number of oids.
1292  * oids buffer (oidsp) will be discarded by subsequent call.
1293  */
1294 int
pool_extract_withclause_oids(Node * node,int * oidsp)1295 pool_extract_withclause_oids(Node *node, int *oidsp)
1296 {
1297 	int			num_oids = 0;
1298 	int			oid;
1299 	char	   *table;
1300 	ListCell   *lc;
1301 	WithClause *with;
1302 
1303 	if(oidsp == NULL)
1304 	{
1305 		return 0;
1306 	}
1307 
1308 	if(!node || !IsA(node, WithClause))
1309 	{
1310 		return 0;
1311 	}
1312 
1313 	with = (WithClause *) node;
1314 	foreach(lc, with->ctes)
1315 	{
1316 		CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
1317 		if(IsA(cte->ctequery, InsertStmt))
1318 		{
1319 			InsertStmt *stmt = (InsertStmt *) cte->ctequery;
1320 			table = make_table_name_from_rangevar(stmt->relation);
1321 		}
1322 		else if(IsA(cte->ctequery, DeleteStmt))
1323 		{
1324 			DeleteStmt *stmt = (DeleteStmt *) cte->ctequery;
1325 			table = make_table_name_from_rangevar(stmt->relation);
1326 		}
1327 		else if(IsA(cte->ctequery, UpdateStmt))
1328 		{
1329 			UpdateStmt *stmt = (UpdateStmt *) cte->ctequery;
1330 			table = make_table_name_from_rangevar(stmt->relation);
1331 		}
1332 		else
1333 		{
1334 			/* only check INSERT/DELETE/UPDATE in WITH clause */
1335 			table = NULL;
1336 		}
1337 
1338 		oid = pool_table_name_to_oid(table);
1339 		if (oid > 0)
1340 		{
1341 			if (num_oids >= POOL_MAX_DML_OIDS)
1342 			{
1343 				break;
1344 			}
1345 
1346 			oidsp[num_oids++] = pool_table_name_to_oid(table);
1347 			ereport(DEBUG1,
1348 					(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oidsp[num_oids - 1])));
1349 		}
1350 	}
1351 
1352 	return num_oids;
1353 }
1354 
1355 #define POOL_OIDBUF_SIZE 1024
1356 static int *oidbuf;
1357 static int	oidbufp;
1358 static int	oidbuf_size;
1359 
1360 /*
1361  * Add table oid to internal buffer
1362  */
1363 void
pool_add_dml_table_oid(int oid)1364 pool_add_dml_table_oid(int oid)
1365 {
1366 	int			i;
1367 	int		   *tmp;
1368 
1369 	if (oid == 0)
1370 		return;
1371 
1372 	if (oidbufp >= oidbuf_size)
1373 	{
1374 		MemoryContext oldcxt;
1375 
1376 		oidbuf_size += POOL_OIDBUF_SIZE;
1377 
1378 		/*
1379 		 * This need to live throughout the life of child so home it in
1380 		 * TopMemoryContext
1381 		 */
1382 		oldcxt = MemoryContextSwitchTo(TopMemoryContext);
1383 		tmp = repalloc(oidbuf, sizeof(int) * oidbuf_size);
1384 		MemoryContextSwitchTo(oldcxt);
1385 		if (tmp == NULL)
1386 			return;
1387 
1388 		oidbuf = tmp;
1389 	}
1390 
1391 	for (i = 0; i < oidbufp; i++)
1392 	{
1393 		if (oidbuf[i] == oid)
1394 			/* Already same oid exists */
1395 			return;
1396 	}
1397 	oidbuf[oidbufp++] = oid;
1398 }
1399 
1400 
1401 /*
1402  * Get table oid buffer
1403  */
1404 static int
pool_get_dml_table_oid(int ** oid)1405 pool_get_dml_table_oid(int **oid)
1406 {
1407 	*oid = oidbuf;
1408 	return oidbufp;
1409 }
1410 
1411 static int
pool_get_dropdb_table_oids(int ** oids,int dboid)1412 pool_get_dropdb_table_oids(int **oids, int dboid)
1413 {
1414 	int		   *rtn = 0;
1415 	int			oids_size = 0;
1416 	int		   *tmp;
1417 
1418 	int			num_oids = 0;
1419 	DIR		   *dir;
1420 	struct dirent *dp;
1421 	char		path[1024];
1422 
1423 	snprintf(path, sizeof(path), "%s/%d", pool_config->memqcache_oiddir, dboid);
1424 	if ((dir = opendir(path)) == NULL)
1425 	{
1426 		ereport(DEBUG1,
1427 				(errmsg("memcache: getting drop table oids"),
1428 				 errdetail("Failed to open dir: %s", path)));
1429 		return 0;
1430 	}
1431 
1432 	while ((dp = readdir(dir)) != NULL)
1433 	{
1434 		if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0)
1435 			continue;
1436 
1437 		if (num_oids >= oids_size)
1438 		{
1439 			oids_size += POOL_OIDBUF_SIZE;
1440 			tmp = repalloc(rtn, sizeof(int) * oids_size);
1441 			if (tmp == NULL)
1442 			{
1443 				closedir(dir);
1444 				return 0;
1445 			}
1446 			rtn = tmp;
1447 		}
1448 
1449 		rtn[num_oids] = atol(dp->d_name);
1450 		num_oids++;
1451 	}
1452 
1453 	closedir(dir);
1454 	*oids = rtn;
1455 
1456 	return num_oids;
1457 }
1458 
1459 /* Discard oid internal buffer */
1460 static void
pool_discard_dml_table_oid(void)1461 pool_discard_dml_table_oid(void)
1462 {
1463 	oidbufp = 0;
1464 }
1465 
1466 /*
1467  * Management modules for oid map.  When caching SELECT results, we
1468  * record table oids to file, which has following structure.
1469  *
1470  * memqcache_oiddir -+- database_oid -+-table_oid_file1
1471  *                                    |
1472  *                                    +-table_oid_file2
1473  *                                    |
1474  *                                    +-table_oid_file3...
1475  *
1476  * table_oid_file's name is table oid, which was used by the SELECT
1477  * statement. The file has 1 or more cacheid(s). When SELECT result is
1478  * cached, the file is created and cache id is appended. Later SELECT
1479  * using same table oid will add to the same file. If the SELECT uses
1480  * multiple tables, multiple table_oid_file will be created. When
1481  * INSERT/UPDATE/DELETE is executed, corresponding caches must be
1482  * deleted (cache invalidation) (when DROP TABLE, ALTER TABLE is
1483  * executed, the caches must be deleted as well). When database is
1484  * dropped, all caches belonging to the database must be deleted.
1485  */
1486 
1487 /*
1488  * Get oid of current database
1489  */
1490 static int
pool_get_database_oid(void)1491 pool_get_database_oid(void)
1492 {
1493 /*
1494  * Query to convert table name to oid
1495  */
1496 	int			oid = 0;
1497 	static POOL_RELCACHE * relcache;
1498 	POOL_CONNECTION_POOL *backend;
1499 
1500 	backend = pool_get_session_context(false)->backend;
1501 
1502 	/*
1503 	 * If relcache does not exist, create it.
1504 	 */
1505 	if (!relcache)
1506 	{
1507 		relcache = pool_create_relcache(pool_config->relcache_size, DATABASE_TO_OID_QUERY,
1508 										int_register_func, int_unregister_func,
1509 										false);
1510 		if (relcache == NULL)
1511 		{
1512 			ereport(LOG,
1513 					(errmsg("memcache: error creating relcache while getting database OID")));
1514 			return oid;
1515 		}
1516 	}
1517 
1518 	/*
1519 	 * Search relcache.
1520 	 */
1521 	oid = (int) (intptr_t) pool_search_relcache(relcache, backend,
1522 												MASTER_CONNECTION(backend)->sp->database);
1523 	return oid;
1524 }
1525 
1526 /*
1527  * Get oid of current database for discarding cache files
1528  * after executing DROP DATABASE
1529  */
1530 int
pool_get_database_oid_from_dbname(char * dbname)1531 pool_get_database_oid_from_dbname(char *dbname)
1532 {
1533 	int			dboid = 0;
1534 	POOL_SELECT_RESULT *res;
1535 	char		query[1024];
1536 
1537 	POOL_CONNECTION_POOL *backend;
1538 
1539 	backend = pool_get_session_context(false)->backend;
1540 
1541 	snprintf(query, sizeof(query), DATABASE_TO_OID_QUERY, dbname);
1542 	do_query(MASTER(backend), query, &res, MAJOR(backend));
1543 
1544 	if (res->numrows != 1)
1545 	{
1546 		ereport(DEBUG1,
1547 				(errmsg("memcache: getting oid of current database"),
1548 				 errdetail("received %d rows", res->numrows)));
1549 		free_select_result(res);
1550 		return 0;
1551 	}
1552 
1553 	dboid = atol(res->data[0]);
1554 	free_select_result(res);
1555 
1556 	return dboid;
1557 }
1558 
1559 /*
1560  * Add cache id (shmem case) or hash key (memcached case) to table oid
1561  * map file.  Caller must hold shmem lock before calling this function
1562  * to avoid file extension conflict among different pgpool child
1563  * process.
1564  * As of pgpool-II 3.2, pool_handle_query_cache is responsible for that.
1565  * (pool_handle_query_cache -> pool_commit_cache -> pool_add_table_oid_map)
1566  */
1567 static void
pool_add_table_oid_map(POOL_CACHEKEY * cachekey,int num_table_oids,int * table_oids)1568 pool_add_table_oid_map(POOL_CACHEKEY * cachekey, int num_table_oids, int *table_oids)
1569 {
1570 	char	   *dir;
1571 	int			dboid;
1572 	char		path[1024];
1573 	int			i;
1574 	int			len;
1575 
1576 	/*
1577 	 * Create memqcache_oiddir
1578 	 */
1579 	dir = pool_config->memqcache_oiddir;
1580 
1581 	if (mkdir(dir, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1582 	{
1583 		if (errno != EEXIST)
1584 		{
1585 			ereport(WARNING,
1586 					(errmsg("memcache: adding table oid maps, failed to create directory:\"%s\"", dir),
1587 					 errdetail("%m")));
1588 			return;
1589 		}
1590 	}
1591 
1592 	/*
1593 	 * Create memqcache_oiddir/database_oid
1594 	 */
1595 	dboid = pool_get_database_oid();
1596 	ereport(DEBUG1,
1597 			(errmsg("memcache: adding table oid maps"),
1598 			 errdetail("dboid %d", dboid)));
1599 
1600 	if (dboid <= 0)
1601 	{
1602 		ereport(WARNING,
1603 				(errmsg("memcache: adding table oid maps, failed to get database OID")));
1604 		return;
1605 	}
1606 
1607 	snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1608 	if (mkdir(path, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1609 	{
1610 		if (errno != EEXIST)
1611 		{
1612 			ereport(WARNING,
1613 					(errmsg("memcache: adding table oid maps, failed to create directory:\"%s\"", path),
1614 					 errdetail("%m")));
1615 			return;
1616 		}
1617 	}
1618 
1619 	if (pool_is_shmem_cache())
1620 	{
1621 		len = sizeof(cachekey->cacheid);
1622 	}
1623 	else
1624 	{
1625 		len = sizeof(cachekey->hashkey);
1626 	}
1627 
1628 	for (i = 0; i < num_table_oids; i++)
1629 	{
1630 		int			fd;
1631 		int			oid = table_oids[i];
1632 		int			sts;
1633 		struct flock fl;
1634 
1635 		/*
1636 		 * Create or open each memqcache_oiddir/database_oid/table_oid
1637 		 */
1638 		snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1639 		if ((fd = open(path, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR)) == -1)
1640 		{
1641 			ereport(WARNING,
1642 					(errmsg("memcache: adding table oid maps, failed to open file:\"%s\"", path),
1643 					 errdetail("%m")));
1644 			return;
1645 		}
1646 
1647 		fl.l_type = F_WRLCK;
1648 		fl.l_whence = SEEK_SET;
1649 		fl.l_start = 0;			/* Offset from l_whence         */
1650 		fl.l_len = 0;			/* length, 0 = to EOF           */
1651 
1652 		sts = fcntl(fd, F_SETLKW, &fl);
1653 		if (sts == -1)
1654 		{
1655 			ereport(WARNING,
1656 					(errmsg("memcache: adding table oid maps, failed to lock file:\"%s\"", path),
1657 					 errdetail("%m")));
1658 
1659 			close(fd);
1660 			return;
1661 		}
1662 
1663 		/*
1664 		 * Below was ifdef-out because of a performance reason. Looking for
1665 		 * duplicate cache entries in a file needed unacceptably high cost. So
1666 		 * we gave up this and decided not to care about duplicate entries in
1667 		 * the file.
1668 		 */
1669 #ifdef NOT_USED
1670 		for (;;)
1671 		{
1672 			sts = read(fd, (char *) &buf, len);
1673 			if (sts == -1)
1674 			{
1675 				ereport(WARNING,
1676 						(errmsg("memcache: adding table oid maps, failed to read file:\"%s\"", path),
1677 						 errdetail("%m")));
1678 				close(fd);
1679 				return;
1680 			}
1681 			else if (sts == len)
1682 			{
1683 				if (memcmp(cachekey, &buf, len) == 0)
1684 				{
1685 					/* Same key found. Skip this */
1686 					close(fd);
1687 					return;
1688 				}
1689 				continue;
1690 			}
1691 
1692 			/*
1693 			 * Must be EOF
1694 			 */
1695 			if (sts != 0)
1696 			{
1697 				ereport(WARNING,
1698 						(errmsg("memcache: adding table oid maps, invalid data length:%d in file:\"%s\". error:\"%s\"", sts, path)));
1699 				close(fd);
1700 				return;
1701 			}
1702 			break;
1703 		}
1704 #endif
1705 
1706 		if (lseek(fd, 0, SEEK_END) == -1)
1707 		{
1708 			ereport(WARNING,
1709 					(errmsg("memcache: adding table oid maps, failed seek on file:\"%s\"", path),
1710 					 errdetail("%m")));
1711 			close(fd);
1712 			return;
1713 		}
1714 
1715 		/*
1716 		 * Write cache_id or cache key at the end of file
1717 		 */
1718 		sts = write(fd, (char *) cachekey, len);
1719 		if (sts == -1 || sts != len)
1720 		{
1721 			ereport(WARNING,
1722 					(errmsg("memcache: adding table oid maps, failed to write file:\"%s\"", path),
1723 					 errdetail("%m")));
1724 			close(fd);
1725 			return;
1726 		}
1727 		close(fd);
1728 	}
1729 }
1730 
1731 /*
1732  * Discard all oid maps at pgpool-II startup.
1733  * This is necessary for shmem case.
1734  */
1735 void
pool_discard_oid_maps(void)1736 pool_discard_oid_maps(void)
1737 {
1738 	char		command[1024];
1739 
1740 	snprintf(command, sizeof(command), "/bin/rm -fr %s/[0-9]*",
1741 			 pool_config->memqcache_oiddir);
1742 	if (system(command) == -1)
1743 		ereport(WARNING,
1744 				(errmsg("unable to execute command \"%s\"", command),
1745 				 errdetail("system() command failed with error \"%m\"")));
1746 
1747 
1748 }
1749 
1750 void
pool_discard_oid_maps_by_db(int dboid)1751 pool_discard_oid_maps_by_db(int dboid)
1752 {
1753 	char		command[1024];
1754 
1755 	if (pool_is_shmem_cache())
1756 	{
1757 		snprintf(command, sizeof(command), "/bin/rm -fr %s/%d/",
1758 				 pool_config->memqcache_oiddir, dboid);
1759 
1760 		ereport(DEBUG1,
1761 				(errmsg("memcache: discarding oid maps by db"),
1762 				 errdetail("command: '%s\'", command)));
1763 
1764 		if (system(command) == -1)
1765 			ereport(WARNING,
1766 					(errmsg("unable to execute command \"%s\"", command),
1767 					 errdetail("system() command failed with error \"%m\"")));
1768 	}
1769 }
1770 
1771 /*
1772  * Read cache id (shmem case) or hash key (memcached case) from table
1773  * oid map file according to table_oids and discard cache entries.  If
1774  * unlink is true, the file will be unlinked after successful cache
1775  * removal.
1776  */
1777 static void
pool_invalidate_query_cache(int num_table_oids,int * table_oid,bool unlinkp,int dboid)1778 pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlinkp, int dboid)
1779 {
1780 	char	   *dir;
1781 	char		path[1024];
1782 	int			i;
1783 	int			len;
1784 	POOL_CACHEKEY buf;
1785 
1786 	/*
1787 	 * Create memqcache_oiddir
1788 	 */
1789 	dir = pool_config->memqcache_oiddir;
1790 	if (mkdir(dir, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1791 	{
1792 		if (errno != EEXIST)
1793 		{
1794 			ereport(WARNING,
1795 					(errmsg("memcache: invalidating query cache, failed to create directory:\"%s\"", dir),
1796 					 errdetail("%m")));
1797 			return;
1798 		}
1799 	}
1800 
1801 	/*
1802 	 * Create memqcache_oiddir/database_oid
1803 	 */
1804 	if (dboid == 0)
1805 	{
1806 		dboid = pool_get_database_oid();
1807 		ereport(DEBUG1,
1808 				(errmsg("memcache invalidating query cache"),
1809 				 errdetail("dboid %d", dboid)));
1810 
1811 		if (dboid <= 0)
1812 		{
1813 			ereport(WARNING,
1814 					(errmsg("memcache: invalidating query cache, could not get database OID")));
1815 			return;
1816 		}
1817 	}
1818 
1819 	snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1820 	if (mkdir(path, S_IREAD | S_IWRITE | S_IEXEC) == -1)
1821 	{
1822 		if (errno != EEXIST)
1823 		{
1824 			ereport(WARNING,
1825 					(errmsg("memcache: invalidating query cache, failed to create directory:\"%s\"", path),
1826 					 errdetail("%m")));
1827 			return;
1828 		}
1829 	}
1830 
1831 	if (pool_is_shmem_cache())
1832 	{
1833 		len = sizeof(buf.cacheid);
1834 	}
1835 	else
1836 	{
1837 		len = sizeof(buf.hashkey);
1838 	}
1839 
1840 	for (i = 0; i < num_table_oids; i++)
1841 	{
1842 		int			fd;
1843 		int			oid = table_oid[i];
1844 		int			sts;
1845 		struct flock fl;
1846 
1847 		/*
1848 		 * Open each memqcache_oiddir/database_oid/table_oid
1849 		 */
1850 		snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1851 		if ((fd = open(path, O_RDONLY)) == -1)
1852 		{
1853 			/*
1854 			 * This may be normal. It is possible that no SELECT has been
1855 			 * issued since the table has been created or since pgpool-II
1856 			 * started up.
1857 			 */
1858 			ereport(DEBUG1,
1859 					(errmsg("memcache invalidating query cache"),
1860 					 errdetail("failed to open \"%s\". reason:\"%m\"", path)));
1861 			continue;
1862 		}
1863 
1864 		fl.l_type = F_RDLCK;
1865 		fl.l_whence = SEEK_SET;
1866 		fl.l_start = 0;			/* Offset from l_whence         */
1867 		fl.l_len = 0;			/* length, 0 = to EOF           */
1868 
1869 		sts = fcntl(fd, F_SETLKW, &fl);
1870 		if (sts == -1)
1871 		{
1872 			ereport(WARNING,
1873 					(errmsg("memcache: invalidating query cache, failed to lock file:\"%s\"", path),
1874 					 errdetail("%m")));
1875 			close(fd);
1876 			return;
1877 		}
1878 		for (;;)
1879 		{
1880 			sts = read(fd, (char *) &buf, len);
1881 			if (sts == -1)
1882 			{
1883 				ereport(WARNING,
1884 						(errmsg("memcache: invalidating query cache, failed to read file:\"%s\"", path),
1885 						 errdetail("%m")));
1886 
1887 				close(fd);
1888 				return;
1889 			}
1890 			else if (sts == len)
1891 			{
1892 				if (pool_is_shmem_cache())
1893 				{
1894 					ereport(DEBUG1,
1895 							(errmsg("memcache invalidating query cache"),
1896 							 errdetail("deleting cacheid:%d itemid:%d",
1897 									   buf.cacheid.blockid, buf.cacheid.itemid)));
1898 					pool_delete_item_shmem_cache(&buf.cacheid);
1899 				}
1900 #ifdef USE_MEMCACHED
1901 				else
1902 				{
1903 					char		delbuf[33];
1904 
1905 					memcpy(delbuf, buf.hashkey, 32);
1906 					delbuf[32] = 0;
1907 					ereport(DEBUG1,
1908 							(errmsg("memcache invalidating query cache"),
1909 							 errdetail("deleting %s", delbuf)));
1910 
1911 					delete_cache_on_memcached(delbuf);
1912 				}
1913 #endif
1914 				continue;
1915 			}
1916 
1917 			/*
1918 			 * Must be EOF
1919 			 */
1920 			if (sts != 0)
1921 			{
1922 				ereport(WARNING,
1923 						(errmsg("memcache: invalidating query cache, invalid data length:%d in file:\"%s\"", sts, path)));
1924 				close(fd);
1925 				return;
1926 			}
1927 			break;
1928 		}
1929 
1930 		if (unlinkp)
1931 		{
1932 			unlink(path);
1933 		}
1934 		close(fd);
1935 	}
1936 #ifdef SHMEMCACHE_DEBUG
1937 	dump_shmem_cache(0);
1938 #endif
1939 }
1940 
1941 /*
1942  * Reset SELECT data buffers.  If reset_dml_oids is true, call
1943  * pool_discard_dml_table_oid() to reset table oids used in DML statements.
1944  */
1945 static void
pool_reset_memqcache_buffer(bool reset_dml_oids)1946 pool_reset_memqcache_buffer(bool reset_dml_oids)
1947 {
1948 	POOL_SESSION_CONTEXT *session_context;
1949 
1950 	session_context = pool_get_session_context(true);
1951 
1952 	if (session_context && session_context->query_cache_array)
1953 	{
1954 		POOL_TEMP_QUERY_CACHE *cache;
1955 
1956 		ereport(DEBUG1,
1957 				(errmsg("memcache reset buffer"),
1958 				 errdetail("discard: %p", session_context->query_cache_array)));
1959 
1960 		pool_discard_query_cache_array(session_context->query_cache_array);
1961 		session_context->query_cache_array = pool_create_query_cache_array();
1962 
1963 		ereport(DEBUG1,
1964 				(errmsg("memcache reset buffer"),
1965 				 errdetail("create: %p", session_context->query_cache_array)));
1966 
1967 		/*
1968 		 * if the query context is still under use, we cannot discard
1969 		 * temporary cache.
1970 		 */
1971 		if ((SL_MODE && pool_is_doing_extended_query_message()) ||
1972 			can_query_context_destroy(session_context->query_context))
1973 		{
1974 			ereport(DEBUG1,
1975 					(errmsg("memcache reset buffer"),
1976 					 errdetail("discard temp buffer of %p (%s)",
1977 							   session_context->query_context, session_context->query_context->original_query)));
1978 
1979 			cache = pool_get_current_cache();
1980 			pool_discard_temp_query_cache(cache);
1981 
1982 			/*
1983 			 * Reset temp_cache pointer in the current query context so that
1984 			 * we don't double free memory.
1985 			 */
1986 			session_context->query_context->temp_cache = NULL;
1987 		}
1988 	}
1989 
1990 	if (reset_dml_oids)
1991 		pool_discard_dml_table_oid();
1992 
1993 	pool_tmp_stats_reset_num_selects();
1994 }
1995 
1996 /*
1997  * Return true if memory cache method is "shmem".  The purpose of this
1998  * function is to cache the result of stcmp and to save a few cycle.
1999  */
2000 bool
pool_is_shmem_cache(void)2001 pool_is_shmem_cache(void)
2002 {
2003 	return pool_config->memqcache_method == SHMEM_CACHE;
2004 }
2005 
/*
 * Number of memory cache blocks, as recorded by
 * pool_set_memqcache_blocks().
 */
static int	memqcache_num_blocks;

/*
 * Record the number of memory cache blocks for later retrieval.
 */
static void
pool_set_memqcache_blocks(int num_blocks)
{
	memqcache_num_blocks = num_blocks;
}
2015 
2016 /*
2017  * Return memory cache number of blocks.
2018  */
2019 static int
pool_get_memqcache_blocks(void)2020 pool_get_memqcache_blocks(void)
2021 {
2022 	return memqcache_num_blocks;
2023 }
2024 
2025 /*
2026  * Query cache on shared memory management modules.
2027  */
2028 
2029 /*
2030  * Calculate necessary shared memory size.
2031  */
2032 size_t
pool_shared_memory_cache_size(void)2033 pool_shared_memory_cache_size(void)
2034 {
2035 	int64		num_blocks;
2036 	size_t		size;
2037 
2038 	if (pool_config->memqcache_maxcache > pool_config->memqcache_cache_block_size)
2039 		ereport(FATAL,
2040 				(errmsg("invalid memory cache configuration"),
2041 				 errdetail("memqcache_cache_block_size %d should be greater or equal to memqcache_maxcache %d",
2042 						   pool_config->memqcache_cache_block_size,
2043 						   pool_config->memqcache_maxcache)));
2044 
2045 
2046 	num_blocks = pool_config->memqcache_total_size /
2047 		pool_config->memqcache_cache_block_size;
2048 	if (num_blocks == 0)
2049 		ereport(FATAL,
2050 				(errmsg("invalid memory cache configuration"),
2051 				 errdetail("memqcache_total_size %ld should be greater or equal to memqcache_cache_block_size %d",
2052 						   pool_config->memqcache_total_size,
2053 						   pool_config->memqcache_cache_block_size)));
2054 
2055 	ereport(LOG,
2056 			(errmsg("memory cache initialized"),
2057 			 errdetail("memcache blocks :%ld", num_blocks)));
2058 	/* Remember # of blocks */
2059 	pool_set_memqcache_blocks(num_blocks);
2060 	size = pool_config->memqcache_cache_block_size * num_blocks;
2061 	return size;
2062 }
2063 
2064 /*
2065  * Acquire and initialize shared memory cache. This should be called
2066  * only once from pgpool main process at the process staring up time.
2067  */
2068 static void *shmem;
2069 int
pool_init_memory_cache(size_t size)2070 pool_init_memory_cache(size_t size)
2071 {
2072 	ereport(DEBUG1,
2073 			(errmsg("memory cache request size : %zd", size)));
2074 
2075 	shmem = pool_shared_memory_create(size);
2076 	return 0;
2077 }
2078 
2079 /*
2080  * Clear all the shared memory cache and reset FSMM and hash table.
2081  */
2082 void
pool_clear_memory_cache(void)2083 pool_clear_memory_cache(void)
2084 {
2085 	size_t		size;
2086 	pool_sigset_t oldmask;
2087 
2088 	POOL_SETMASK2(&BlockSig, &oldmask);
2089 	pool_shmem_lock();
2090 
2091 	PG_TRY();
2092 	{
2093 		size = pool_shared_memory_cache_size();
2094 		memset(shmem, 0, size);
2095 
2096 		size = pool_shared_memory_fsmm_size();
2097 		pool_reset_fsmm(size);
2098 
2099 		pool_discard_oid_maps();
2100 
2101 		pool_hash_reset(pool_config->memqcache_max_num_cache);
2102 	}
2103 	PG_CATCH();
2104 	{
2105 		pool_shmem_unlock();
2106 		POOL_SETMASK(&oldmask);
2107 		PG_RE_THROW();
2108 	}
2109 	PG_END_TRY();
2110 
2111 	pool_shmem_unlock();
2112 	POOL_SETMASK(&oldmask);
2113 }
2114 
2115 /*
2116  * Return shared memory cache address
2117  */
2118 static void *
pool_memory_cache_address(void)2119 pool_memory_cache_address(void)
2120 {
2121 	return shmem;
2122 }
2123 
2124 /*
2125  * Initialize new block
2126  */
2127 
2128 /*
2129  * Free space management map
2130  *
2131  * Free space management map (FSMM) consists of bytes. Each byte
2132  * corresponds to block id. For example, if you have 1GB cache and
2133  * block size is 8Kb, number of blocks = 131,072, thus total size of
 * FSMM is 128Kb.  Each FSMM entry has a value from 0 to 255. These
 * values describe the total free space in each block.
2136  * For example, if the value is 2, the free space can be between 64
2137  * bytes and 95 bytes.
2138  *
2139  * value free space(in bytes)
2140  * 0     0-31
2141  * 1     32-63
2142  * 2     64-95
2143  * 3     96-127
2144  * :
2145  * 255   8160-8192
2146  */
2147 
/*
 * Calculate necessary shared memory size for FSMM: one char per memory
 * cache block.  Should be called after pool_shared_memory_cache_size,
 * which records the number of blocks.
 */
size_t
pool_shared_memory_fsmm_size(void)
{
	return pool_get_memqcache_blocks() * sizeof(char);
}
2160 
2161 /*
2162  * Acquire and initialize shared memory cache for FSMM. This should be
2163  * called after pool_shared_memory_cache_size only once from pgpool
2164  * main process at the process staring up time.
2165  */
2166 static void *fsmm;
2167 int
pool_init_fsmm(size_t size)2168 pool_init_fsmm(size_t size)
2169 {
2170 	int			maxblock = pool_get_memqcache_blocks();
2171 	int			encode_value;
2172 
2173 	fsmm = pool_shared_memory_create(size);
2174 	encode_value = POOL_MAX_FREE_SPACE / POOL_FSMM_RATIO;
2175 	memset(fsmm, encode_value, maxblock);
2176 	return 0;
2177 }
2178 
2179 /*
2180  * Return shared memory fsmm address
2181  */
2182 static void *
pool_fsmm_address(void)2183 pool_fsmm_address(void)
2184 {
2185 	return fsmm;
2186 }
2187 
2188 /*
2189  * Clock algorithm shared query cache management modules.
2190  */
2191 
/*
 * Clock hand: id of the block that will be chosen as the next victim
 */
static int *pool_fsmm_clock_hand;

/*
 * Allocate the clock hand on shared memory and point it at block 0
 */
void
pool_allocate_fsmm_clock_hand(void)
{
	pool_fsmm_clock_hand = pool_shared_memory_create(sizeof(int));
	pool_fsmm_clock_hand[0] = 0;
}
2206 
2207 /*
2208  * Reset FSMM.
2209  */
2210 static void
pool_reset_fsmm(size_t size)2211 pool_reset_fsmm(size_t size)
2212 {
2213 	int			encode_value;
2214 
2215 	encode_value = POOL_MAX_FREE_SPACE / POOL_FSMM_RATIO;
2216 	memset(fsmm, encode_value, size);
2217 
2218 	*pool_fsmm_clock_hand = 0;
2219 }
2220 
2221 /*
2222  * Find victim block using clock algorithm and make it free.
2223  * Returns new free block id.
2224  */
pool_reuse_block(void)2225 static POOL_CACHE_BLOCKID pool_reuse_block(void)
2226 {
2227 	int			maxblock = pool_get_memqcache_blocks();
2228 	char	   *block = block_address(*pool_fsmm_clock_hand);
2229 	POOL_CACHE_BLOCK_HEADER *bh = (POOL_CACHE_BLOCK_HEADER *) block;
2230 	POOL_CACHE_BLOCKID reused_block;
2231 	POOL_CACHE_ITEM_POINTER *cip;
2232 	char	   *p;
2233 	int			i;
2234 
2235 	bh->flags = 0;
2236 	reused_block = *pool_fsmm_clock_hand;
2237 	p = block_address(reused_block);
2238 
2239 	for (i = 0; i < bh->num_items; i++)
2240 	{
2241 		cip = item_pointer(p, i);
2242 
2243 		if (!(POOL_ITEM_DELETED & cip->flags))
2244 		{
2245 			pool_hash_delete(&cip->query_hash);
2246 			ereport(DEBUG1,
2247 					(errmsg("pool_reuse_block: blockid: %d item: %d", reused_block, i)));
2248 		}
2249 	}
2250 
2251 	pool_init_cache_block(reused_block);
2252 	pool_update_fsmm(reused_block, POOL_MAX_FREE_SPACE);
2253 
2254 	(*pool_fsmm_clock_hand)++;
2255 	if (*pool_fsmm_clock_hand >= maxblock)
2256 		*pool_fsmm_clock_hand = 0;
2257 
2258 	ereport(LOG,
2259 			(errmsg("pool_reuse_block: blockid: %d", reused_block)));
2260 
2261 	return reused_block;
2262 }
2263 
2264 /*
2265  * Get block id which has enough space
2266  */
pool_get_block(size_t free_space)2267 static POOL_CACHE_BLOCKID pool_get_block(size_t free_space)
2268 {
2269 	int			encode_value;
2270 	unsigned char *p = pool_fsmm_address();
2271 	int			i;
2272 	int			maxblock = pool_get_memqcache_blocks();
2273 	POOL_CACHE_BLOCK_HEADER *bh;
2274 
2275 	if (p == NULL)
2276 	{
2277 		ereport(WARNING,
2278 				(errmsg("memcache: getting block: FSMM is not initialized")));
2279 		return -1;
2280 	}
2281 
2282 	if (free_space > POOL_MAX_FREE_SPACE)
2283 	{
2284 		ereport(WARNING,
2285 				(errmsg("memcache: getting block: invalid free space:%zd", free_space),
2286 				 errdetail("requested free space: %zd is more than maximum allowed space:%lu", free_space, POOL_MAX_FREE_SPACE)));
2287 		return -1;
2288 	}
2289 
2290 	encode_value = free_space / POOL_FSMM_RATIO;
2291 
2292 	for (i = 0; i < maxblock; i++)
2293 	{
2294 		if (p[i] >= encode_value)
2295 		{
2296 			/*
2297 			 * This block *may" have enough space. We need to make sure it
2298 			 * actually has enough space.
2299 			 */
2300 			bh = (POOL_CACHE_BLOCK_HEADER *) block_address(i);
2301 			if (bh->free_bytes >= free_space)
2302 			{
2303 				return (POOL_CACHE_BLOCKID) i;
2304 			}
2305 		}
2306 	}
2307 
2308 	/*
2309 	 * No enough space found. Reuse victim block
2310 	 */
2311 	return pool_reuse_block();
2312 }
2313 
2314 /*
2315  * Update free space info for specified block
2316  */
2317 static void
pool_update_fsmm(POOL_CACHE_BLOCKID blockid,size_t free_space)2318 pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space)
2319 {
2320 	int			encode_value;
2321 	char	   *p = pool_fsmm_address();
2322 
2323 	if (p == NULL)
2324 	{
2325 		ereport(WARNING,
2326 				(errmsg("memcache: updating free space in block: FSMM is not initialized")));
2327 		return;
2328 	}
2329 
2330 	if (blockid >= pool_get_memqcache_blocks())
2331 	{
2332 		ereport(WARNING,
2333 				(errmsg("memcache: updating free space in block: block id:%d in invalid", blockid)));
2334 		return;
2335 	}
2336 
2337 	if (free_space > POOL_MAX_FREE_SPACE)
2338 	{
2339 		ereport(WARNING,
2340 				(errmsg("memcache: updating free space in block: invalid free space:%zd", free_space),
2341 				 errdetail("requested free space: %zd is more than maximum allowed space:%lu", free_space, POOL_MAX_FREE_SPACE)));
2342 
2343 		return;
2344 	}
2345 
2346 	encode_value = free_space / POOL_FSMM_RATIO;
2347 
2348 	p[blockid] = encode_value;
2349 
2350 	return;
2351 }
2352 
/*
 * Add item data to shared memory cache.
 *
 * query_hash: key under which the new item is registered in the hash table.
 * data:       item body; "size" bytes are copied into the cache block.
 * size:       length of data in bytes, must be greater than 0.
 * expire:     stored verbatim in the "expire" field of the item header.
 *
 * On successful registration, returns cache id.
 * The cache id lives in a function-local static variable, so it is
 * overwritten by the subsequent call to this function.
 * On error returns NULL.
 */
static POOL_CACHEID * pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash, char *data, int size, time_t expire)
{
	/* Returned to the caller by address, hence static */
	static POOL_CACHEID cacheid;
	POOL_CACHE_BLOCKID blockid;
	POOL_CACHE_BLOCK_HEADER *bh;
	POOL_CACHE_ITEM_POINTER *cip;

	POOL_CACHE_ITEM ci;
	POOL_CACHE_ITEM_POINTER cip_body;
	char	   *item;

	int			request_size;
	char	   *p;
	int			i;
	char	   *src;
	char	   *dst;
	int			num_deleted;
	char	   *dcip;
	char	   *dci;
	bool		need_pack;
	char	   *work_buffer;
	int			index;

	if (query_hash == NULL)
	{
		ereport(LOG,
				(errmsg("error while adding item to shmem cache, query hash is NULL")));
		return NULL;
	}

	if (data == NULL)
	{
		ereport(LOG,
				(errmsg("error while adding item to shmem cache, data is NULL")));
		return NULL;
	}

	if (size <= 0)
	{
		ereport(LOG,
				(errmsg("error while adding item to shmem cache, invalid request size: %d", size)));
		return NULL;
	}

	/*
	 * Add overhead: besides its body every item occupies one item pointer
	 * and one item header inside the block.
	 */
	request_size = size + sizeof(POOL_CACHE_ITEM_POINTER) + sizeof(POOL_CACHE_ITEM_HEADER);

	/* Get cache block which has enough space */
	blockid = pool_get_block(request_size);

	if (blockid == -1)
	{
		return NULL;
	}

	/*
	 * Initialize the block if necessary. If no live items are remained, we
	 * also initialize the block. If there's contiguous deleted items, we turn
	 * them into free space as well.
	 */
	pool_init_cache_block(blockid);

	/*
	 * Make sure that we have at least one free hash element.
	 *
	 * pool_reuse_block() removes the hash entries of the victim's live
	 * items; presumably that returns hash elements to the free list
	 * (NOTE(review): confirm against pool_hash_delete()).  Note that blockid
	 * is replaced by the recycled block here, so the item may end up in a
	 * different block than the one pool_get_block() picked.
	 */
	while (!is_free_hash_element())
	{
		/* If not, reuse next victim block */
		blockid = pool_reuse_block();
		pool_init_cache_block(blockid);
	}

	/* Get block address on shmem */
	p = block_address(blockid);
	bh = (POOL_CACHE_BLOCK_HEADER *) p;

	/*
	 * Create contiguous free space. We assume that item bodies are ordered
	 * from bottom to top of the block, and corresponding item pointers are
	 * ordered from the youngest to the oldest in the beginning of the block.
	 */

	/*
	 * Optimization. If there's no deleted items, we don't need to pack it to
	 * create contiguous free space.
	 */
	need_pack = false;
	for (i = 0; i < bh->num_items; i++)
	{
		cip = item_pointer(p, i);

		if (POOL_ITEM_DELETED & cip->flags) /* Deleted item? */
		{
			need_pack = true;
			ereport(DEBUG1,
					(errmsg("memcache adding item"),
					 errdetail("start creating contiguous space")));
			break;
		}
	}

	/*
	 * We disable packing for now. Revisit and remove following code fragment
	 * later.
	 *
	 * Because need_pack is forced to false here, the result of the scan
	 * above is discarded and the packing branch below is never executed.
	 */
	need_pack = false;

	if (need_pack)
	{
		/*
		 * Pack and create contiguous free space.
		 *
		 * The live items are copied into a scratch buffer of one block size:
		 * bodies are stacked downward from the end of the buffer, item
		 * pointers are written upward from the front, and the result is
		 * copied back over the block.
		 */
		dcip = calloc(1, pool_config->memqcache_cache_block_size);
		if (!dcip)
		{
			ereport(WARNING,
					(errmsg("memcache: adding item to cache: calloc failed")));
			return NULL;
		}

		work_buffer = dcip;
		dci = dcip + pool_config->memqcache_cache_block_size;
		num_deleted = 0;
		index = 0;

		for (i = 0; i < bh->num_items; i++)
		{
			int			total_length;
			POOL_CACHEID cid;

			cip = item_pointer(p, i);

			if (POOL_ITEM_DELETED & cip->flags) /* Deleted item? */
			{
				num_deleted++;
				continue;
			}

			/* Copy item body */
			src = p + cip->offset;
			total_length = item_header(p, i)->total_length;
			dst = dci - total_length;
			cip->offset = dst - work_buffer;
			memcpy(dst, src, total_length);

			dci -= total_length;

			/* Copy item pointer */
			src = (char *) cip;
			dst = (char *) item_pointer(dcip, index);
			memcpy(dst, src, sizeof(POOL_CACHE_ITEM_POINTER));

			/* Update hash index: the item moves from slot i to slot index */
			cid.blockid = blockid;
			cid.itemid = index;
			pool_hash_insert(&cip->query_hash, &cid, true);
			ereport(DEBUG1,
					(errmsg("memcache adding item"),
					 errdetail("item cid updated. old:%d %d new:%d %d",
							   blockid, i, blockid, index)));
			index++;
		}

		/* All items deleted? */
		if (num_deleted > 0 && num_deleted == bh->num_items)
		{
			ereport(DEBUG1,
					(errmsg("memcache adding item"),
					 errdetail("all items deleted, total deleted:%d", num_deleted)));
			bh->flags = 0;
			pool_init_cache_block(blockid);
			pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
		}
		else
		{
			/* Update number of items */
			bh->num_items -= num_deleted;

			/* Copy back the packed block except block header */
			memcpy(p + sizeof(POOL_CACHE_BLOCK_HEADER),
				   work_buffer + sizeof(POOL_CACHE_BLOCK_HEADER),
				   pool_config->memqcache_cache_block_size - sizeof(POOL_CACHE_BLOCK_HEADER));
		}
		free(work_buffer);
	}

	/*
	 * Make sure that we have enough free space
	 */
	if (bh->free_bytes < request_size)
	{
		/* This should not happen */
		ereport(WARNING,
				(errmsg("memcache: adding item to cache: not enough space"),
				 errdetail("free space: %d required: %d block id:%d",
						   bh->free_bytes, request_size, blockid)));
		return NULL;
	}

	/*
	 * At this point, new item can be located at block_header->num_items
	 */

	/* Fill in cache item header (only the header part of ci is used) */
	ci.header.timestamp = time(NULL);
	ci.header.expire = expire;
	ci.header.total_length = sizeof(POOL_CACHE_ITEM_HEADER) + size;

	/* Calculate item body address */
	if (bh->num_items == 0)
	{
		/*
		 * This is the #0 item. So address is block_bottom - data_length
		 */
		item = p + pool_config->memqcache_cache_block_size - ci.header.total_length;

		/* Mark this block used */
		bh->flags = POOL_BLOCK_USED;
	}
	else
	{
		/* Place the new item directly below the most recently added one */
		cip = item_pointer(p, bh->num_items - 1);
		item = p + cip->offset - ci.header.total_length;
	}

	/*
	 * Copy item header.
	 * NOTE(review): this copies the first sizeof(POOL_CACHE_ITEM_HEADER)
	 * bytes of ci, which presumably is the "header" member - confirm the
	 * layout of POOL_CACHE_ITEM.
	 */
	memcpy(item, &ci, sizeof(POOL_CACHE_ITEM_HEADER));
	bh->free_bytes -= sizeof(POOL_CACHE_ITEM_HEADER);

	/* Copy item body */
	memcpy(item + sizeof(POOL_CACHE_ITEM_HEADER), data, size);
	bh->free_bytes -= size;

	/* Copy cache item pointer into slot number bh->num_items */
	memcpy(&cip_body.query_hash, query_hash, sizeof(POOL_QUERY_HASH));
	memset(&cip_body.next, 0, sizeof(POOL_CACHEID));
	cip_body.offset = item - p;
	cip_body.flags = POOL_ITEM_USED;
	memcpy(item_pointer(p, bh->num_items), &cip_body, sizeof(POOL_CACHE_ITEM_POINTER));
	bh->free_bytes -= sizeof(POOL_CACHE_ITEM_POINTER);

	/* Update FSMM */
	pool_update_fsmm(blockid, bh->free_bytes);

	/* The item id is the slot just written; num_items is bumped afterwards */
	cacheid.blockid = blockid;
	cacheid.itemid = bh->num_items;
	ereport(DEBUG1,
			(errmsg("memcache adding item"),
			 errdetail("new item inserted. blockid: %d itemid:%d",
					   cacheid.blockid, cacheid.itemid)));

	/* Add up number of items */
	bh->num_items++;

	/* Update hash table */
	if (pool_hash_insert(query_hash, &cacheid, false) < 0)
	{
		ereport(LOG,
				(errmsg("error while adding item to shmem cache, hash insert failed")));

		/*
		 * Since we have failed to insert hash index entry, we need to undo
		 * the addition of cache entry.
		 */
		pool_delete_item_shmem_cache(&cacheid);
		return NULL;
	}
	ereport(DEBUG1,
			(errmsg("memcache adding item"),
			 errdetail("block: %d item: %d", cacheid.blockid, cacheid.itemid)));

#ifdef SHMEMCACHE_DEBUG
	dump_shmem_cache(blockid);
#endif
	return &cacheid;
}
2635 
2636 /*
2637  * Returns item data address on shared memory cache specified by query hash.
2638  * Also data length is set to *size.
2639  * On error or data not found case returns NULL.
2640  * Detail is set to *sts. (0: success, 1: not found, -1: error)
2641  */
2642 static char *
pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash,int * size,int * sts)2643 pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash, int *size, int *sts)
2644 {
2645 	POOL_CACHEID *cacheid;
2646 	POOL_CACHE_ITEM_HEADER *cih;
2647 
2648 	if (sts == NULL)
2649 	{
2650 		ereport(LOG,
2651 				(errmsg("error while getting item from shmem cache, sts is NULL")));
2652 		return NULL;
2653 	}
2654 
2655 	*sts = -1;
2656 
2657 	if (query_hash == NULL)
2658 	{
2659 		ereport(LOG,
2660 				(errmsg("error while getting item from shmem cache, query hash is NULL")));
2661 		return NULL;
2662 	}
2663 
2664 	if (size == NULL)
2665 	{
2666 		ereport(LOG,
2667 				(errmsg("error while getting item from shmem cache, size is NULL")));
2668 		return NULL;
2669 	}
2670 
2671 	/*
2672 	 * Find cache header by using hash table
2673 	 */
2674 	cacheid = pool_find_item_on_shmem_cache(query_hash);
2675 	if (cacheid == NULL)
2676 	{
2677 		/* Not found */
2678 		*sts = 1;
2679 		return NULL;
2680 	}
2681 
2682 	cih = pool_cache_item_header(cacheid);
2683 
2684 	*size = cih->total_length - sizeof(POOL_CACHE_ITEM_HEADER);
2685 	return (char *) cih + sizeof(POOL_CACHE_ITEM_HEADER);
2686 }
2687 
2688 /*
2689  * Find data on shared memory cache specified query hash.
2690  * On success returns cache id.
2691  * The cache id is overwritten by the subsequent call to this function.
2692  */
pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash)2693 static POOL_CACHEID * pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash)
2694 {
2695 	static POOL_CACHEID cacheid;
2696 	POOL_CACHEID *c;
2697 	POOL_CACHE_ITEM_HEADER *cih;
2698 	time_t		now;
2699 
2700 	c = pool_hash_search(query_hash);
2701 	if (!c)
2702 	{
2703 		return NULL;
2704 	}
2705 
2706 	cih = item_header(block_address(c->blockid), c->itemid);
2707 
2708 	/* Check cache expiration */
2709 	if (cih->expire > 0)
2710 	{
2711 		now = time(NULL);
2712 		if (now > (cih->timestamp + cih->expire))
2713 		{
2714 			ereport(DEBUG1,
2715 					(errmsg("memcache finding item"),
2716 					 errdetail("cache expired: now: %ld timestamp: %ld",
2717 							   now, cih->timestamp + cih->expire)));
2718 			pool_delete_item_shmem_cache(c);
2719 			return NULL;
2720 		}
2721 	}
2722 
2723 	cacheid.blockid = c->blockid;
2724 	cacheid.itemid = c->itemid;
2725 	return &cacheid;
2726 }
2727 
2728 /*
2729  * Delete item data specified cache id from shmem.
2730  * On successful deletion, returns 0.
2731  * Other wise return -1.
2732  * FSMM is also updated.
2733  */
2734 static int
pool_delete_item_shmem_cache(POOL_CACHEID * cacheid)2735 pool_delete_item_shmem_cache(POOL_CACHEID * cacheid)
2736 {
2737 	POOL_CACHE_BLOCK_HEADER *bh;
2738 	POOL_CACHE_ITEM_POINTER *cip;
2739 	POOL_CACHE_ITEM_HEADER *cih;
2740 	POOL_QUERY_HASH key;
2741 	int			size;
2742 
2743 	ereport(DEBUG1,
2744 			(errmsg("memcache deleting item data"),
2745 			 errdetail("cacheid:%d itemid:%d", cacheid->blockid, cacheid->itemid)));
2746 
2747 	if (cacheid->blockid >= pool_get_memqcache_blocks())
2748 	{
2749 		ereport(LOG,
2750 				(errmsg("error while deleting item from shmem cache, invalid block: %d",
2751 						cacheid->blockid)));
2752 		return -1;
2753 	}
2754 
2755 	bh = (POOL_CACHE_BLOCK_HEADER *) block_address(cacheid->blockid);
2756 	if (!(bh->flags & POOL_BLOCK_USED))
2757 	{
2758 		ereport(LOG,
2759 				(errmsg("error while deleting item from shmem cache, block: %d is not used",
2760 						cacheid->blockid)));
2761 		return -1;
2762 	}
2763 
2764 	if (cacheid->itemid >= bh->num_items)
2765 	{
2766 		/*
2767 		 * This could happen if the block is reused.  Since contents of oid
2768 		 * map file is not updated when the block is reused.
2769 		 */
2770 		ereport(DEBUG1,
2771 				(errmsg("memcache error deleting item data"),
2772 				 errdetail("invalid item id %d in block:%d",
2773 						   cacheid->itemid, cacheid->blockid)));
2774 
2775 		return -1;
2776 	}
2777 
2778 	cip = item_pointer(block_address(cacheid->blockid), cacheid->itemid);
2779 	if (!(cip->flags & POOL_ITEM_USED))
2780 	{
2781 		ereport(LOG,
2782 				(errmsg("error while deleting item from shmem cache, item: %d was not used",
2783 						cacheid->itemid)));
2784 		return -1;
2785 	}
2786 
2787 	if (cip->flags & POOL_ITEM_DELETED)
2788 	{
2789 		ereport(LOG,
2790 				(errmsg("error while deleting item from shmem cache, item: %d was already deleted",
2791 						cacheid->itemid)));
2792 		return -1;
2793 	}
2794 
2795 	/* Save cache key */
2796 	memcpy(&key, &cip->query_hash, sizeof(POOL_QUERY_HASH));
2797 
2798 	cih = pool_cache_item_header(cacheid);
2799 	size = cih->total_length + sizeof(POOL_CACHE_ITEM_POINTER);
2800 
2801 	/* Delete item pointer */
2802 	cip->flags |= POOL_ITEM_DELETED;
2803 
2804 	/*
2805 	 * We do NOT count down bh->num_items here. The deleted space will be
2806 	 * recycled by pool_add_item_shmem_cache(). However, if this is the last
2807 	 * item, we can recycle whole block.
2808 	 *
2809 	 * 2012/4/1: Now we do not pack data in pool_add_item_shmem_cache() for
2810 	 * performance reason. Also we count down num_items if it is the last one.
2811 	 */
2812 	if ((bh->num_items - 1) == 0)
2813 	{
2814 		ereport(DEBUG1,
2815 				(errmsg("memcache deleting item data"),
2816 				 errdetail("no item remains. initialize block")));
2817 		bh->flags = 0;
2818 		pool_init_cache_block(cacheid->blockid);
2819 	}
2820 
2821 	/* Remove hash index */
2822 	pool_hash_delete(&key);
2823 
2824 	/*
2825 	 * If the deleted item is last one in the block, we add it to the free
2826 	 * space.
2827 	 */
2828 	if (cacheid->itemid == (bh->num_items - 1))
2829 	{
2830 		bh->free_bytes += size;
2831 		ereport(DEBUG1,
2832 				(errmsg("memcache deleting item data"),
2833 				 errdetail("deleted %d bytes, freebytes is = %d",
2834 						   size, bh->free_bytes)));
2835 
2836 		bh->num_items--;
2837 	}
2838 
2839 	/* Update FSMM */
2840 	pool_update_fsmm(cacheid->blockid, bh->free_bytes);
2841 
2842 	return 0;
2843 }
2844 
2845 /*
2846  * Returns item header specified by cache id.
2847  */
pool_cache_item_header(POOL_CACHEID * cacheid)2848 static POOL_CACHE_ITEM_HEADER * pool_cache_item_header(POOL_CACHEID * cacheid)
2849 {
2850 	POOL_CACHE_BLOCK_HEADER *bh;
2851 
2852 	if (cacheid->blockid >= pool_get_memqcache_blocks())
2853 	{
2854 		ereport(WARNING,
2855 				(errmsg("error while getting cache item header, invalid block id: %d", cacheid->blockid)));
2856 		return NULL;
2857 	}
2858 
2859 	bh = (POOL_CACHE_BLOCK_HEADER *) block_address(cacheid->blockid);
2860 	if (cacheid->itemid >= bh->num_items)
2861 	{
2862 		ereport(WARNING,
2863 				(errmsg("error while getting cache item header, invalid item id: %d", cacheid->itemid)));
2864 		return NULL;
2865 	}
2866 
2867 	return item_header((char *) bh, cacheid->itemid);
2868 }
2869 
2870 /*
2871  * Initialize specified block.
2872  */
2873 static int
pool_init_cache_block(POOL_CACHE_BLOCKID blockid)2874 pool_init_cache_block(POOL_CACHE_BLOCKID blockid)
2875 {
2876 	char	   *p;
2877 	POOL_CACHE_BLOCK_HEADER *bh;
2878 
2879 	if (blockid >= pool_get_memqcache_blocks())
2880 	{
2881 		ereport(WARNING,
2882 				(errmsg("error while initializing cache block, invalid block id: %d", blockid)));
2883 		return -1;
2884 	}
2885 
2886 	p = block_address(blockid);
2887 	bh = (POOL_CACHE_BLOCK_HEADER *) p;
2888 
2889 	/* Is this block used? */
2890 	if (!(bh->flags & POOL_BLOCK_USED))
2891 	{
2892 		/* Initialize empty block */
2893 		memset(p, 0, pool_config->memqcache_cache_block_size);
2894 		bh->free_bytes = pool_config->memqcache_cache_block_size -
2895 			sizeof(POOL_CACHE_BLOCK_HEADER);
2896 	}
2897 	return 0;
2898 }
2899 
2900 #if NOT_USED
2901 /*
2902  * Delete all items in the block.
2903  */
2904 static void
pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid)2905 pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid)
2906 {
2907 	char	   *p;
2908 	POOL_CACHE_BLOCK_HEADER *bh;
2909 	POOL_CACHE_ITEM_POINTER *cip;
2910 	POOL_CACHEID cacheid;
2911 	int			i;
2912 
2913 	/* Get block address on shmem */
2914 	p = block_address(blockid);
2915 	bh = (POOL_CACHE_BLOCK_HEADER *) p;
2916 	cacheid.blockid = blockid;
2917 
2918 	for (i = 0; i < bh->num_items; i++)
2919 	{
2920 		cip = item_pointer(p, i);
2921 
2922 		if ((POOL_ITEM_DELETED & cip->flags) == 0)	/* Not deleted item? */
2923 		{
2924 			cacheid.itemid = i;
2925 			pool_delete_item_shmem_cache(&cacheid);
2926 		}
2927 	}
2928 
2929 	bh->flags = 0;
2930 	pool_init_cache_block(blockid);
2931 	pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
2932 }
2933 #endif
2934 
2935 /*
2936  * Acquire lock: XXX giant lock
2937  */
2938 void
pool_shmem_lock(void)2939 pool_shmem_lock(void)
2940 {
2941 	if (pool_is_shmem_cache() && !is_shmem_locked)
2942 	{
2943 		pool_semaphore_lock(SHM_CACHE_SEM);
2944 		is_shmem_locked = true;
2945 	}
2946 }
2947 
2948 /*
2949  * Release lock
2950  */
2951 void
pool_shmem_unlock(void)2952 pool_shmem_unlock(void)
2953 {
2954 	if (pool_is_shmem_cache() && is_shmem_locked)
2955 	{
2956 		pool_semaphore_unlock(SHM_CACHE_SEM);
2957 		is_shmem_locked = false;
2958 	}
2959 }
2960 
2961 /*
2962  * check lock
2963  */
2964 bool
pool_is_shmem_lock(void)2965 pool_is_shmem_lock(void)
2966 {
2967 	return is_shmem_locked;
2968 }
2969 
2970 /*
2971  * Returns cache block address specified by block id
2972  */
2973 static char *
block_address(int blockid)2974 block_address(int blockid)
2975 {
2976 	char	   *p;
2977 
2978 	p = pool_memory_cache_address() +
2979 		blockid * pool_config->memqcache_cache_block_size;
2980 	return p;
2981 }
2982 
2983 /*
2984  * Returns i th item pointer in block address block
2985  */
item_pointer(char * block,int i)2986 static POOL_CACHE_ITEM_POINTER * item_pointer(char *block, int i)
2987 {
2988 	return (POOL_CACHE_ITEM_POINTER *) (block + sizeof(POOL_CACHE_BLOCK_HEADER) +
2989 										sizeof(POOL_CACHE_ITEM_POINTER) * i);
2990 }
2991 
2992 /*
2993  * Returns i th item header in block address block
2994  */
item_header(char * block,int i)2995 static POOL_CACHE_ITEM_HEADER * item_header(char *block, int i)
2996 {
2997 	POOL_CACHE_ITEM_POINTER *cip;
2998 
2999 	cip = item_pointer(block, i);
3000 	return (POOL_CACHE_ITEM_HEADER *) (block + cip->offset);
3001 }
3002 
3003 #ifdef SHMEMCACHE_DEBUG
3004 /*
3005  * Dump shmem cache block
3006  */
3007 static void
dump_shmem_cache(POOL_CACHE_BLOCKID blockid)3008 dump_shmem_cache(POOL_CACHE_BLOCKID blockid)
3009 {
3010 	POOL_CACHE_BLOCK_HEADER *bh;
3011 	POOL_CACHE_ITEM_POINTER *cip;
3012 	POOL_CACHE_ITEM_HEADER *cih;
3013 	int			i;
3014 
3015 	bh = (POOL_CACHE_BLOCK_HEADER *) block_address(blockid);
3016 	fprintf(stderr, "shmem: block header(%lu bytes): flags:%x num_items:%d free_bytes:%d\n",
3017 			sizeof(*bh), bh->flags, bh->num_items, bh->free_bytes);
3018 	for (i = 0; i < bh->num_items; i++)
3019 	{
3020 		cip = item_pointer((char *) bh, i);
3021 		fprintf(stderr, "shmem: block: %d %d th item pointer(%lu bytes): offset:%d flags:%x\n",
3022 				blockid, i, sizeof(*cip), cip->offset, cip->flags);
3023 		cih = item_header((char *) bh, i);
3024 		fprintf(stderr, "shmem: block: %d %d th item header(%lu bytes): timestamp:%ld length:%d\n",
3025 				blockid, i, sizeof(*cih), cih->timestamp, cih->total_length);
3026 	}
3027 }
3028 #endif
3029 
3030 /*
3031  * SELECT query result array modules
3032  */
3033 POOL_QUERY_CACHE_ARRAY *
pool_create_query_cache_array(void)3034 pool_create_query_cache_array(void)
3035 {
3036 #define POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM 128
3037 #define POOL_QUERY_CACHE_ARRAY_HEADER_SIZE (sizeof(int)+sizeof(int))
3038 
3039 	size_t		size;
3040 	POOL_QUERY_CACHE_ARRAY *p;
3041 	POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
3042 	MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
3043 
3044 	size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM *
3045 		sizeof(POOL_TEMP_QUERY_CACHE *);
3046 	p = palloc(size);
3047 	p->num_caches = 0;
3048 	p->array_size = POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
3049 	MemoryContextSwitchTo(old_context);
3050 	return p;
3051 }
3052 
3053 /*
3054  * Discard query cache array
3055  */
3056 void
pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array)3057 pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array)
3058 {
3059 	int			i;
3060 
3061 	if (!cache_array)
3062 		return;
3063 
3064 	ereport(DEBUG1,
3065 			(errmsg("memcache discarding query cache array"),
3066 			 errdetail("num_caches: %d", cache_array->num_caches)));
3067 
3068 	for (i = 0; i < cache_array->num_caches; i++)
3069 	{
3070 		ereport(DEBUG2,
3071 				(errmsg("memcache discarding query cache array"),
3072 				 errdetail("cache no: %d cache: %p", i, cache_array->caches[i])));
3073 		pool_discard_temp_query_cache(cache_array->caches[i]);
3074 	}
3075 	pfree(cache_array);
3076 }
3077 
3078 /*
3079  * Add query cache array
3080  */
pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array,POOL_TEMP_QUERY_CACHE * cache)3081 static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array, POOL_TEMP_QUERY_CACHE * cache)
3082 {
3083 	size_t		size;
3084 	POOL_QUERY_CACHE_ARRAY *cp = cache_array;
3085 
3086 	if (!cache_array)
3087 		return cp;
3088 
3089 	ereport(DEBUG2,
3090 			(errmsg("memcache adding query cache array"),
3091 			 errdetail("num_caches: %d cache: %p", cache_array->num_caches, cache)));
3092 	if (cache_array->num_caches >= cache_array->array_size)
3093 	{
3094 		cache_array->array_size += POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
3095 		size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + cache_array->array_size *
3096 			sizeof(POOL_TEMP_QUERY_CACHE *);
3097 		cache_array = repalloc(cache_array, size);
3098 	}
3099 	cache_array->caches[cache_array->num_caches++] = cache;
3100 	return cache_array;
3101 }
3102 
3103 /*
3104  * SELECT query result temporary cache modules
3105  */
3106 
3107 /*
3108  * Create SELECT result temporary cache
3109  */
3110 POOL_TEMP_QUERY_CACHE *
pool_create_temp_query_cache(char * query)3111 pool_create_temp_query_cache(char *query)
3112 {
3113 	POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
3114 	MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
3115 	POOL_TEMP_QUERY_CACHE *p;
3116 
3117 	p = palloc(sizeof(*p));
3118 	p->query = pstrdup(query);
3119 
3120 	p->buffer = pool_create_buffer();
3121 	p->oids = pool_create_buffer();
3122 	p->num_oids = 0;
3123 	p->is_exceeded = false;
3124 	p->is_discarded = false;
3125 
3126 	MemoryContextSwitchTo(old_context);
3127 
3128 	ereport(DEBUG1,
3129 			(errmsg("pool_create_temp_query_cache: cache created: %p", p)));
3130 
3131 	return p;
3132 }
3133 
3134 /*
3135  * Discard temp query cache
3136  */
3137 void
pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache)3138 pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache)
3139 {
3140 	if (!temp_cache)
3141 		return;
3142 
3143 	if (temp_cache->query)
3144 		pfree(temp_cache->query);
3145 	if (temp_cache->buffer)
3146 		pool_discard_buffer(temp_cache->buffer);
3147 	if (temp_cache->oids)
3148 		pool_discard_buffer(temp_cache->oids);
3149 
3150 	ereport(DEBUG1,
3151 			(errmsg("pool_discard_temp_query_cache: cache discarded: %p", temp_cache)));
3152 
3153 	pfree(temp_cache);
3154 }
3155 
3156 /*
3157  * Add data to temp query cache.
3158  * Data must be FE/BE protocol packet.
3159  */
3160 static void
pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,char kind,char * data,int data_len)3161 pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, char kind, char *data, int data_len)
3162 {
3163 	POOL_INTERNAL_BUFFER *buffer;
3164 	size_t		buflen;
3165 	int			send_len;
3166 
3167 	if (temp_cache == NULL)
3168 	{
3169 		/*
3170 		 * This could happen if cache exceeded in previous query execution in
3171 		 * the same unnamed portal.
3172 		 */
3173 		ereport(DEBUG1,
3174 				(errmsg("memcache adding temporary query cache"),
3175 				 errdetail("POOL_TEMP_QUERY_CACHE is NULL")));
3176 		return;
3177 	}
3178 
3179 	if (temp_cache->is_exceeded)
3180 	{
3181 		ereport(DEBUG1,
3182 				(errmsg("memcache adding temporary query cache"),
3183 				 errdetail("memqcache_maxcache exceeds")));
3184 		return;
3185 	}
3186 
3187 	/*
3188 	 * We only store T(Table Description), D(Data row), C(Command Complete),
3189 	 * 1(ParseComplete), 2(BindComplete)
3190 	 */
3191 	if (kind != 'T' && kind != 'D' && kind != 'C' && kind != '1' && kind != '2')
3192 	{
3193 		return;
3194 	}
3195 
3196 	/* Check data limit */
3197 	buffer = temp_cache->buffer;
3198 	buflen = pool_get_buffer_length(buffer);
3199 
3200 	if ((buflen + data_len + sizeof(int) + 1) > pool_config->memqcache_maxcache)
3201 	{
3202 		ereport(DEBUG1,
3203 				(errmsg("memcache adding temporary query cache"),
3204 				 errdetail("data size exceeds memqcache_maxcache. current:%zd requested:%zd memq_maxcache:%d",
3205 						   buflen, data_len + sizeof(int) + 1, pool_config->memqcache_maxcache)));
3206 		temp_cache->is_exceeded = true;
3207 		return;
3208 	}
3209 
3210 	pool_add_buffer(buffer, &kind, 1);
3211 	send_len = htonl(data_len + sizeof(int));
3212 	pool_add_buffer(buffer, (char *) &send_len, sizeof(int));
3213 	pool_add_buffer(buffer, data, data_len);
3214 
3215 	return;
3216 }
3217 
3218 /*
3219  * Add table oids used by SELECT to temp query cache.
3220  */
3221 static void
pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,int num_oids,int * oids)3222 pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache, int num_oids, int *oids)
3223 {
3224 	POOL_INTERNAL_BUFFER *buffer;
3225 
3226 	if (!temp_cache || num_oids <= 0)
3227 		return;
3228 
3229 	buffer = temp_cache->oids;
3230 	pool_add_buffer(buffer, oids, num_oids * sizeof(int));
3231 	temp_cache->num_oids = num_oids;
3232 }
3233 
3234 /*
3235  * Internal buffer management modules.
3236  * Usage:
3237  * 1) Create buffer using pool_create_buffer().
3238  * 2) Add data to buffer using pool_add_buffer().
3239  * 3) Extract (copied) data from buffer using pool_get_buffer().
3240  * 4) Optionally you can:
3241  *		Obtain buffer length by using pool_get_buffer_length().
3242  *		Obtain buffer pointer by using pool_get_buffer_pointer().
3243  * 5) Discard buffer using pool_discard_buffer().
3244  */
3245 
3246 /*
3247  * Create and return internal buffer
3248  */
pool_create_buffer(void)3249 static POOL_INTERNAL_BUFFER * pool_create_buffer(void)
3250 {
3251 	POOL_INTERNAL_BUFFER *p;
3252 
3253 	p = palloc0(sizeof(*p));
3254 	return p;
3255 }
3256 
3257 /*
3258  * Discard internal buffer
3259  */
3260 static void
pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer)3261 pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer)
3262 {
3263 	if (buffer)
3264 	{
3265 		if (buffer->buf)
3266 			pfree(buffer->buf);
3267 		pfree(buffer);
3268 	}
3269 }
3270 
3271 /*
3272  * Add data to internal buffer
3273  */
3274 static void
pool_add_buffer(POOL_INTERNAL_BUFFER * buffer,void * data,size_t len)3275 pool_add_buffer(POOL_INTERNAL_BUFFER * buffer, void *data, size_t len)
3276 {
3277 #define POOL_ALLOCATE_UNIT 8192
3278 
3279 	/* Sanity check */
3280 	if (!buffer || !data || len == 0)
3281 		return;
3282 	POOL_SESSION_CONTEXT *session_context = pool_get_session_context(false);
3283 	MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
3284 
3285 	/* Check if we need to increase the buffer size */
3286 	if ((buffer->buflen + len) > buffer->bufsize)
3287 	{
3288 		size_t		allocate_size = ((buffer->buflen + len) / POOL_ALLOCATE_UNIT + 1) * POOL_ALLOCATE_UNIT;
3289 
3290 		ereport(DEBUG2,
3291 				(errmsg("memcache adding data to internal buffer"),
3292 				 errdetail("realloc old size:%zd new size:%zd",
3293 						   buffer->bufsize, allocate_size)));
3294 		buffer->bufsize = allocate_size;
3295 		buffer->buf = (char *) repalloc(buffer->buf, buffer->bufsize);
3296 	}
3297 	/* Add data to buffer */
3298 	memcpy(buffer->buf + buffer->buflen, data, len);
3299 	buffer->buflen += len;
3300 	ereport(DEBUG2,
3301 			(errmsg("memcache adding data to internal buffer"),
3302 			 errdetail("len:%zd, total:%zd bufsize:%zd",
3303 					   len, buffer->buflen, buffer->bufsize)));
3304 	MemoryContextSwitchTo(old_context);
3305 	return;
3306 }
3307 
3308 /*
3309  * Get data from internal buffer.
3310  * Data is stored in newly malloc memory.
3311  * Data length is returned to len.
3312  */
3313 static void *
pool_get_buffer(POOL_INTERNAL_BUFFER * buffer,size_t * len)3314 pool_get_buffer(POOL_INTERNAL_BUFFER * buffer, size_t *len)
3315 {
3316 	void	   *p;
3317 
3318 	if (buffer->bufsize == 0 || buffer->buflen == 0 ||
3319 		buffer->buf == NULL)
3320 	{
3321 		*len = 0;
3322 		return NULL;
3323 	}
3324 
3325 	p = palloc(buffer->buflen);
3326 	memcpy(p, buffer->buf, buffer->buflen);
3327 	*len = buffer->buflen;
3328 	return p;
3329 }
3330 
3331 /*
3332  * Get internal buffer length.
3333  */
3334 static size_t
pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer)3335 pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer)
3336 {
3337 	if (buffer == NULL)
3338 		return 0;
3339 
3340 	return buffer->buflen;
3341 }
3342 
3343 #ifdef NOT_USED
3344 /*
3345  * Get internal buffer pointer.
3346  */
3347 static char *
pool_get_buffer_pointer(POOL_INTERNAL_BUFFER * buffer)3348 pool_get_buffer_pointer(POOL_INTERNAL_BUFFER * buffer)
3349 {
3350 	if (buffer == NULL)
3351 		return NULL;
3352 	return buffer->buf;
3353 }
3354 #endif
3355 /*
3356  * Get query cache buffer struct of current query context
3357  */
3358 POOL_TEMP_QUERY_CACHE *
pool_get_current_cache(void)3359 pool_get_current_cache(void)
3360 {
3361 	POOL_SESSION_CONTEXT *session_context;
3362 	POOL_QUERY_CONTEXT *query_context;
3363 	POOL_TEMP_QUERY_CACHE *p = NULL;
3364 
3365 	session_context = pool_get_session_context(true);
3366 	if (session_context)
3367 	{
3368 		query_context = session_context->query_context;
3369 		if (query_context)
3370 		{
3371 			p = query_context->temp_cache;
3372 		}
3373 	}
3374 	return p;
3375 }
3376 
3377 /*
3378  * Get query cache buffer of current query context
3379  */
3380 static char *
pool_get_current_cache_buffer(size_t * len)3381 pool_get_current_cache_buffer(size_t *len)
3382 {
3383 	char	   *p = NULL;
3384 
3385 	*len = 0;
3386 	POOL_TEMP_QUERY_CACHE *cache;
3387 
3388 	cache = pool_get_current_cache();
3389 	if (cache)
3390 	{
3391 		p = pool_get_buffer(cache->buffer, len);
3392 	}
3393 	return p;
3394 }
3395 
3396 /*
3397  * Mark this temporary query cache buffer discarded if the SELECT
3398  * uses the table oid specified by oids.
3399  */
3400 static void
pool_check_and_discard_cache_buffer(int num_oids,int * oids)3401 pool_check_and_discard_cache_buffer(int num_oids, int *oids)
3402 {
3403 	POOL_SESSION_CONTEXT *session_context;
3404 	POOL_TEMP_QUERY_CACHE *cache;
3405 	int			num_caches;
3406 	size_t		len;
3407 	int		   *soids;
3408 	int			i,
3409 				j,
3410 				k;
3411 
3412 	session_context = pool_get_session_context(true);
3413 
3414 	if (!session_context || !session_context->query_cache_array)
3415 		return;
3416 
3417 	num_caches = session_context->query_cache_array->num_caches;
3418 
3419 	for (i = 0; i < num_caches; i++)
3420 	{
3421 		cache = session_context->query_cache_array->caches[i];
3422 		if (!cache || cache->is_discarded)
3423 			continue;
3424 
3425 		soids = (int *) pool_get_buffer(cache->oids, &len);
3426 		if (!soids || !len)
3427 			continue;
3428 
3429 		for (j = 0; j < cache->num_oids; j++)
3430 		{
3431 			if (cache->is_discarded)
3432 				break;
3433 
3434 			for (k = 0; k < num_oids; k++)
3435 			{
3436 				if (soids[j] == oids[k])
3437 				{
3438 					ereport(DEBUG1,
3439 							(errmsg("discard cache for \"%s\"", cache->query)));
3440 					cache->is_discarded = true;
3441 					break;
3442 				}
3443 			}
3444 		}
3445 		pfree(soids);
3446 	}
3447 }
3448 
3449 /*
3450  * At Ready for Query or Comand Complete handle query cache.  For streaming
3451  * replication mode and extended query at Comand Complete handle query cache.
3452  * For other case At Ready for Query handle query cache.
3453  */
3454 void
pool_handle_query_cache(POOL_CONNECTION_POOL * backend,char * query,Node * node,char state)3455 pool_handle_query_cache(POOL_CONNECTION_POOL * backend, char *query, Node *node, char state)
3456 {
3457 	POOL_SESSION_CONTEXT *session_context;
3458 	pool_sigset_t oldmask;
3459 	char	   *cache_buffer;
3460 	size_t		len;
3461 	int			num_oids;
3462 	int		   *oids;
3463 	int			i;
3464 
3465 	session_context = pool_get_session_context(true);
3466 
3467 	/* Ok to cache SELECT result? */
3468 	if (pool_is_cache_safe())
3469 	{
3470 		SelectContext ctx;
3471 		MemoryContext old_context;
3472 
3473 		old_context = MemoryContextSwitchTo(session_context->memory_context);
3474 		num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
3475 		MemoryContextSwitchTo(old_context);
3476 		oids = ctx.table_oids;
3477 		ereport(DEBUG2,
3478 				(errmsg("query cache handler for ReadyForQuery"),
3479 				 errdetail("num_oids: %d oid: %d", num_oids, *oids)));
3480 
3481 		if (state == 'I')		/* Not inside a transaction? */
3482 		{
3483 			/*
3484 			 * Make sure that temporary cache is not exceeded.
3485 			 */
3486 			if (!pool_is_cache_exceeded())
3487 			{
3488 				POOL_TEMP_QUERY_CACHE *cache;
3489 
3490 				/*
3491 				 * If we are not inside a transaction, we can immediately
3492 				 * register to cache storage.
3493 				 */
3494 				/* Register to memcached or shmem */
3495 				POOL_SETMASK2(&BlockSig, &oldmask);
3496 				pool_shmem_lock();
3497 
3498 				cache_buffer = pool_get_current_cache_buffer(&len);
3499 				if (cache_buffer)
3500 				{
3501 					if (session_context->query_context->skip_cache_commit == false)
3502 					{
3503 						if (pool_commit_cache(backend, query, cache_buffer, len, num_oids, oids) != 0)
3504 						{
3505 							ereport(WARNING,
3506 									(errmsg("ReadyForQuery: pool_commit_cache failed")));
3507 						}
3508 					}
3509 
3510 					/*
3511 					 * Reset temporary query cache buffer. This is necessary
3512 					 * if extended query protocol is used and a bind/execute
3513 					 * message arrives which uses a statement created by prior
3514 					 * parse message. In this case since the temp_cache is not
3515 					 * initialized by a parse message, messages are added to
3516 					 * pre existing temp cache buffer. The problem was found
3517 					 * in bug#152.
3518 					 * http://www.pgpool.net/mantisbt/view.php?id=152
3519 					 */
3520 					cache = pool_get_current_cache();
3521 					ereport(DEBUG1,
3522 							(errmsg("pool_handle_query_cache: temp_cache: %p", cache)));
3523 					pool_discard_temp_query_cache(cache);
3524 
3525 					if (SL_MODE && pool_is_doing_extended_query_message())
3526 						session_context->query_context->temp_cache = NULL;
3527 					else
3528 						session_context->query_context->temp_cache = pool_create_temp_query_cache(query);
3529 					pfree(cache_buffer);
3530 				}
3531 				pool_shmem_unlock();
3532 				POOL_SETMASK(&oldmask);
3533 			}
3534 
3535 			/* Count up SELECT stats */
3536 			pool_stats_count_up_num_selects(1);
3537 
3538 			/* Reset temp buffer */
3539 			pool_reset_memqcache_buffer(true);
3540 		}
3541 		else
3542 		{
3543 			POOL_TEMP_QUERY_CACHE *cache = pool_get_current_cache();
3544 
3545 			/* In transaction. Keep to temp query cache array */
3546 			pool_add_oids_temp_query_cache(cache, num_oids, oids);
3547 
3548 			/*
3549 			 * If temp cache has been overflowed, just trash the half baked
3550 			 * temp cache.
3551 			 */
3552 			if (pool_is_cache_exceeded())
3553 			{
3554 				POOL_TEMP_QUERY_CACHE *cache;
3555 
3556 				cache = pool_get_current_cache();
3557 				pool_discard_temp_query_cache(cache);
3558 
3559 				/*
3560 				 * Reset temp_cache pointer in the current query context so
3561 				 * that we don't double free memory.
3562 				 */
3563 				session_context->query_context->temp_cache = NULL;
3564 
3565 			}
3566 
3567 			/*
3568 			 * Otherwise add to the temp cache array.
3569 			 */
3570 			else
3571 			{
3572 				session_context->query_cache_array =
3573 					pool_add_query_cache_array(session_context->query_cache_array, cache);
3574 
3575 				/*
3576 				 * Reset temp_cache pointer in the current query context so
3577 				 * that we don't add the same temp cache to the cache array.
3578 				 * This is necessary such that case when next query is just a
3579 				 * "bind message", without "parse message". In the case the
3580 				 * query context is reused and same cache pointer will be
3581 				 * added to the query_cache_array which we do not want.
3582 				 */
3583 				session_context->query_context->temp_cache = NULL;
3584 			}
3585 
3586 			/* Count up temporary SELECT stats */
3587 			pool_tmp_stats_count_up_num_selects();
3588 		}
3589 	}
3590 	else if (is_rollback_query(node))	/* Rollback? */
3591 	{
3592 		/* Discard buffered data */
3593 		pool_reset_memqcache_buffer(true);
3594 	}
3595 	else if (is_commit_query(node)) /* Commit? */
3596 	{
3597 		int			num_caches;
3598 
3599 		POOL_SETMASK2(&BlockSig, &oldmask);
3600 		pool_shmem_lock();
3601 
3602 		/* Invalidate query cache */
3603 		if (pool_config->memqcache_auto_cache_invalidation)
3604 		{
3605 			num_oids = pool_get_dml_table_oid(&oids);
3606 			pool_invalidate_query_cache(num_oids, oids, true, 0);
3607 		}
3608 
3609 		/*--------------------------------------------------------------------
3610 		 * If we have something in the query cache buffer, that means either:
3611 		 * - We only had SELECTs in the transaction
3612 		 * - We had only SELECTs after the last DML
3613 		 * Thus we can register SELECT results to cache storage.
3614 		 *--------------------------------------------------------------------
3615 		 */
3616 		num_caches = session_context->query_cache_array->num_caches;
3617 		for (i = 0; i < num_caches; i++)
3618 		{
3619 			POOL_TEMP_QUERY_CACHE *cache;
3620 
3621 			cache = session_context->query_cache_array->caches[i];
3622 			if (!cache || cache->is_discarded)
3623 				continue;
3624 
3625 			num_oids = cache->num_oids;
3626 			oids = pool_get_buffer(cache->oids, &len);
3627 			cache_buffer = pool_get_buffer(cache->buffer, &len);
3628 
3629 			if (pool_commit_cache(backend, cache->query, cache_buffer, len, num_oids, oids) != 0)
3630 			{
3631 				ereport(WARNING,
3632 						(errmsg("ReadyForQuery: pool_commit_cache failed")));
3633 			}
3634 			if (oids)
3635 				pfree(oids);
3636 			if (cache_buffer)
3637 				pfree(cache_buffer);
3638 		}
3639 		pool_shmem_unlock();
3640 		POOL_SETMASK(&oldmask);
3641 
3642 		/* Count up number of SELECT stats */
3643 		pool_stats_count_up_num_selects(pool_tmp_stats_get_num_selects());
3644 
3645 		pool_reset_memqcache_buffer(true);
3646 	}
3647 	else						/* Non cache safe queries */
3648 	{
3649 		/* Non cachable SELECT */
3650 		if (node && IsA(node, SelectStmt))
3651 		{
3652 			/* Extract table oids from buffer */
3653 			num_oids = pool_get_dml_table_oid(&oids);
3654 
3655 			if (state == 'I')
3656 			{
3657 				/*
3658 				 * If Data-modifying statements in SELECT's WITH clause,
3659 				 * invalidate query cache.
3660 				 */
3661 				if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3662 				{
3663 					POOL_SETMASK2(&BlockSig, &oldmask);
3664 					pool_shmem_lock();
3665 					pool_invalidate_query_cache(num_oids, oids, true, 0);
3666 					pool_shmem_unlock();
3667 					POOL_SETMASK(&oldmask);
3668 				}
3669 
3670 				/* Count up SELECT stats */
3671 				pool_stats_count_up_num_selects(1);
3672 				pool_reset_memqcache_buffer(true);
3673 			}
3674 			else
3675 			{
3676 				/*
3677 				 * If we are inside a transaction, we cannot invalidate
3678 				 * query cache yet. However we can clear cache buffer, if
3679 				 * DML/DDL modifies the TABLE which SELECT uses.
3680 				 */
3681 				if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3682 				{
3683 					pool_check_and_discard_cache_buffer(num_oids, oids);
3684 					pool_reset_memqcache_buffer(false);
3685 				}
3686 
3687 				/* Count up temporary SELECT stats */
3688 				pool_tmp_stats_count_up_num_selects();
3689 			}
3690 		}
3691 
3692 		/*
3693 		 * If the query is DROP DATABASE, discard both of caches in
3694 		 * shmem/memcached and oidmap in memqcache_oiddir.
3695 		 */
3696 		else if (is_drop_database(node) && session_context->query_context->dboid != 0)
3697 		{
3698 			int			dboid = session_context->query_context->dboid;
3699 
3700 			num_oids = pool_get_dropdb_table_oids(&oids, dboid);
3701 
3702 			if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3703 			{
3704 				pool_shmem_lock();
3705 				pool_invalidate_query_cache(num_oids, oids, true, dboid);
3706 				pool_discard_oid_maps_by_db(dboid);
3707 				pool_shmem_unlock();
3708 				pool_reset_memqcache_buffer(true);
3709 
3710 				pfree(oids);
3711 				ereport(DEBUG2,
3712 						(errmsg("query cache handler for ReadyForQuery"),
3713 						 errdetail("deleted all cache files for the DROPped DB")));
3714 			}
3715 		}
3716 		else
3717 		{
3718 			/*
3719 			 * DML/DCL/DDL case
3720 			 */
3721 
3722 			/* Extract table oids from buffer */
3723 			num_oids = pool_get_dml_table_oid(&oids);
3724 			if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3725 			{
3726 				/*
3727 				 * If we are not inside a transaction, we can immediately
3728 				 * invalidate query cache.
3729 				 */
3730 				if (state == 'I')
3731 				{
3732 					POOL_SETMASK2(&BlockSig, &oldmask);
3733 					pool_shmem_lock();
3734 					pool_invalidate_query_cache(num_oids, oids, true, 0);
3735 					pool_shmem_unlock();
3736 					POOL_SETMASK(&oldmask);
3737 					pool_reset_memqcache_buffer(true);
3738 				}
3739 				else
3740 				{
3741 					/*
3742 					 * If we are inside a transaction, we cannot invalidate
3743 					 * query cache yet. However we can clear cache buffer, if
3744 					 * DML/DDL modifies the TABLE which SELECT uses.
3745 					 */
3746 					pool_check_and_discard_cache_buffer(num_oids, oids);
3747 					pool_reset_memqcache_buffer(false);
3748 				}
3749 			}
3750 			else if (num_oids == 0)
3751 			{
3752 				/*
3753 				 * It is also necessary to clear cache buffers in case of no
3754 				 * oid queries (like BEGIN, CHECKPOINT, VACUUM, etc) too.
3755 				 */
3756 				pool_reset_memqcache_buffer(true);
3757 			}
3758 		}
3759 	}
3760 }
3761 
3762 /*
3763  * Create and initialize query cache stats
3764  */
3765 static POOL_QUERY_CACHE_STATS * stats;
3766 int
pool_init_memqcache_stats(void)3767 pool_init_memqcache_stats(void)
3768 {
3769 	stats = pool_shared_memory_create(sizeof(POOL_QUERY_CACHE_STATS));
3770 	pool_reset_memqcache_stats();
3771 	return 0;
3772 }
3773 
3774 /*
3775  * Returns copy of stats area. The copy is in static area and will be
3776  * overwritten by next call to this function.
3777  */
3778 POOL_QUERY_CACHE_STATS *
pool_get_memqcache_stats(void)3779 pool_get_memqcache_stats(void)
3780 {
3781 	static POOL_QUERY_CACHE_STATS mystats;
3782 	pool_sigset_t oldmask;
3783 
3784 	memset(&mystats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3785 
3786 	if (stats)
3787 	{
3788 		POOL_SETMASK2(&BlockSig, &oldmask);
3789 		pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3790 		memcpy(&mystats, stats, sizeof(POOL_QUERY_CACHE_STATS));
3791 		pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3792 		POOL_SETMASK(&oldmask);
3793 	}
3794 
3795 	return &mystats;
3796 }
3797 
3798 /*
3799  * Reset query cache stats. Caller must lock QUERY_CACHE_STATS_SEM if
3800  * necessary.
3801  */
3802 void
pool_reset_memqcache_stats(void)3803 pool_reset_memqcache_stats(void)
3804 {
3805 	memset(stats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3806 	stats->start_time = time(NULL);
3807 }
3808 
3809 /*
3810  * Count up number of successful SELECTs and returns the number.
3811  * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3812  */
3813 long long int
pool_stats_count_up_num_selects(long long int num)3814 pool_stats_count_up_num_selects(long long int num)
3815 {
3816 	pool_sigset_t oldmask;
3817 
3818 	POOL_SETMASK2(&BlockSig, &oldmask);
3819 	pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3820 	stats->num_selects += num;
3821 	pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3822 	POOL_SETMASK(&oldmask);
3823 	return stats->num_selects;
3824 }
3825 
3826 /*
3827  * Count up number of successful SELECTs in temporary area and returns
3828  * the number.
3829  */
3830 long long int
pool_tmp_stats_count_up_num_selects(void)3831 pool_tmp_stats_count_up_num_selects(void)
3832 {
3833 	POOL_SESSION_CONTEXT *session_context;
3834 
3835 	session_context = pool_get_session_context(false);
3836 	session_context->num_selects++;
3837 	return session_context->num_selects;
3838 }
3839 
3840 /*
3841  * Return number of successful SELECTs in temporary area.
3842  */
3843 long long int
pool_tmp_stats_get_num_selects(void)3844 pool_tmp_stats_get_num_selects(void)
3845 {
3846 	POOL_SESSION_CONTEXT *session_context;
3847 
3848 	session_context = pool_get_session_context(false);
3849 	return session_context->num_selects;
3850 }
3851 
3852 /*
3853  * Reset number of successful SELECTs in temporary area.
3854  */
3855 void
pool_tmp_stats_reset_num_selects(void)3856 pool_tmp_stats_reset_num_selects(void)
3857 {
3858 	POOL_SESSION_CONTEXT *session_context;
3859 
3860 	session_context = pool_get_session_context(false);
3861 	session_context->num_selects = 0;
3862 }
3863 
3864 /*
3865  * Count up number of SELECTs extracted from cache returns the number.
3866  * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3867  */
3868 long long int
pool_stats_count_up_num_cache_hits(void)3869 pool_stats_count_up_num_cache_hits(void)
3870 {
3871 	pool_sigset_t oldmask;
3872 
3873 	POOL_SETMASK2(&BlockSig, &oldmask);
3874 	pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3875 	stats->num_cache_hits++;
3876 	pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3877 	POOL_SETMASK(&oldmask);
3878 	return stats->num_cache_hits;
3879 }
3880 
/*
 * On shared memory hash table implementation.  We use a sub part of the md5
 * hash key as the hash function.  Experiments have shown that hash_any()
 * of PostgreSQL is a little bit better than the method using part of
 * the md5 hash value, but it seems adding some cpu cycles to call
 * hash_any() is not worth the trouble.
 */
3888 
3889 static volatile POOL_HASH_HEADER *hash_header;
3890 static volatile POOL_HASH_ELEMENT *hash_elements;
3891 static volatile POOL_HASH_ELEMENT *hash_free;
3892 
3893 /*
3894  * Initialize hash table on shared memory "nelements" is max number of
3895  * hash keys. The actual number of hash key is rounded up to power of
3896  * 2.
3897  */
3898 #undef POOL_HASH_DEBUG
3899 
3900 int
pool_hash_init(int nelements)3901 pool_hash_init(int nelements)
3902 {
3903 	size_t		size;
3904 	int			nelements2;		/* number of rounded up hash keys */
3905 	int			shift;
3906 	uint32		mask;
3907 	POOL_HASH_HEADER hh;
3908 	int			i;
3909 
3910 	if (nelements <= 0)
3911 		ereport(ERROR,
3912 				(errmsg("initializing hash table on shared memory, invalid number of elements: %d", nelements)));
3913 
3914 	/* Round up to power of 2 */
3915 	shift = 32;
3916 	nelements2 = 1;
3917 	do
3918 	{
3919 		nelements2 <<= 1;
3920 		shift--;
3921 	} while (nelements2 < nelements);
3922 
3923 	mask = ~0;
3924 	mask >>= shift;
3925 	size = (char *) &hh.elements - (char *) &hh + sizeof(POOL_HEADER_ELEMENT) * nelements2;
3926 	hash_header = pool_shared_memory_create(size);
3927 	hash_header->nhash = nelements2;
3928 	hash_header->mask = mask;
3929 
3930 #ifdef POOL_HASH_DEBUG
3931 	ereport(LOG,
3932 			(errmsg("initializing hash table on shared memory"),
3933 			 errdetail("size:%zd nelements2:%d", size, nelements2)));
3934 
3935 #endif
3936 
3937 	size = sizeof(POOL_HASH_ELEMENT) * nelements2;
3938 	hash_elements = pool_shared_memory_create(size);
3939 
3940 #ifdef POOL_HASH_DEBUG
3941 	ereport(LOG,
3942 			(errmsg("initializing hash table on shared memory"),
3943 			 errdetail("size:%zd nelements2:%d", size, nelements2)));
3944 #endif
3945 
3946 	for (i = 0; i < nelements2 - 1; i++)
3947 	{
3948 		hash_elements[i].next = (POOL_HASH_ELEMENT *) & hash_elements[i + 1];
3949 	}
3950 	hash_elements[nelements2 - 1].next = NULL;
3951 	hash_free = hash_elements;
3952 
3953 	return 0;
3954 }
3955 
3956 /*
3957  * Reset hash table on shared memory "nelements" is max number of
3958  * hash keys. The actual number of hash key is rounded up to power of
3959  * 2.
3960  */
3961 static int
pool_hash_reset(int nelements)3962 pool_hash_reset(int nelements)
3963 {
3964 	size_t		size;
3965 	int			nelements2;		/* number of rounded up hash keys */
3966 	int			shift;
3967 	uint32		mask;
3968 	POOL_HASH_HEADER hh;
3969 	int			i;
3970 
3971 	if (nelements <= 0)
3972 		ereport(ERROR,
3973 				(errmsg("clearing hash table on shared memory, invalid number of elements: %d", nelements)));
3974 
3975 	/* Round up to power of 2 */
3976 	shift = 32;
3977 	nelements2 = 1;
3978 	do
3979 	{
3980 		nelements2 <<= 1;
3981 		shift--;
3982 	} while (nelements2 < nelements);
3983 
3984 	mask = ~0;
3985 	mask >>= shift;
3986 
3987 	size = (char *) &hh.elements - (char *) &hh + sizeof(POOL_HEADER_ELEMENT) * nelements2;
3988 	memset((void *) hash_header, 0, size);
3989 
3990 	hash_header->nhash = nelements2;
3991 	hash_header->mask = mask;
3992 
3993 	size = sizeof(POOL_HASH_ELEMENT) * nelements2;
3994 	memset((void *) hash_elements, 0, size);
3995 
3996 	for (i = 0; i < nelements2 - 1; i++)
3997 	{
3998 		hash_elements[i].next = (POOL_HASH_ELEMENT *) & hash_elements[i + 1];
3999 	}
4000 	hash_elements[nelements2 - 1].next = NULL;
4001 	hash_free = hash_elements;
4002 
4003 	return 0;
4004 }
4005 
4006 /*
4007  * Search cacheid by MD5 hash key string
4008  * If found, returns cache id, otherwise NULL.
4009  */
4010 POOL_CACHEID *
pool_hash_search(POOL_QUERY_HASH * key)4011 pool_hash_search(POOL_QUERY_HASH * key)
4012 {
4013 	volatile	POOL_HASH_ELEMENT *element;
4014 
4015 	uint32		hash_key = create_hash_key(key);
4016 
4017 	if (hash_key >= hash_header->nhash)
4018 	{
4019 		ereport(WARNING,
4020 				(errmsg("memcache: searching cacheid from hash. invalid hash key"),
4021 				 errdetail("invalid hash key: %uld nhash: %ld",
4022 						   hash_key, hash_header->nhash)));
4023 		return NULL;
4024 	}
4025 
4026 	{
4027 		char		md5[POOL_MD5_HASHKEYLEN + 1];
4028 
4029 		memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
4030 		md5[POOL_MD5_HASHKEYLEN] = '\0';
4031 #ifdef POOL_HASH_DEBUG
4032 		ereport(LOG,
4033 				(errmsg("searching hash table"),
4034 				 errdetail("hash_key:%d md5:%s", hash_key, md5)));
4035 #endif
4036 	}
4037 
4038 	element = hash_header->elements[hash_key].element;
4039 	while (element)
4040 	{
4041 		{
4042 			char		md5[POOL_MD5_HASHKEYLEN + 1];
4043 
4044 			memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
4045 			md5[POOL_MD5_HASHKEYLEN] = '\0';
4046 #ifdef POOL_HASH_DEBUG
4047 			ereport(LOG,
4048 					(errmsg("searching hash table"),
4049 					 errdetail("element md5:%s", md5)));
4050 #endif
4051 		}
4052 
4053 		if (memcmp((const void *) element->hashkey.query_hash,
4054 				   (const void *) key->query_hash, sizeof(key->query_hash)) == 0)
4055 		{
4056 			return (POOL_CACHEID *) & element->cacheid;
4057 		}
4058 		element = element->next;
4059 	}
4060 	return NULL;
4061 }
4062 
4063 /*
4064  * Insert MD5 key and associated cache id into shmem hash table.  If
4065  * "update" is true, replace cacheid associated with the MD5 key,
4066  * rather than throw an error.
4067  */
4068 static int
pool_hash_insert(POOL_QUERY_HASH * key,POOL_CACHEID * cacheid,bool update)4069 pool_hash_insert(POOL_QUERY_HASH * key, POOL_CACHEID * cacheid, bool update)
4070 {
4071 	POOL_HASH_ELEMENT *element;
4072 	POOL_HASH_ELEMENT *new_element;
4073 
4074 	uint32		hash_key = create_hash_key(key);
4075 
4076 	if (hash_key >= hash_header->nhash)
4077 	{
4078 		ereport(WARNING,
4079 				(errmsg("memcache: adding cacheid to hash. invalid hash key"),
4080 				 errdetail("invalid hash key: %uld nhash: %ld",
4081 						   hash_key, hash_header->nhash)));
4082 		return -1;
4083 	}
4084 
4085 	{
4086 		char		md5[POOL_MD5_HASHKEYLEN + 1];
4087 
4088 		memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
4089 		md5[POOL_MD5_HASHKEYLEN] = '\0';
4090 #ifdef POOL_HASH_DEBUG
4091 		ereport(LOG,
4092 				(errmsg("searching hash table"),
4093 				 errdetail("hash_key:%d md5:%s block:%d item:%d", hash_key, md5, cacheid->blockid, cacheid->itemid)));
4094 #endif
4095 	}
4096 
4097 	/*
4098 	 * Look for hash key.
4099 	 */
4100 	element = hash_header->elements[hash_key].element;
4101 
4102 	while (element)
4103 	{
4104 		if (memcmp((const void *) element->hashkey.query_hash,
4105 				   (const void *) key->query_hash, sizeof(key->query_hash)) == 0)
4106 		{
4107 			/* Hash key found. If "update" is false, just throw an error. */
4108 			char		md5[POOL_MD5_HASHKEYLEN + 1];
4109 
4110 			if (!update)
4111 			{
4112 				memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
4113 				md5[POOL_MD5_HASHKEYLEN] = '\0';
4114 				ereport(LOG,
4115 						(errmsg("memcache: adding cacheid to hash. hash key:\"%s\" already exists", md5)));
4116 				return -1;
4117 			}
4118 			else
4119 			{
4120 				/* Update cache id */
4121 				memcpy((void *) &element->cacheid, cacheid, sizeof(POOL_CACHEID));
4122 				return 0;
4123 			}
4124 		}
4125 		element = element->next;
4126 	}
4127 
4128 	/*
4129 	 * Ok, same key did not exist. Just insert new hash key.
4130 	 */
4131 	new_element = (POOL_HASH_ELEMENT *) get_new_hash_element();
4132 	if (!new_element)
4133 	{
4134 		ereport(LOG,
4135 				(errmsg("memcache: adding cacheid to hash. failed to get new element")));
4136 		return -1;
4137 	}
4138 
4139 	element = hash_header->elements[hash_key].element;
4140 
4141 	hash_header->elements[hash_key].element = new_element;
4142 	new_element->next = element;
4143 
4144 	memcpy((void *) new_element->hashkey.query_hash, key->query_hash, POOL_MD5_HASHKEYLEN);
4145 	memcpy((void *) &new_element->cacheid, cacheid, sizeof(POOL_CACHEID));
4146 
4147 	return 0;
4148 }
4149 
4150 /*
4151  * Delete MD5 key and associated cache id from shmem hash table.
4152  */
4153 int
pool_hash_delete(POOL_QUERY_HASH * key)4154 pool_hash_delete(POOL_QUERY_HASH * key)
4155 {
4156 	POOL_HASH_ELEMENT *element;
4157 	POOL_HASH_ELEMENT **delete_point;
4158 	bool		found;
4159 
4160 	uint32		hash_key = create_hash_key(key);
4161 
4162 	if (hash_key >= hash_header->nhash)
4163 	{
4164 		ereport(LOG,
4165 				(errmsg("memcache: deleting key from hash. invalid key"),
4166 				 errdetail("invalid hash key: %uld nhash: %ld",
4167 						   hash_key, hash_header->nhash)));
4168 		return -1;
4169 	}
4170 
4171 	/*
4172 	 * Look for delete location
4173 	 */
4174 	found = false;
4175 	delete_point = (POOL_HASH_ELEMENT * *) & (hash_header->elements[hash_key].element);
4176 	element = hash_header->elements[hash_key].element;
4177 
4178 	while (element)
4179 	{
4180 		if (memcmp(element->hashkey.query_hash, key->query_hash, sizeof(key->query_hash)) == 0)
4181 		{
4182 			found = true;
4183 			break;
4184 		}
4185 		delete_point = &element->next;
4186 		element = element->next;
4187 	}
4188 
4189 	if (!found)
4190 	{
4191 		char		md5[POOL_MD5_HASHKEYLEN + 1];
4192 
4193 		memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
4194 		md5[POOL_MD5_HASHKEYLEN] = '\0';
4195 		ereport(LOG,
4196 				(errmsg("memcache: deleting key from hash. key:\"%s\" not found", md5)));
4197 		return -1;
4198 	}
4199 
4200 	/*
4201 	 * Put back the element to free list
4202 	 */
4203 	*delete_point = element->next;
4204 	put_back_hash_element(element);
4205 
4206 	return 0;
4207 }
4208 
4209 /*
4210  * Calculate 32bit binary hash key (i.e. location in hash header) from MD5
4211  * string. We use top most 8 characters of MD5 string for calculation.
4212 */
4213 static uint32
create_hash_key(POOL_QUERY_HASH * key)4214 create_hash_key(POOL_QUERY_HASH * key)
4215 {
4216 #define POOL_HASH_NCHARS 8
4217 
4218 	char		md5[POOL_HASH_NCHARS + 1];
4219 	uint32		mask;
4220 
4221 	memcpy(md5, key->query_hash, POOL_HASH_NCHARS);
4222 	md5[POOL_HASH_NCHARS] = '\0';
4223 	mask = strtoul(md5, NULL, 16);
4224 	mask &= hash_header->mask;
4225 	return mask;
4226 }
4227 
4228 /*
4229  * Get new free hash element from free list.
4230  */
4231 static volatile POOL_HASH_ELEMENT *
get_new_hash_element(void)4232 get_new_hash_element(void)
4233 {
4234 	volatile	POOL_HASH_ELEMENT *elm;
4235 
4236 	if (!hash_free->next)
4237 	{
4238 		/* No free element */
4239 		return NULL;
4240 	}
4241 
4242 #ifdef POOL_HASH_DEBUG
4243 	ereport(LOG,
4244 			(errmsg("getting new hash element"),
4245 			 errdetail("hash_free->next:%p hash_free->next->next:%p",
4246 					   hash_free->next, hash_free->next->next)));
4247 #endif
4248 
4249 	elm = hash_free->next;
4250 	hash_free->next = elm->next;
4251 
4252 	return elm;
4253 }
4254 
4255 /*
4256  * Put back hash element to free list.
4257  */
4258 static void
put_back_hash_element(volatile POOL_HASH_ELEMENT * element)4259 put_back_hash_element(volatile POOL_HASH_ELEMENT * element)
4260 {
4261 	POOL_HASH_ELEMENT *elm;
4262 
4263 #ifdef POOL_HASH_DEBUG
4264 	ereport(LOG,
4265 			(errmsg("getting new hash element"),
4266 			 errdetail("hash_free->next:%p hash_free->next->next:%p",
4267 					   hash_free->next, hash_free->next->next)));
4268 #endif
4269 
4270 	elm = hash_free->next;
4271 	hash_free->next = (POOL_HASH_ELEMENT *) element;
4272 	element->next = elm;
4273 }
4274 
4275 /*
4276  * Return true if there's a free hash element.
4277  */
4278 static bool
is_free_hash_element(void)4279 is_free_hash_element(void)
4280 {
4281 	return hash_free->next != NULL;
4282 }
4283 
4284 /*
4285  * Returns shared memory cache stats.
4286  * Subsequent call to this function will break return value
4287  * because its in static memory.
4288  * Caller must hold shmem_lock before calling this function.
4289  * If in memory query cache is not enabled, all stats are 0.
4290  */
4291 POOL_SHMEM_STATS *
pool_get_shmem_storage_stats(void)4292 pool_get_shmem_storage_stats(void)
4293 {
4294 	static POOL_SHMEM_STATS mystats;
4295 	POOL_HASH_ELEMENT *element;
4296 	int			nblocks;
4297 	int			i;
4298 
4299 	memset(&mystats, 0, sizeof(POOL_SHMEM_STATS));
4300 
4301 	if (!pool_config->memory_cache_enabled)
4302 		return &mystats;
4303 
4304 	/*
4305 	 * Copy cache hit data
4306 	 */
4307 	mystats.cache_stats.num_selects = stats->num_selects;
4308 	mystats.cache_stats.num_cache_hits = stats->num_cache_hits;
4309 
4310 	if (pool_config->memqcache_method != SHMEM_CACHE)
4311 		return &mystats;
4312 
4313 	/* number of total hash entries */
4314 	mystats.num_hash_entries = hash_header->nhash;
4315 
4316 	/* number of used hash entries */
4317 	for (i = 0; i < hash_header->nhash; i++)
4318 	{
4319 		element = hash_header->elements[i].element;
4320 		while (element)
4321 		{
4322 			mystats.used_hash_entries++;
4323 			element = element->next;
4324 		}
4325 	}
4326 
4327 	nblocks = pool_get_memqcache_blocks();
4328 
4329 	for (i = 0; i < nblocks; i++)
4330 	{
4331 		POOL_CACHE_BLOCK_HEADER *bh;
4332 		POOL_CACHE_ITEM_POINTER *cip;
4333 		char	   *p = block_address(i);
4334 
4335 		bh = (POOL_CACHE_BLOCK_HEADER *) p;
4336 		int			j;
4337 
4338 		if (bh->flags & POOL_BLOCK_USED)
4339 		{
4340 			for (j = 0; j < bh->num_items; j++)
4341 			{
4342 				cip = item_pointer(p, j);
4343 				if (POOL_ITEM_DELETED & cip->flags)
4344 				{
4345 					mystats.fragment_cache_entries_size += item_header(p, j)->total_length;
4346 				}
4347 				else
4348 				{
4349 					/* number of used cache entries */
4350 					mystats.num_cache_entries++;
4351 					/* total size of used cache entries */
4352 					mystats.used_cache_entries_size += (item_header(p, j)->total_length + sizeof(POOL_CACHE_ITEM_POINTER));
4353 				}
4354 			}
4355 			mystats.used_cache_entries_size += sizeof(POOL_CACHE_BLOCK_HEADER);
4356 			/* total size of free(usable) cache entries */
4357 			mystats.free_cache_entries_size += bh->free_bytes;
4358 		}
4359 		else
4360 		{
4361 			mystats.free_cache_entries_size += pool_config->memqcache_cache_block_size;
4362 		}
4363 	}
4364 
4365 	/*
4366 	 * Copy POOL_QUERY_CACHE_STATS
4367 	 */
4368 	memcpy(&mystats.cache_stats, stats, sizeof(mystats.cache_stats));
4369 
4370 	return &mystats;
4371 }
4372 
4373 /*
4374  * Inject cached message to the target backend buffer to pretend as if backend
4375  * actually replies with Data row and Command Complete message.
4376  */
4377 static void
inject_cached_message(POOL_CONNECTION * backend,char * qcache,int qcachelen)4378 inject_cached_message(POOL_CONNECTION * backend, char *qcache, int qcachelen)
4379 {
4380 	char		kind;
4381 	int			len;
4382 	char	   *buf;
4383 	int			timeout;
4384 	int			i = 0;
4385 	bool		is_prepared_stmt = false;
4386 	POOL_SESSION_CONTEXT *session_context;
4387 	POOL_QUERY_CONTEXT *query_context;
4388 	POOL_PENDING_MESSAGE *msg;
4389 
4390 	session_context = pool_get_session_context(false);
4391 	query_context = session_context->query_context;
4392 	msg = pool_pending_message_find_lastest_by_query_context(query_context);
4393 
4394 	if (msg)
4395 	{
4396 		/*
4397 		 * If pending message found, we should extract target backend from it
4398 		 */
4399 		int			backend_id;
4400 
4401 		backend_id = pool_pending_message_get_target_backend_id(msg);
4402 		backend = CONNECTION(session_context->backend, backend_id);
4403 		timeout = -1;
4404 	}
4405 	else
4406 		timeout = 0;
4407 
4408 	/* Send flush messsage to backend to retrieve response of backend */
4409 	pool_write(backend, "H", 1);
4410 	len = htonl(sizeof(len));
4411 	pool_write_and_flush(backend, &len, sizeof(len));
4412 
4413 	/*
4414 	 * Push any response from backend
4415 	 */
4416 	for (;;)
4417 	{
4418 		/* check if there's any pending data */
4419 		if (!pool_ssl_pending(backend) && pool_read_buffer_is_empty(backend))
4420 		{
4421 			pool_set_timeout(timeout);
4422 			if (pool_check_fd(backend) != 0)
4423 			{
4424 				ereport(DEBUG1,
4425 						(errmsg("inject_cached_message: select shows no pending data")));
4426 				pool_set_timeout(-1);
4427 				break;
4428 			}
4429 			pool_set_timeout(-1);
4430 		}
4431 
4432 		pool_read(backend, &kind, 1);
4433 		ereport(DEBUG1,
4434 				(errmsg("inject_cached_message: push message kind: '%c'", kind)));
4435 		if (msg &&
4436 			((kind == 'T' && msg->type == POOL_DESCRIBE) ||
4437 			 (kind == '2' && msg->type == POOL_BIND)))
4438 		{
4439 			/* Pending message seen. Now it is likely to end of pending data */
4440 			timeout = 0;
4441 		}
4442 		pool_push(backend, &kind, sizeof(kind));
4443 		pool_read(backend, &len, sizeof(len));
4444 		pool_push(backend, &len, sizeof(len));
4445 		if ((ntohl(len) - sizeof(len)) > 0)
4446 		{
4447 			buf = pool_read2(backend, ntohl(len) - sizeof(len));
4448 			pool_push(backend, buf, ntohl(len) - sizeof(len));
4449 		}
4450 	}
4451 
4452 	/*
4453 	 * Inject row data and command complete
4454 	 */
4455 	while (i < qcachelen)
4456 	{
4457 		char		tmpkind;
4458 		int			tmplen;
4459 		char	   *p;
4460 
4461 		tmpkind = qcache[i];
4462 		i++;
4463 
4464 		memcpy(&tmplen, qcache + i, sizeof(tmplen));
4465 		i += sizeof(tmplen);
4466 		len = ntohl(tmplen);
4467 		p = qcache + i;
4468 		i += len - sizeof(tmplen);
4469 
4470 		/* No need to cache PARSE and BIND responses */
4471 		if (tmpkind == '1' || tmpkind == '2')
4472 		{
4473 			is_prepared_stmt = true;
4474 			continue;
4475 		}
4476 
4477 		/*
4478 		 * In the prepared statement execution, there is no need to send 'T'
4479 		 * response to the frontend.
4480 		 */
4481 		if (is_prepared_stmt && tmpkind == 'T')
4482 		{
4483 			continue;
4484 		}
4485 
4486 		/* push message */
4487 		ereport(DEBUG1,
4488 				(errmsg("inject_cached_message: push cached messages: '%c' len: %d", tmpkind, len)));
4489 		pool_push(backend, &tmpkind, 1);
4490 		pool_push(backend, &tmplen, sizeof(tmplen));
4491 		if (len > 0)
4492 			pool_push(backend, p, len - sizeof(tmplen));
4493 	}
4494 
4495 	/*
4496 	 * Pop data.
4497 	 */
4498 	pool_pop(backend, &len);
4499 }
4500