1 /* -*-pgsql-c-*- */
2 /*
3  * pgpool: a language independent connection pool server for PostgreSQL
4  * written by Tatsuo Ishii
5  *
6  * Copyright (c) 2003-2018	PgPool Global Development Group
7  *
8  * Permission to use, copy, modify, and distribute this software and
9  * its documentation for any purpose and without fee is hereby
10  * granted, provided that the above copyright notice appear in all
11  * copies and that both that copyright notice and this permission
12  * notice appear in supporting documentation, and that the name of the
13  * author not be used in advertising or publicity pertaining to
14  * distribution of the software without specific, written prior
15  * permission. The author makes no representations about the
16  * suitability of this software for any purpose.  It is provided "as
17  * is" without express or implied warranty.
18  *
19  * pool_memqcache.c: query cache on shmem or memcached
20  *
21  */
22 #define DATABASE_TO_OID_QUERY "SELECT oid FROM pg_database WHERE datname = '%s'"
23 
24 #include "pool.h"
25 
26 #include <stdio.h>
27 #include <string.h>
28 #include <unistd.h>
29 #include <sys/file.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <fcntl.h>
33 #include <errno.h>
34 #include <string.h>
35 #include <stdlib.h>
36 #include <ctype.h>
37 #include <arpa/inet.h>
38 #include <dirent.h>
39 
40 #ifdef USE_MEMCACHED
41 #include <libmemcached/memcached.h>
42 #endif
43 
44 #include "auth/md5.h"
45 #include "pool_config.h"
46 #include "protocol/pool_proto_modules.h"
47 #include "parser/parsenodes.h"
48 #include "context/pool_session_context.h"
49 #include "query_cache/pool_memqcache.h"
50 #include "utils/pool_relcache.h"
51 #include "utils/pool_select_walker.h"
52 #include "utils/pool_stream.h"
53 #include "utils/pool_stream.h"
54 #include "utils/elog.h"
55 #include "utils/palloc.h"
56 #include "utils/memutils.h"
57 
58 
59 #ifdef USE_MEMCACHED
60 memcached_st *memc;
61 #endif
62 
63 static char* encode_key(const char *s, char *buf, POOL_CONNECTION_POOL *backend);
64 #ifdef DEBUG
65 static void dump_cache_data(const char *data, size_t len);
66 #endif
67 static int pool_commit_cache(POOL_CONNECTION_POOL *backend, char *query, char *data, size_t datalen, int num_oids, int *oids);
68 static int pool_fetch_cache(POOL_CONNECTION_POOL *backend, const char *query, char **buf, size_t *len);
69 static int send_cached_messages(POOL_CONNECTION *frontend, const char *qcache, int qcachelen);
70 static void send_message(POOL_CONNECTION *conn, char kind, int len, const char *data);
71 #ifdef USE_MEMCACHED
72 static int delete_cache_on_memcached(const char *key);
73 #endif
74 static int pool_get_dml_table_oid(int **oid);
75 static int pool_get_dropdb_table_oids(int **oids, int dboid);
76 static void pool_discard_dml_table_oid(void);
77 static void pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlink, int dboid);
78 
79 static int pool_get_database_oid(void);
80 static void pool_add_table_oid_map(POOL_CACHEKEY *cachkey, int num_table_oids, int *table_oids);
81 static void pool_reset_memqcache_buffer(bool reset_dml_oids);
82 static POOL_CACHEID *pool_add_item_shmem_cache(POOL_QUERY_HASH *query_hash, char *data, int size);
83 static POOL_CACHEID *pool_find_item_on_shmem_cache(POOL_QUERY_HASH *query_hash);
84 static char *pool_get_item_shmem_cache(POOL_QUERY_HASH *query_hash, int *size, int *sts);
85 static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY *cache_array, POOL_TEMP_QUERY_CACHE *cache);
86 static void pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, char kind, char *data, int data_len);
87 static void pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, int num_oids, int *oids);
88 static POOL_INTERNAL_BUFFER *pool_create_buffer(void);
89 static void pool_discard_buffer(POOL_INTERNAL_BUFFER *buffer);
90 static void pool_add_buffer(POOL_INTERNAL_BUFFER *buffer, void *data, size_t len);
91 static void *pool_get_buffer(POOL_INTERNAL_BUFFER *buffer, size_t *len);
92 #ifdef NOT_USED
93 static char *pool_get_buffer_pointer(POOL_INTERNAL_BUFFER *buffer);
94 #endif
95 static char *pool_get_current_cache_buffer(size_t *len);
96 static size_t pool_get_buffer_length(POOL_INTERNAL_BUFFER *buffer);
97 static void pool_check_and_discard_cache_buffer(int num_oids, int *oids);
98 
99 static void pool_set_memqcache_blocks(int num_blocks);
100 static int pool_get_memqcache_blocks(void);
101 static void *pool_memory_cache_address(void);
102 static void pool_reset_fsmm(size_t size);
103 static void *pool_fsmm_address(void);
104 static void pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space);
105 static POOL_CACHE_BLOCKID pool_get_block(size_t free_space);
106 static POOL_CACHE_ITEM_HEADER *pool_cache_item_header(POOL_CACHEID *cacheid);
107 static int pool_init_cache_block(POOL_CACHE_BLOCKID blockid);
108 #if NOT_USED
109 static void pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid);
110 #endif
111 static int pool_delete_item_shmem_cache(POOL_CACHEID *cacheid);
112 static char *block_address(int blockid);
113 static POOL_CACHE_ITEM_POINTER *item_pointer(char *block, int i);
114 static POOL_CACHE_ITEM_HEADER *item_header(char *block, int i);
115 static POOL_CACHE_BLOCKID pool_reuse_block(void);
116 #ifdef SHMEMCACHE_DEBUG
117 static void dump_shmem_cache(POOL_CACHE_BLOCKID blockid);
118 #endif
119 
120 static int pool_hash_reset(int nelements);
121 static int pool_hash_insert(POOL_QUERY_HASH *key, POOL_CACHEID *cacheid, bool update);
122 static uint32 create_hash_key(POOL_QUERY_HASH *key);
123 static volatile POOL_HASH_ELEMENT *get_new_hash_element(void);
124 static void put_back_hash_element(volatile POOL_HASH_ELEMENT *element);
125 static bool is_free_hash_element(void);
126 static void inject_cached_message(POOL_CONNECTION *backend, char *qcache, int qcachelen);
127 
128 /*
129  * Connect to Memcached
130  */
memcached_connect(void)131 int memcached_connect(void)
132 {
133 	char *memqcache_memcached_host;
134 	int memqcache_memcached_port;
135 #ifdef USE_MEMCACHED
136 	memcached_server_st *servers;
137 	memcached_return rc;
138 
139 	/* Already connected? */
140 	if (memc)
141 	{
142 		return 0;
143 	}
144 #endif
145 
146 	memqcache_memcached_host = pool_config->memqcache_memcached_host;
147 	memqcache_memcached_port = pool_config->memqcache_memcached_port;
148 
149 	ereport(DEBUG1,
150             (errmsg("connecting to memcached on Host:\"%s:%d\"", memqcache_memcached_host,memqcache_memcached_port)));
151 
152 #ifdef USE_MEMCACHED
153 	memc = memcached_create(NULL);
154 	servers = memcached_server_list_append(NULL,
155 										   memqcache_memcached_host,
156 										   memqcache_memcached_port,
157 										   &rc);
158 
159 	rc = memcached_server_push(memc, servers);
160 	if (rc != MEMCACHED_SUCCESS)
161 	{
162 		ereport(WARNING,
163 			(errmsg("failed to connect to memcached, server push error:\"%s\"\n", memcached_strerror(memc, rc))));
164 		memc = (memcached_st *)-1;
165 		return -1;
166 	}
167 	memcached_server_list_free(servers);
168 #else
169 	ereport(WARNING,
170 			(errmsg("failed to connect to memcached, memcached support is not enabled")));
171 	return -1;
172 #endif
173 	return 0;
174 }
175 
176 /*
177  * Disconnect to Memcached
178  */
memcached_disconnect(void)179 void memcached_disconnect (void)
180 {
181 #ifdef USE_MEMCACHED
182 	if (!memc)
183 	{
184 		return;
185 	}
186 	memcached_free(memc);
187 #else
188 	ereport(WARNING,
189 			(errmsg("failed to disconnect from memcached, memcached support is not enabled")));
190 #endif
191 }
192 
193 /*
194  * Register buffer data for query cache in memory cache
195  */
memqcache_register(char kind,POOL_CONNECTION * frontend,char * data,int data_len)196 void memqcache_register(char kind,
197                         POOL_CONNECTION *frontend,
198                         char *data,
199                         int data_len)
200 {
201 	POOL_TEMP_QUERY_CACHE *cache;
202 	POOL_SESSION_CONTEXT *session_context;
203 	POOL_QUERY_CONTEXT *query_context;
204 
205 	cache = pool_get_current_cache();
206 
207 	if (cache == NULL)
208 	{
209 		session_context = pool_get_session_context(true);
210 
211 		if (session_context && pool_is_query_in_progress())
212 		{
213 			char *query = pool_get_query_string();
214 			query_context = session_context->query_context;
215 
216 			if (query)
217 				query_context->temp_cache = pool_create_temp_query_cache(query);
218 		}
219 	}
220 
221 	pool_add_temp_query_cache(cache, kind, data, data_len);
222 }
223 
224 /*
225  * Commit SELECT results to cache storage.
226  */
pool_commit_cache(POOL_CONNECTION_POOL * backend,char * query,char * data,size_t datalen,int num_oids,int * oids)227 static int pool_commit_cache(POOL_CONNECTION_POOL *backend, char *query, char *data, size_t datalen, int num_oids, int *oids)
228 {
229 #ifdef USE_MEMCACHED
230 	memcached_return rc;
231 #endif
232 	POOL_CACHEKEY cachekey;
233 	char tmpkey[MAX_KEY];
234 	time_t memqcache_expire;
235 
236 	/*
237 	 * get_buflen() will return -1 if query result exceeds memqcache_maxcache
238 	 */
239 	if (datalen == -1)
240 	{
241 		return -1;
242 	}
243 
244 	/* query disabled */
245 	if (strlen(query) <= 0)
246 	{
247 		return -1;
248 	}
249 	ereport(DEBUG1,
250 		(errmsg("commiting SELECT results to cache storage"),
251 			 errdetail("Query=\"%s\"", query)));
252 
253 #ifdef DEBUG
254 	dump_cache_data(data, datalen);
255 #endif
256 
257 	/* encode md5key for memcached */
258 	encode_key(query, tmpkey, backend);
259 	ereport(DEBUG2,
260 		(errmsg("commiting SELECT results to cache storage"),
261 			 errdetail("search key : \"%s\"", tmpkey)));
262 
263 	memcpy(cachekey.hashkey, tmpkey, 32);
264 
265 	memqcache_expire = pool_config->memqcache_expire;
266 	ereport(DEBUG1,
267 		(errmsg("commiting SELECT results to cache storage"),
268 			 errdetail("memqcache_expire = %ld", memqcache_expire)));
269 
270 	if (pool_is_shmem_cache())
271 	{
272 		POOL_CACHEID *cacheid;
273 		POOL_QUERY_HASH query_hash;
274 
275 		memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
276 
277 		cacheid = pool_hash_search(&query_hash);
278 
279 		if (cacheid != NULL)
280 		{
281 			ereport(DEBUG1,
282 				(errmsg("commiting SELECT results to cache storage"),
283 					 errdetail("item already exists")));
284 
285 			return 0;
286 		}
287 		else
288 		{
289 			cacheid = pool_add_item_shmem_cache(&query_hash, data, datalen);
290 			if (cacheid == NULL)
291 			{
292 				ereport(LOG,
293 						(errmsg("failed to add item to shmem cache")));
294 				return -1;
295 			}
296 			else
297 			{
298 				ereport(DEBUG2,
299 					(errmsg("commiting SELECT results to cache storage"),
300 						errdetail("blockid: %d itemid: %d",
301 							   cacheid->blockid, cacheid->itemid)));
302 			}
303 			cachekey.cacheid.blockid = cacheid->blockid;
304 			cachekey.cacheid.itemid = cacheid->itemid;
305 		}
306 	}
307 
308 #ifdef USE_MEMCACHED
309 	else
310 	{
311 		rc = memcached_set(memc, tmpkey, 32,
312 						   data, datalen, (time_t)memqcache_expire, 0);
313 		if (rc != MEMCACHED_SUCCESS)
314 		{
315 			ereport(WARNING,
316 					(errmsg("cache commit failed with error:\"%s\"",memcached_strerror(memc, rc))));
317 			return -1;
318 		}
319 		ereport(DEBUG1,
320 			(errmsg("commiting SELECT results to cache storage"),
321 				 errdetail("set cache succeeded")));
322 	}
323 #endif
324 
325 	/*
326 	 * Register cache id to oid map
327 	 */
328 	pool_add_table_oid_map(&cachekey, num_oids, oids);
329 
330 	return 0;
331 }
332 
333 /*
334  * Fetch from memory cache.
335  * Return:
336  * 0: fetch success,
337  * 1: not found
338  */
pool_fetch_cache(POOL_CONNECTION_POOL * backend,const char * query,char ** buf,size_t * len)339 static int pool_fetch_cache(POOL_CONNECTION_POOL *backend, const char *query, char **buf, size_t *len)
340 {
341 	char *ptr;
342 	char tmpkey[MAX_KEY];
343 	int sts;
344 	char *p;
345 
346 	if (strlen(query) <= 0)
347 		ereport(ERROR,
348 			(errmsg("fetching from cache storage, no query")));
349 
350 	/* encode md5key for memcached */
351 	encode_key(query, tmpkey, backend);
352 	ereport(DEBUG1,
353 		(errmsg("fetching from cache storage"),
354 			 errdetail("search key \"%s\"", tmpkey)));
355 
356 
357 	if (pool_is_shmem_cache())
358 	{
359 		POOL_QUERY_HASH query_hash;
360 		int mylen;
361 
362 		memcpy(query_hash.query_hash, tmpkey, sizeof(query_hash.query_hash));
363 
364 		ptr = pool_get_item_shmem_cache(&query_hash, &mylen, &sts);
365 		if (ptr == NULL)
366 		{
367 			ereport(DEBUG1,
368 				(errmsg("fetching from cache storage"),
369 					 errdetail("cache not found on shared memory")));
370 
371 			return 1;
372 		}
373 		*len = mylen;
374 	}
375 #ifdef USE_MEMCACHED
376 	else
377 	{
378 		memcached_return rc;
379 		unsigned int flags;
380 
381 		ptr = memcached_get(memc, tmpkey, strlen(tmpkey), len, &flags, &rc);
382 
383 		if (rc != MEMCACHED_SUCCESS)
384 		{
385 			if (rc != MEMCACHED_NOTFOUND)
386 			{
387 				ereport(LOG,
388 					(errmsg("fetching from cache storage, memcached_get failed with error: \"%s\"", memcached_strerror(memc, rc))));
389 				/*
390 				 * Turn off memory cache support to prevent future errors.
391 				 */
392 				pool_config->memory_cache_enabled = 0;
393 				/* Behave as if cache not found */
394 				return 1;
395 			}
396 			else
397 			{
398 				/* Not found */
399 				ereport(DEBUG1,
400 					(errmsg("fetching from cache storage"),
401 						 errdetail("cache item not found for key: \"%s\" and query:\"%s\"",tmpkey,query)));
402 				return 1;
403 			}
404 		}
405 	}
406 #else
407 	else
408 	{
409 		ereport(ERROR,
410 			(errmsg("memcached support is not enabled")));
411 	}
412 #endif
413 
414 	p = palloc(*len);
415 
416 	memcpy(p, ptr, *len);
417 
418 	if (!pool_is_shmem_cache())
419 	{
420 		free(ptr);
421 	}
422 
423 	ereport(DEBUG1,
424 		(errmsg("fetching from cache storage"),
425 			 errdetail("query=\"%s\" len:%zd", query, *len)));
426 #ifdef DEBUG
427 	dump_cache_data(p, *len);
428 #endif
429 
430 	*buf = p;
431 
432 	return 0;
433 }
434 
435 /*
436  * encode key.
437  * create cache key as md5(username + query string + database name)
438  */
encode_key(const char * s,char * buf,POOL_CONNECTION_POOL * backend)439 static char* encode_key(const char *s, char *buf, POOL_CONNECTION_POOL *backend)
440 {
441 	char* strkey;
442 	int u_length;
443 	int d_length;
444 	int q_length;
445 	int length;
446 
447 	u_length = strlen(backend->info->user);
448 	ereport(DEBUG1,
449 		(errmsg("memcache encode key"),
450 			 errdetail("username: \"%s\" database_name: \"%s\"", backend->info->user,backend->info->database)));
451 
452 	d_length = strlen(backend->info->database);
453 
454 	q_length = strlen(s);
455 	ereport(DEBUG1,
456 		(errmsg("memcache encode key"),
457 			 errdetail("query: \"%s\"", s)));
458 
459 	length = u_length + d_length + q_length + 1;
460 
461 	strkey = (char*)palloc(sizeof(char) * length);
462 
463 	snprintf(strkey, length, "%s%s%s", backend->info->user, s, backend->info->database);
464 
465 	pool_md5_hash(strkey, strlen(strkey), buf);
466 	ereport(DEBUG1,
467 		(errmsg("memcache encode key"),
468 			 errdetail("`%s' -> `%s'", strkey, buf)));
469 	pfree(strkey);
470 	return buf;
471 }
472 
473 #ifdef DEBUG
/*
 * dump cache data
 *
 * Debugging aid: prints the cached messages in "data" (len bytes) to stderr.
 * Each message is one kind byte, a 4-byte length in network byte order
 * (which counts itself) and the payload, printed as hex.
 *
 * The walk never reads beyond len bytes: a truncated header or payload is
 * reported and ends the dump.
 */
static void dump_cache_data(const char *data, size_t len)
{
	fprintf(stderr,"shmem: len = %zu\n", len);

	while (len > 0)
	{
		int plen;
		size_t payload;
		size_t i;

		fprintf(stderr,"shmem: kind:%c\n", *data++);
		len--;

		if (len < sizeof(plen))
		{
			fprintf(stderr, "shmem: truncated message header\n");
			return;
		}

		memcpy(&plen, data, sizeof(plen));
		len -= sizeof(plen);
		data += sizeof(plen);
		plen = ntohl(plen);
		fprintf(stderr,"shmem: len:%d\n", plen);

		/* the length counts its own 4 bytes; anything smaller has no payload */
		payload = (plen > (int) sizeof(plen)) ? (size_t) plen - sizeof(plen) : 0;
		if (payload > len)
		{
			fprintf(stderr, "shmem: truncated message, %zu of %zu payload bytes present\n",
					len, payload);
			payload = len;
		}

		fprintf(stderr, "shmem: ");
		for (i = 0; i < payload; i++)
		{
			fprintf(stderr, "%02x ", (unsigned char) data[i]);
		}
		fprintf(stderr, "\n");

		data += payload;
		len -= payload;
	}
}
505 #endif
506 
507 /*
508  * send cached messages
509  */
send_cached_messages(POOL_CONNECTION * frontend,const char * qcache,int qcachelen)510 static int send_cached_messages(POOL_CONNECTION *frontend, const char *qcache, int qcachelen)
511 {
512 	int msg = 0;
513 	int i = 0;
514 	int is_prepared_stmt = 0;
515 	int len;
516 	const char *p;
517 
518 	while (i < qcachelen)
519 	{
520 		char tmpkind;
521 		int tmplen;
522 
523 		tmpkind = qcache[i];
524 		i++;
525 
526 		memcpy(&tmplen, qcache+i, sizeof(tmplen));
527 		i += sizeof(tmplen);
528 		len = ntohl(tmplen);
529 		p = qcache + i;
530 		i += len - sizeof(tmplen);
531 
532 		/* No need to cache PARSE and BIND responses */
533 		if (tmpkind == '1' || tmpkind == '2')
534 		{
535 			is_prepared_stmt = 1;
536 			continue;
537 		}
538 
539 		/*
540 		 * In the prepared statement execution, there is no need to send
541 		 * 'T' response to the frontend.
542 		 */
543 		if (is_prepared_stmt && tmpkind == 'T')
544 		{
545 			continue;
546 		}
547 
548 		/* send message to frontend */
549 		ereport(DEBUG1,
550 			(errmsg("memcache: sending cached messages: '%c' len: %d", tmpkind, len)));
551 		send_message(frontend, tmpkind, len, p);
552 
553 		msg++;
554 	}
555 
556 	return msg;
557 }
558 
559 /*
560  * send message to frontend
561  */
send_message(POOL_CONNECTION * conn,char kind,int len,const char * data)562 static void send_message(POOL_CONNECTION *conn, char kind, int len, const char *data)
563 {
564 	ereport(DEBUG2,
565 			(errmsg("memcache: sending messages: kind '%c', len=%d, data=%p", kind, len, data)));
566 
567 	pool_write(conn, &kind, 1);
568 
569 	len = htonl(len);
570 	pool_write(conn, &len, sizeof(len));
571 
572 	len = ntohl(len);
573 	pool_write(conn, (void *)data, len-sizeof(len));
574 }
575 
576 #ifdef USE_MEMCACHED
577 /*
578  * delete query cache on memcached
579  */
delete_cache_on_memcached(const char * key)580 static int delete_cache_on_memcached(const char *key)
581 {
582 
583 	memcached_return rc;
584 
585 	ereport(DEBUG2,
586 			(errmsg("memcache: deleteing cache on memcached with key: \"%s\"", key)));
587 
588 
589 	/* delete cache data on memcached. key is md5 hash query */
590     rc= memcached_delete(memc, key, 32, (time_t)0);
591 
592 	/* delete cache data on memcached is failed */
593     if (rc != MEMCACHED_SUCCESS && rc != MEMCACHED_BUFFERED)
594     {
595 		ereport(LOG,
596 				(errmsg("failed to delete cache on memcached, error:\"%s\"", memcached_strerror(memc, rc))));
597         return 0;
598     }
599     return 1;
600 
601 }
602 #endif
603 
604 /*
605  * Fetch SELECT data from cache if possible.
606  */
pool_fetch_from_memory_cache(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,char * contents,bool * foundp)607 POOL_STATUS pool_fetch_from_memory_cache(POOL_CONNECTION *frontend,
608 										 POOL_CONNECTION_POOL *backend,
609 										 char *contents, bool *foundp)
610 {
611 	char *qcache;
612 	size_t qcachelen;
613 	int sts;
614 	pool_sigset_t oldmask;
615 
616 	ereport(DEBUG1,
617 			(errmsg("pool_fetch_from_memory_cache called")));
618 
619 	*foundp = false;
620 
621     POOL_SETMASK2(&BlockSig, &oldmask);
622 	pool_shmem_lock();
623 
624     PG_TRY();
625     {
626         sts = pool_fetch_cache(backend, contents, &qcache, &qcachelen);
627     }
628     PG_CATCH();
629     {
630         pool_shmem_unlock();
631         POOL_SETMASK(&oldmask);
632         PG_RE_THROW();
633     }
634     PG_END_TRY();
635 
636 	pool_shmem_unlock();
637 	POOL_SETMASK(&oldmask);
638 
639 	if (sts != 0)
640 		/* Cache not found */
641 		return POOL_CONTINUE;
642 
643 	/*
644 	 * Cache found. If we are doing extended query and in streaming
645 	 * replication mode, we need to retrieve any responses from backend and
646 	 * forward them to frontend.
647 	 */
648 	if (pool_is_doing_extended_query_message() && STREAM)
649 	{
650 		POOL_SESSION_CONTEXT *session_context;
651 		POOL_CONNECTION *target_backend;
652 
653 		ereport(DEBUG1,
654 				(errmsg("memcache: injecting cache data")));
655 
656 		session_context = pool_get_session_context(true);
657 		target_backend = CONNECTION(backend, session_context->load_balance_node_id);
658 		inject_cached_message(target_backend, qcache, qcachelen);
659 	}
660 	else
661 	{
662 		/*
663 		 * Send each messages to frontend
664 		 */
665 		send_cached_messages(frontend, qcache, qcachelen);
666 	}
667 
668 	pfree(qcache);
669 
670 	/*
671 	 * Send a "READY FOR QUERY" if not in extended query.
672 	 */
673 	if (!pool_is_doing_extended_query_message() && MAJOR(backend) == PROTO_MAJOR_V3)
674 	{
675 		signed char state;
676 
677 		/*
678 		 * We keep previous transaction state.
679 		 */
680 		state = MASTER(backend)->tstate;
681 		send_message(frontend, 'Z', 5, (char *)&state);
682 	}
683 
684 	if (!pool_is_doing_extended_query_message() || !STREAM)
685 	{
686 		if (pool_flush(frontend))
687 		{
688 			return POOL_END;
689 		}
690 	}
691 
692 	*foundp = true;
693 
694 	if (pool_config->log_per_node_statement)
695 		ereport(LOG,
696 				(errmsg("fetch from memory cache"),
697 				 errdetail("query result fetched from cache. statement: %s", contents)));
698 
699 	ereport(DEBUG1,
700 			(errmsg("fetch from memory cache"),
701 			 errdetail("query result found in the query cache, %s", contents)));
702 
703 	return POOL_CONTINUE;
704 }
705 
/*
 * Simple and rough (thus unreliable) check if the query is likely
 * SELECT.  This can be used before parse tree is available, so the query
 * text is not assumed to be well formed.
 *
 * The text is scanned from the start.  White space, C style comments and
 * SQL "--" comments are skipped; at every other position the text is
 * compared case-insensitively with SELECT and WITH.  The first match makes
 * the function return true, so a statement that merely contains one of the
 * keywords outside a comment is reported as a likely SELECT as well.
 *
 * White space is always skipped by the scan, so the result does not depend
 * on the ignore_leading_white_space setting.
 *
 * Returns false for NULL, for an empty query and when no keyword is found.
 * An unterminated comment simply ends the scan.
 */
bool pool_is_likely_select(char *query)
{
	if (query == NULL)
		return false;

	while (*query)
	{
		/* Ignore spaces and return marks */
		if (isspace((unsigned char) *query))
		{
			query++;
			continue;
		}

		/* Ignore comments like C */
		if (!strncmp(query, "/*", 2))
		{
			query += 2;
			while (*query && strncmp(query, "*/", 2))
				query++;

			/* step over the terminator only if there is one */
			if (*query)
				query += 2;
			continue;
		}

		/* Ignore SQL comments: they run up to the end of the line */
		if (!strncmp(query, "--", 2))
		{
			while (*query && *query != '\n')
				query++;

			/* the newline, if any, is consumed as white space above */
			continue;
		}

		if (!strncasecmp(query, "SELECT", 6) || !strncasecmp(query, "WITH", 4))
		{
			return true;
		}

		query++;
	}

	return false;
}
789 
790 /*
791  * Return true if SELECT can be cached.  "node" is the parse tree for
792  * the query and "query" is the query string.
793  * The query must be SELECT or WITH.
794  */
pool_is_allow_to_cache(Node * node,char * query)795 bool pool_is_allow_to_cache(Node *node, char *query)
796 {
797 	int i = 0;
798 	int num_oids = -1;
799 	SelectContext ctx;
800 
801 	/*
802 	 * If NO QUERY CACHE comment exists, do not cache.
803 	 */
804 	if (!strncasecmp(query, NO_QUERY_CACHE, NO_QUERY_CACHE_COMMENT_SZ))
805 		return false;
806 	/*
807 	 * Check black table list first.
808 	 */
809 	if (pool_config->num_black_memqcache_table_list > 0)
810 	{
811 		/* Extract oids in from clause of SELECT, and check if SELECT to them could be cached. */
812 		num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
813 		if (num_oids > 0)
814 		{
815 			for (i = 0; i < num_oids; i++)
816 			{
817 				ereport(DEBUG1,
818 						(errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, ctx.table_names[i])));
819 				if (pool_is_table_in_black_list(ctx.table_names[i]) == true)
820 				{
821 					ereport(DEBUG1,
822 							(errmsg("memcache: node is not allowed to cache")));
823 					return false;
824 				}
825 			}
826 		}
827 	}
828 
829 	/* SELECT INTO or SELECT FOR SHARE or UPDATE cannot be cached */
830 	if (pool_has_insertinto_or_locking_clause(node))
831 		return false;
832 
833 	/*
834 	 * If SELECT uses non immutable functions, it's not allowed to
835 	 * cache.
836 	 */
837 	if (pool_has_non_immutable_function_call(node))
838 		return false;
839 
840 	/*
841 	 * If SELECT uses temporary tables it's not allowed to cache.
842 	 */
843 	if (pool_config->check_temp_table && pool_has_temp_table(node))
844 		return false;
845 
846 	/*
847 	 * If SELECT uses system catalogs, it's not allowed to cache.
848 	 */
849 	if (pool_has_system_catalog(node))
850 		return false;
851 
852 	/*
853 	 * TABLESAMPLE is not allowed to cache.
854 	 */
855 	if (IsA(node, SelectStmt) && ((SelectStmt *)node)->fromClause)
856 	{
857 		List *tbl_list = ((SelectStmt *)node)->fromClause;
858 		ListCell   *tbl;
859 		foreach(tbl, tbl_list)
860 		{
861 			if (IsA(lfirst(tbl), RangeTableSample))
862 				return false;
863 		}
864 	}
865 
866 
867 	/*
868 	 * If the table is in the while list, allow to cache even if it is
869 	 * VIEW or unlogged table.
870 	 */
871 	if (pool_config->num_white_memqcache_table_list > 0)
872 	{
873 		if (num_oids < 0)
874 			num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
875 
876 		if (num_oids > 0)
877 		{
878 			for (i = 0; i < num_oids; i++)
879 			{
880 				char *table = ctx.table_names[i];
881 				ereport(DEBUG1,
882 						(errmsg("memcache: checking if node is allowed to cache: check table_names[%d] = \"%s\"", i, table)));
883 				if (is_view(table) || is_unlogged_table(table))
884 				{
885 					if (pool_is_table_in_white_list(table) == false)
886 					{
887 						ereport(DEBUG1,
888 								(errmsg("memcache: node is not allowed to cache")));
889 						return false;
890 					}
891 				}
892 			}
893 		}
894 	}
895 	else
896 	{
897 		/*
898 		 * If SELECT uses views, it's not allowed to cache.
899 		 */
900 		if (pool_has_view(node))
901 			return false;
902 
903 		/*
904 		 * If SELECT uses unlogged tables, it's not allowed to cache.
905 		 */
906 		if (pool_has_unlogged_table(node))
907 			return false;
908 	}
909 
910 	/*
911 	 * If Data-modifying statements in WITH clause, it's not allowed to cache.
912 	 */
913 	if(IsA(node, SelectStmt) && ((SelectStmt *) node)->withClause)
914 	{
915 		ListCell	*lc;
916 		WithClause	*withClause = ((SelectStmt *) node)->withClause;
917 
918 		foreach(lc, withClause->ctes)
919 		{
920 			CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
921 			if(IsA(cte->ctequery, InsertStmt) ||
922 			   IsA(cte->ctequery, DeleteStmt) ||
923 			   IsA(cte->ctequery, UpdateStmt))
924 			{
925 				return false;
926 			}
927 		}
928 	}
929 
930 	return true;
931 }
932 
933 
934 /*
935  * Return true If the SELECTed table is in back list.
936  */
pool_is_table_in_black_list(const char * table_name)937 bool pool_is_table_in_black_list(const char *table_name)
938 {
939 
940 	if (pool_config->num_black_memqcache_table_list > 0 &&
941 	    pattern_compare((char *)table_name, BLACKLIST, "black_memqcache_table_list") == 1)
942 	{
943 		return true;
944 	}
945 
946 	return false;
947 }
948 
949 /*
950  * Return true If the SELECTed table is in white list.
951  */
pool_is_table_in_white_list(const char * table_name)952 bool pool_is_table_in_white_list(const char *table_name)
953 {
954 	if (pool_config->num_white_memqcache_table_list > 0 &&
955 		pattern_compare((char *)table_name, WHITELIST, "white_memqcache_table_list") == 1)
956 	{
957 		return true;
958 	}
959 
960 	return false;
961 }
962 
963 /*
964  * Extract table oid from INSERT/UPDATE/DELETE/TRUNCATE/
965  * DROP TABLE/ALTER TABLE/COPY FROM statement.
966  * For SELECT, if Data-modifying statements in its WITH clause,
967  * extract table oid from Data-modifying statements too.
968  * Returns number of oids.
969  * In case of error, returns 0 (InvalidOid).
970  * oids buffer (oidsp) will be discarded by subsequent call.
971  */
pool_extract_table_oids(Node * node,int ** oidsp)972 int pool_extract_table_oids(Node *node, int **oidsp)
973 {
974 #define POOL_MAX_DML_OIDS 128
975 	char *table;
976 	static int oids[POOL_MAX_DML_OIDS];
977 	int num_oids;
978 	int oid;
979 
980 	if (node == NULL)
981 	{
982 		ereport(LOG,
983 				(errmsg("memcache: error while extracting table oids. statement is NULL")));
984 		return 0;
985 	}
986 
987 	num_oids = 0;
988 	*oidsp = oids;
989 
990 	if (IsA(node, InsertStmt))
991 	{
992 		InsertStmt *stmt = (InsertStmt *) node;
993 
994 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
995 		table = make_table_name_from_rangevar(stmt->relation);
996 	}
997 	else if (IsA(node, UpdateStmt))
998 	{
999 		UpdateStmt *stmt = (UpdateStmt *) node;
1000 
1001 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1002 		table = make_table_name_from_rangevar(stmt->relation);
1003 	}
1004 	else if (IsA(node, DeleteStmt))
1005 	{
1006 		DeleteStmt *stmt = (DeleteStmt *) node;
1007 
1008 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1009 		table = make_table_name_from_rangevar(stmt->relation);
1010 	}
1011 	else if(IsA(node, SelectStmt))
1012 	{
1013 		SelectStmt *stmt = (SelectStmt *) node;
1014 		num_oids = pool_extract_withclause_oids((Node *) stmt->withClause, *oidsp);
1015 		table = NULL;
1016 	}
1017 #ifdef NOT_USED
1018 	/*
1019 	 * We do not handle CREATE TABLE here.  It is possible that
1020 	 * pool_extract_table_oids() is called before CREATE TABLE gets
1021 	 * executed.
1022 	 */
1023 	else if (IsA(node, CreateStmt))
1024 	{
1025 		CreateStmt *stmt = (CreateStmt *)node;
1026 		table = make_table_name_from_rangevar(stmt->relation);
1027 	}
1028 #endif
1029 
1030 	else if (IsA(node, AlterTableStmt))
1031 	{
1032 		AlterTableStmt *stmt = (AlterTableStmt *)node;
1033 		table = make_table_name_from_rangevar(stmt->relation);
1034 	}
1035 
1036 	else if (IsA(node, CopyStmt))
1037 	{
1038 		CopyStmt *stmt = (CopyStmt *)node;
1039 		if (stmt->is_from)		/* COPY FROM? */
1040 		{
1041 			table = make_table_name_from_rangevar(stmt->relation);
1042 		}
1043 		else
1044 		{
1045 			return 0;
1046 		}
1047 	}
1048 
1049 	else if (IsA(node, DropStmt))
1050 	{
1051 		ListCell *cell;
1052 
1053 		DropStmt *stmt = (DropStmt *)node;
1054 
1055 		if (stmt->removeType != OBJECT_TABLE)
1056 		{
1057 			return 0;
1058 		}
1059 
1060 		/* Here, stmt->objects is list of target relation info.  The
1061 		 * first cell of target relation info is a list (possibly)
1062 		 * consists of database, schema and relation.  We need to call
1063 		 * makeRangeVarFromNameList() before passing to
1064 		 * make_table_name_from_rangevar. Otherwise we get weird excessively
1065 		 * decorated relation name (''table_name'').
1066 		 */
1067 		foreach(cell, stmt->objects)
1068 		{
1069 			if (num_oids >= POOL_MAX_DML_OIDS)
1070 			{
1071 				ereport(LOG,
1072 						(errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1073 				return 0;
1074 			}
1075 
1076 			table = make_table_name_from_rangevar(makeRangeVarFromNameList(lfirst(cell)));
1077 			oid = pool_table_name_to_oid(table);
1078 			if (oid > 0)
1079 			{
1080 				oids[num_oids++] = pool_table_name_to_oid(table);
1081 				ereport(DEBUG1,
1082 						(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids-1])));
1083 			}
1084 		}
1085 		return num_oids;
1086 	}
1087 	else if (IsA(node, TruncateStmt))
1088 	{
1089 		ListCell *cell;
1090 
1091 		TruncateStmt *stmt = (TruncateStmt *)node;
1092 
1093 		foreach(cell, stmt->relations)
1094 		{
1095 			if (num_oids >= POOL_MAX_DML_OIDS)
1096 			{
1097 				ereport(LOG,
1098 						(errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1099 				return 0;
1100 			}
1101 
1102 			table = make_table_name_from_rangevar(lfirst(cell));
1103 			oid = pool_table_name_to_oid(table);
1104 			if (oid > 0)
1105 			{
1106 				oids[num_oids++] = pool_table_name_to_oid(table);
1107 				ereport(DEBUG1,
1108 						(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oids[num_oids-1])));
1109 			}
1110 		}
1111 		return num_oids;
1112 	}
1113 	else if (IsA(node, ExplainStmt))
1114 	{
1115 		ListCell	*cell;
1116 		DefElem		*def;
1117 		ExplainStmt *stmt = (ExplainStmt *) node;
1118 
1119 		foreach(cell, stmt->options)
1120 		{
1121 			def = lfirst(cell);
1122 			if (strncmp("analyze", def->defname, 7) == 0)
1123 			{
1124 				return pool_extract_table_oids(stmt->query, oidsp);
1125 			}
1126 		}
1127 
1128 		table = NULL;
1129 	}
1130 	else
1131 	{
1132 		ereport(DEBUG1,
1133 				(errmsg("memcache: extracting table oids: statment is different from INSERT/UPDATE/DELETE/TRUNCATE/DROP TABLE/ALTER TABLE")));
1134 		return 0;
1135 	}
1136 
1137 	oid = pool_table_name_to_oid(table);
1138 	if (oid > 0)
1139 	{
1140 		if (num_oids >= POOL_MAX_DML_OIDS)
1141 		{
1142 			ereport(LOG,
1143 					(errmsg("memcache: error while extracting table oids. too many oids:%d", num_oids)));
1144 			return 0;
1145 		}
1146 
1147 		oids[num_oids++] = pool_table_name_to_oid(table);
1148 		ereport(DEBUG1,
1149 				(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oid)));
1150 	}
1151 	return num_oids;
1152 }
1153 
1154 /*
1155  * Extract table oid from INSERT/UPDATE/DELETE
1156  * FROM statement in WITH clause.
1157  * Returns number of oids.
1158  * oids buffer (oidsp) will be discarded by subsequent call.
1159  */
1160 int
pool_extract_withclause_oids(Node * node,int * oidsp)1161 pool_extract_withclause_oids(Node *node, int *oidsp)
1162 {
1163 	int			num_oids = 0;
1164 	int			oid;
1165 	char	   *table;
1166 	ListCell   *lc;
1167 	WithClause *with;
1168 
1169 	if(oidsp == NULL)
1170 	{
1171 		return 0;
1172 	}
1173 
1174 	if(!node || !IsA(node, WithClause))
1175 	{
1176 		return 0;
1177 	}
1178 
1179 	with = (WithClause *) node;
1180 	foreach(lc, with->ctes)
1181 	{
1182 		CommonTableExpr *cte = (CommonTableExpr *)lfirst(lc);
1183 		if(IsA(cte->ctequery, InsertStmt))
1184 		{
1185 			InsertStmt *stmt = (InsertStmt *) cte->ctequery;
1186 			table = make_table_name_from_rangevar(stmt->relation);
1187 		}
1188 		else if(IsA(cte->ctequery, DeleteStmt))
1189 		{
1190 			DeleteStmt *stmt = (DeleteStmt *) cte->ctequery;
1191 			table = make_table_name_from_rangevar(stmt->relation);
1192 		}
1193 		else if(IsA(cte->ctequery, UpdateStmt))
1194 		{
1195 			UpdateStmt *stmt = (UpdateStmt *) cte->ctequery;
1196 			table = make_table_name_from_rangevar(stmt->relation);
1197 		}
1198 		else
1199 		{
1200 			/* only check INSERT/DELETE/UPDATE in WITH clause */
1201 			table = NULL;
1202 		}
1203 
1204 		oid = pool_table_name_to_oid(table);
1205 		if (oid > 0)
1206 		{
1207 			if (num_oids >= POOL_MAX_DML_OIDS)
1208 			{
1209 				break;
1210 			}
1211 
1212 			oidsp[num_oids++] = pool_table_name_to_oid(table);
1213 			ereport(DEBUG1,
1214 					(errmsg("memcache: extracting table oids: table: \"%s\" oid:%d", table, oidsp[num_oids - 1])));
1215 		}
1216 	}
1217 
1218 	return num_oids;
1219 }
1220 
1221 #define POOL_OIDBUF_SIZE 1024
1222 static int* oidbuf;
1223 static int oidbufp;
1224 static int oidbuf_size;
1225 
1226 /*
1227  * Add table oid to internal buffer
1228  */
pool_add_dml_table_oid(int oid)1229 void pool_add_dml_table_oid(int oid)
1230 {
1231 	int i;
1232 	int* tmp;
1233 
1234 	if (oid == 0)
1235 		return;
1236 
1237 	if (oidbufp >= oidbuf_size)
1238 	{
1239 		MemoryContext oldcxt;
1240 		oidbuf_size += POOL_OIDBUF_SIZE;
1241 		/*
1242 		 * This need to live throughout the life of child so home it in
1243 		 * TopMemoryContext
1244 		 */
1245 		oldcxt = MemoryContextSwitchTo(TopMemoryContext);
1246 		tmp = repalloc(oidbuf, sizeof(int) * oidbuf_size);
1247 		MemoryContextSwitchTo(oldcxt);
1248 		if (tmp == NULL)
1249 			return;
1250 
1251 		oidbuf = tmp;
1252 	}
1253 
1254 	for (i=0;i<oidbufp;i++)
1255 	{
1256 		if (oidbuf[i] == oid)
1257 		/* Already same oid exists */
1258 			return;
1259 	}
1260 	oidbuf[oidbufp++] = oid;
1261 }
1262 
1263 
1264 /*
1265  * Get table oid buffer
1266  */
pool_get_dml_table_oid(int ** oid)1267 static int pool_get_dml_table_oid(int **oid)
1268 {
1269 	*oid = oidbuf;
1270 	return oidbufp;
1271 }
1272 
pool_get_dropdb_table_oids(int ** oids,int dboid)1273 static int pool_get_dropdb_table_oids(int **oids, int dboid)
1274 {
1275 	int *rtn = 0;
1276 	int oids_size = 0;
1277 	int *tmp;
1278 
1279 	int num_oids = 0;
1280 	DIR *dir;
1281 	struct dirent *dp;
1282 	char path[1024];
1283 
1284 	snprintf(path, sizeof(path), "%s/%d", pool_config->memqcache_oiddir, dboid);
1285 	if ((dir = opendir(path)) == NULL)
1286 	{
1287 		ereport(DEBUG1,
1288 			(errmsg("memcache: getting drop table oids"),
1289 				 errdetail("Failed to open dir: %s", path)));
1290 		return 0;
1291 	}
1292 
1293 	while ((dp = readdir(dir)) != NULL)
1294 	{
1295 		if (strcmp(dp->d_name, ".") == 0 || strcmp(dp->d_name, "..") == 0)
1296 			continue;
1297 
1298 		if (num_oids >= oids_size)
1299 		{
1300 			oids_size += POOL_OIDBUF_SIZE;
1301 			tmp = repalloc(rtn, sizeof(int) * oids_size);
1302 			if (tmp == NULL)
1303 			{
1304 				closedir(dir);
1305 				return 0;
1306 			}
1307 			rtn = tmp;
1308 		}
1309 
1310 		rtn[num_oids] = atol(dp->d_name);
1311 		num_oids++;
1312 	}
1313 
1314 	closedir(dir);
1315 	*oids = rtn;
1316 
1317 	return num_oids;
1318 }
1319 
1320 /* Discard oid internal buffer */
pool_discard_dml_table_oid(void)1321 static void pool_discard_dml_table_oid(void)
1322 {
1323 	oidbufp = 0;
1324 }
1325 
/*
 * Management modules for oid map.  When caching SELECT results, we
 * record table oids in files, which have the following structure.
 *
 * memqcache_oiddir -+- database_oid -+-table_oid_file1
 *                                    |
 *                                    +-table_oid_file2
 *                                    |
 *                                    +-table_oid_file3...
 *
 * A table_oid_file's name is the oid of a table which was used by the
 * SELECT statement. The file holds 1 or more cache id(s). When a SELECT
 * result is cached, the file is created and the cache id is appended.
 * Later SELECTs using the same table oid will add to the same file. If
 * the SELECT uses multiple tables, multiple table_oid_files will be
 * created. When INSERT/UPDATE/DELETE is executed, the corresponding
 * caches must be deleted (cache invalidation) (when DROP TABLE or ALTER
 * TABLE is executed, the caches must be deleted as well). When a
 * database is dropped, all caches belonging to the database must be
 * deleted.
 */
1346 
1347 /*
1348  * Get oid of current database
1349  */
pool_get_database_oid(void)1350 static int pool_get_database_oid(void)
1351 {
1352 /*
1353  * Query to convert table name to oid
1354  */
1355 	int oid = 0;
1356 	static POOL_RELCACHE *relcache;
1357 	POOL_CONNECTION_POOL *backend;
1358 
1359 	backend = pool_get_session_context(false)->backend;
1360 
1361 	/*
1362 	 * If relcache does not exist, create it.
1363 	 */
1364 	if (!relcache)
1365 	{
1366 		relcache = pool_create_relcache(pool_config->relcache_size, DATABASE_TO_OID_QUERY,
1367 										int_register_func, int_unregister_func,
1368 										false);
1369 		if (relcache == NULL)
1370 		{
1371 			ereport(LOG,
1372 				(errmsg("memcache: error creating relcache while getting database OID")));
1373 			return oid;
1374 		}
1375 	}
1376 
1377 	/*
1378 	 * Search relcache.
1379 	 */
1380 	oid = (int)(intptr_t)pool_search_relcache(relcache, backend,
1381 											  MASTER_CONNECTION(backend)->sp->database);
1382 	return oid;
1383 }
1384 
1385 /*
1386  * Get oid of current database for discarding cache files
1387  * after executing DROP DATABASE
1388  */
pool_get_database_oid_from_dbname(char * dbname)1389 int pool_get_database_oid_from_dbname(char *dbname)
1390 {
1391 	int dboid = 0;
1392 	POOL_SELECT_RESULT *res;
1393 	char query[1024];
1394 
1395 	POOL_CONNECTION_POOL *backend;
1396 	backend = pool_get_session_context(false)->backend;
1397 
1398 	snprintf(query, sizeof(query), DATABASE_TO_OID_QUERY, dbname);
1399 	do_query(MASTER(backend), query, &res, MAJOR(backend));
1400 
1401 	if (res->numrows != 1)
1402 	{
1403 		ereport(DEBUG1,
1404 			(errmsg("memcache: getting oid of current database"),
1405 				 errdetail("received %d rows", res->numrows)));
1406 		free_select_result(res);
1407 		return 0;
1408 	}
1409 
1410 	dboid = atol(res->data[0]);
1411 	free_select_result(res);
1412 
1413 	return dboid;
1414 }
1415 
1416 /*
1417  * Add cache id (shmem case) or hash key (memcached case) to table oid
1418  * map file.  Caller must hold shmem lock before calling this function
1419  * to avoid file extension conflict among different pgpool child
1420  * process.
1421  * As of pgpool-II 3.2, pool_handle_query_cache is responsible for that.
1422  * (pool_handle_query_cache -> pool_commit_cache -> pool_add_table_oid_map)
1423  */
pool_add_table_oid_map(POOL_CACHEKEY * cachekey,int num_table_oids,int * table_oids)1424 static void pool_add_table_oid_map(POOL_CACHEKEY *cachekey, int num_table_oids, int *table_oids)
1425 {
1426 	char *dir;
1427 	int dboid;
1428 	char path[1024];
1429 	int i;
1430 	int len;
1431 
1432 	/*
1433 	 * Create memqcache_oiddir
1434 	 */
1435 	dir = pool_config->memqcache_oiddir;
1436 
1437 	if (mkdir(dir, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1438 	{
1439 		if (errno != EEXIST)
1440 		{
1441 			ereport(WARNING,
1442 				(errmsg("memcache: adding table oid maps, failed to create directory:\"%s\". error:\"%s\"", dir, strerror(errno))));
1443 			return;
1444 		}
1445 	}
1446 
1447 	/*
1448 	 * Create memqcache_oiddir/database_oid
1449 	 */
1450 	dboid = pool_get_database_oid();
1451 	ereport(DEBUG1,
1452 		(errmsg("memcache: adding table oid maps"),
1453 			 errdetail("dboid %d", dboid)));
1454 
1455 	if (dboid <= 0)
1456 	{
1457 		ereport(WARNING,
1458 				(errmsg("memcache: adding table oid maps, failed to get database OID")));
1459 		return;
1460 	}
1461 
1462 	snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1463 	if (mkdir(path, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1464 	{
1465 		if (errno != EEXIST)
1466 		{
1467 			ereport(WARNING,
1468 					(errmsg("memcache: adding table oid maps, failed to create directory:\"%s\". error:\"%s\"", path, strerror(errno))));
1469 			return;
1470 		}
1471 	}
1472 
1473 	if (pool_is_shmem_cache())
1474 	{
1475 		len = sizeof(cachekey->cacheid);
1476 	}
1477 	else
1478 	{
1479 		len = sizeof(cachekey->hashkey);
1480 	}
1481 
1482 	for (i=0;i<num_table_oids;i++)
1483 	{
1484 		int fd;
1485 		int oid = table_oids[i];
1486 		int sts;
1487 		struct flock fl;
1488 
1489 		/*
1490 		 * Create or open each memqcache_oiddir/database_oid/table_oid
1491 		 */
1492 		snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1493 		if ((fd = open(path, O_CREAT|O_RDWR, S_IRUSR|S_IWUSR)) == -1)
1494 		{
1495 			ereport(WARNING,
1496 					(errmsg("memcache: adding table oid maps, failed to open file:\"%s\". error:\"%s\"", path, strerror(errno))));
1497 			return;
1498 		}
1499 
1500 		fl.l_type   = F_WRLCK;
1501 		fl.l_whence = SEEK_SET;
1502 		fl.l_start  = 0;        	/* Offset from l_whence         */
1503 		fl.l_len    = 0;        	/* length, 0 = to EOF           */
1504 
1505 		sts = fcntl(fd, F_SETLKW, &fl);
1506 		if (sts == -1)
1507 		{
1508 			ereport(WARNING,
1509 					(errmsg("memcache: adding table oid maps, failed to lock file:\"%s\". error:\"%s\"", path, strerror(errno))));
1510 
1511 			close(fd);
1512 			return;
1513 		}
1514 
1515 		/*
1516 		 * Below was ifdef-out because of a performance reason.
1517 		 * Looking for duplicate cache entries in a file needed
1518 		 * unacceptably high cost. So we gave up this and decided not
1519 		 * to care about duplicate entries in the file.
1520 		 */
1521 #ifdef NOT_USED
1522 		for (;;)
1523 		{
1524 			sts = read(fd, (char *)&buf, len);
1525 			if (sts == -1)
1526 			{
1527 				ereport(WARNING,
1528 						(errmsg("memcache: adding table oid maps, failed to read file:\"%s\". error:\"%s\"", path, strerror(errno))));
1529 				close(fd);
1530 				return;
1531 			}
1532 			else if (sts == len)
1533 			{
1534 				if (memcmp(cachekey, &buf, len) == 0)
1535 				{
1536 					/* Same key found. Skip this */
1537 					close(fd);
1538 					return;
1539 				}
1540 				continue;
1541 			}
1542 			/*
1543 			 * Must be EOF
1544 			 */
1545 			if (sts != 0)
1546 			{
1547 				ereport(WARNING,
1548 						(errmsg("memcache: adding table oid maps, invalid data length:%d in file:\"%s\". error:\"%s\"",sts, path)));
1549 				close(fd);
1550 				return;
1551 			}
1552 			break;
1553 		}
1554 #endif
1555 
1556 		if (lseek(fd, 0, SEEK_END) == -1)
1557 		{
1558 			ereport(WARNING,
1559 					(errmsg("memcache: adding table oid maps, failed seek on file:\"%s\". error:\"%s\"", path, strerror(errno))));
1560 			close(fd);
1561 			return;
1562 		}
1563 
1564 		/*
1565 		 * Write cache_id or cache key at the end of file
1566 		 */
1567 		sts = write(fd, (char *)cachekey, len);
1568 		if (sts == -1 || sts != len)
1569 		{
1570 			ereport(WARNING,
1571 					(errmsg("memcache: adding table oid maps, failed to write file:\"%s\". error:\"%s\"", path, strerror(errno))));
1572 			close(fd);
1573 			return;
1574 		}
1575 		close(fd);
1576 	}
1577 }
1578 
1579 /*
1580  * Discard all oid maps at pgpool-II startup.
1581  * This is necessary for shmem case.
1582  */
pool_discard_oid_maps(void)1583 void pool_discard_oid_maps(void)
1584 {
1585 	char command[1024];
1586 
1587 	snprintf(command, sizeof(command), "/bin/rm -fr %s/[0-9]*",
1588 			 pool_config->memqcache_oiddir);
1589 	if(system(command) == -1)
1590         ereport(WARNING,
1591             (errmsg("unable to execute command \"%s\"",command),
1592              errdetail("system() command failed with error \"%s\"",strerror(errno))));
1593 
1594 
1595 }
1596 
pool_discard_oid_maps_by_db(int dboid)1597 void pool_discard_oid_maps_by_db(int dboid)
1598 {
1599 	char command[1024];
1600 
1601 	if (pool_is_shmem_cache())
1602 	{
1603 		snprintf(command, sizeof(command), "/bin/rm -fr %s/%d/",
1604 				 pool_config->memqcache_oiddir, dboid);
1605 
1606 		ereport(DEBUG1,
1607 				(errmsg("memcache: discarding oid maps by db"),
1608 				 errdetail("command: '%s\'", command)));
1609 
1610 		if(system(command) == -1)
1611             ereport(WARNING,
1612 				(errmsg("unable to execute command \"%s\"",command),
1613                      errdetail("system() command failed with error \"%s\"",strerror(errno))));
1614 	}
1615 }
1616 
1617 /*
1618  * Read cache id (shmem case) or hash key (memcached case) from table
1619  * oid map file according to table_oids and discard cache entries.  If
1620  * unlink is true, the file will be unlinked after successful cache
1621  * removal.
1622  */
pool_invalidate_query_cache(int num_table_oids,int * table_oid,bool unlinkp,int dboid)1623 static void pool_invalidate_query_cache(int num_table_oids, int *table_oid, bool unlinkp, int dboid)
1624 {
1625 	char *dir;
1626 	char path[1024];
1627 	int i;
1628 	int len;
1629 	POOL_CACHEKEY buf;
1630 
1631 	/*
1632 	 * Create memqcache_oiddir
1633 	 */
1634 	dir = pool_config->memqcache_oiddir;
1635 	if (mkdir(dir, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1636 	{
1637 		if (errno != EEXIST)
1638 		{
1639 			ereport(WARNING,
1640 					(errmsg("memcache: invalidating query cache, failed to create directory:\"%s\". error:\"%s\"", dir, strerror(errno))));
1641 			return;
1642 		}
1643 	}
1644 
1645 	/*
1646 	 * Create memqcache_oiddir/database_oid
1647 	 */
1648 	if (dboid == 0)
1649 	{
1650 		dboid = pool_get_database_oid();
1651 		ereport(DEBUG1,
1652 			(errmsg("memcache invalidating query cache"),
1653 				 errdetail("dboid %d", dboid)));
1654 
1655 		if (dboid <= 0)
1656 		{
1657 			ereport(WARNING,
1658 					(errmsg("memcache: invalidating query cache, could not get database OID")));
1659 			return;
1660 		}
1661 	}
1662 
1663 	snprintf(path, sizeof(path), "%s/%d", dir, dboid);
1664 	if (mkdir(path, S_IREAD|S_IWRITE|S_IEXEC) == -1)
1665 	{
1666 		if (errno != EEXIST)
1667 		{
1668 			ereport(WARNING,
1669 					(errmsg("memcache: invalidating query cache, failed to create directory:\"%s\". error:\"%s\"", path, strerror(errno))));
1670 			return;
1671 		}
1672 	}
1673 
1674 	if (pool_is_shmem_cache())
1675 	{
1676 		len = sizeof(buf.cacheid);
1677 	}
1678 	else
1679 	{
1680 		len = sizeof(buf.hashkey);
1681 	}
1682 
1683 	for (i=0;i<num_table_oids;i++)
1684 	{
1685 		int fd;
1686 		int oid = table_oid[i];
1687 		int sts;
1688 		struct flock fl;
1689 
1690 		/*
1691 		 * Open each memqcache_oiddir/database_oid/table_oid
1692 		 */
1693 		snprintf(path, sizeof(path), "%s/%d/%d", dir, dboid, oid);
1694 		if ((fd = open(path, O_RDONLY)) == -1)
1695 		{
1696 			/* This may be normal. It is possible that no SELECT has
1697 			 * been issued since the table has been created or since
1698 			 * pgpool-II started up.
1699 			 */
1700 			ereport(DEBUG1,
1701 				(errmsg("memcache invalidating query cache"),
1702 					errdetail("failed to open \"%s\". reason:\"%s\"",path, strerror(errno))));
1703 			continue;
1704 		}
1705 
1706 		fl.l_type   = F_RDLCK;
1707 		fl.l_whence = SEEK_SET;
1708 		fl.l_start  = 0;        	/* Offset from l_whence         */
1709 		fl.l_len    = 0;        	/* length, 0 = to EOF           */
1710 
1711 		sts = fcntl(fd, F_SETLKW, &fl);
1712 		if (sts == -1)
1713 		{
1714 			ereport(WARNING,
1715 					(errmsg("memcache: invalidating query cache, failed to lock file:\"%s\". error:\"%s\"", path, strerror(errno))));
1716 			close(fd);
1717 			return;
1718 		}
1719 		for (;;)
1720 		{
1721 			sts = read(fd, (char *)&buf, len);
1722 			if (sts == -1)
1723 			{
1724 				ereport(WARNING,
1725 						(errmsg("memcache: invalidating query cache, failed to read file:\"%s\". error:\"%s\"", path, strerror(errno))));
1726 
1727 				close(fd);
1728 				return;
1729 			}
1730 			else if (sts == len)
1731 			{
1732 				if (pool_is_shmem_cache())
1733 				{
1734 					ereport(DEBUG1,
1735 						(errmsg("memcache invalidating query cache"),
1736 							errdetail("deleting cacheid:%d itemid:%d",
1737 								   buf.cacheid.blockid, buf.cacheid.itemid)));
1738 					pool_delete_item_shmem_cache(&buf.cacheid);
1739 				}
1740 #ifdef USE_MEMCACHED
1741 				else
1742 				{
1743 					char delbuf[33];
1744 
1745 					memcpy(delbuf, buf.hashkey, 32);
1746 					delbuf[32] = 0;
1747 					ereport(DEBUG1,
1748 						(errmsg("memcache invalidating query cache"),
1749 							 errdetail("deleting %s", delbuf)));
1750 
1751 					delete_cache_on_memcached(delbuf);
1752 				}
1753 #endif
1754 				continue;
1755 			}
1756 
1757 			/*
1758 			 * Must be EOF
1759 			 */
1760 			if (sts != 0)
1761 			{
1762 				ereport(WARNING,
1763 						(errmsg("memcache: invalidating query cache, invalid data length:%d in file:\"%s\"",sts, path)));
1764 				close(fd);
1765 				return;
1766 			}
1767 			break;
1768 		}
1769 
1770 		if (unlinkp)
1771 		{
1772 			unlink(path);
1773 		}
1774 		close(fd);
1775 	}
1776 #ifdef SHMEMCACHE_DEBUG
1777 	dump_shmem_cache(0);
1778 #endif
1779 }
1780 
1781 /*
1782  * Reset SELECT data buffers.  If reset_dml_oids is true, call
1783  * pool_discard_dml_table_oid() to reset table oids used in DML statements.
1784  */
pool_reset_memqcache_buffer(bool reset_dml_oids)1785 static void pool_reset_memqcache_buffer(bool reset_dml_oids)
1786 {
1787 	POOL_SESSION_CONTEXT * session_context;
1788 
1789 	session_context = pool_get_session_context(true);
1790 
1791 	if (session_context && session_context->query_cache_array)
1792 	{
1793 		POOL_TEMP_QUERY_CACHE *cache;
1794 
1795 		ereport(DEBUG1,
1796 			(errmsg("memcache reset buffer"),
1797 				 errdetail("discard: %p", session_context->query_cache_array)));
1798 
1799 		pool_discard_query_cache_array(session_context->query_cache_array);
1800 		session_context->query_cache_array = pool_create_query_cache_array();
1801 
1802 		ereport(DEBUG1,
1803 			(errmsg("memcache reset buffer"),
1804 				 errdetail("create: %p", session_context->query_cache_array)));
1805 		/*
1806 		 * if the query context is still under use, we cannot discard
1807 		 * temporary cache.
1808 		 */
1809 		if ((SL_MODE && pool_is_doing_extended_query_message()) ||
1810 			can_query_context_destroy(session_context->query_context))
1811 		{
1812 			ereport(DEBUG1,
1813 				(errmsg("memcache reset buffer"),
1814 					errdetail("discard temp buffer of %p (%s)",
1815 						   session_context->query_context, session_context->query_context->original_query)));
1816 
1817 			cache = pool_get_current_cache();
1818 			pool_discard_temp_query_cache(cache);
1819 			/*
1820 			 * Reset temp_cache pointer in the current query context
1821 			 * so that we don't double free memory.
1822 			 */
1823 			session_context->query_context->temp_cache = NULL;
1824 		}
1825 	}
1826 
1827 	if (reset_dml_oids)
1828 		pool_discard_dml_table_oid();
1829 
1830 	pool_tmp_stats_reset_num_selects();
1831 }
1832 
1833 /*
1834  * Return true if memory cache method is "shmem".  The purpose of this
1835  * function is to cache the result of stcmp and to save a few cycle.
1836  */
pool_is_shmem_cache(void)1837 bool pool_is_shmem_cache(void)
1838 {
1839 	return (pool_config->memqcache_method == SHMEM_CACHE)?true:false;
1840 }
1841 
/*
 * Number of memory cache blocks, as last remembered through
 * pool_set_memqcache_blocks().
 */
static int memqcache_num_blocks;

/*
 * Remember memory cache number of blocks.
 */
static void
pool_set_memqcache_blocks(int num_blocks)
{
	memqcache_num_blocks = num_blocks;
}

/*
 * Return memory cache number of blocks.
 */
static int
pool_get_memqcache_blocks(void)
{
	return memqcache_num_blocks;
}
1858 
1859 /*
1860  * Query cache on shared memory management modules.
1861  */
1862 
1863 /*
1864  * Calculate necessary shared memory size.
1865  */
pool_shared_memory_cache_size(void)1866 size_t pool_shared_memory_cache_size(void)
1867 {
1868 	int64 num_blocks;
1869 	size_t size;
1870 
1871 	if (pool_config->memqcache_maxcache > pool_config->memqcache_cache_block_size)
1872 		ereport(FATAL,
1873 			(errmsg("invalid memory cache configuration"),
1874 					errdetail("memqcache_cache_block_size %d should be greater or equal to memqcache_maxcache %d",
1875 								pool_config->memqcache_cache_block_size,
1876 								pool_config->memqcache_maxcache)));
1877 
1878 
1879 	num_blocks = pool_config->memqcache_total_size/
1880 		pool_config->memqcache_cache_block_size;
1881 	if (num_blocks == 0)
1882 		ereport(FATAL,
1883 			(errmsg("invalid memory cache configuration"),
1884 					errdetail("memqcache_total_size %ld should be greater or equal to memqcache_cache_block_size %d",
1885 								pool_config->memqcache_total_size,
1886 								pool_config->memqcache_cache_block_size)));
1887 
1888 		ereport(LOG,
1889 			(errmsg("memory cache initialized"),
1890 					errdetail("memcache blocks :%ld",num_blocks)));
1891 	/* Remember # of blocks */
1892 	pool_set_memqcache_blocks(num_blocks);
1893 	size = pool_config->memqcache_cache_block_size * num_blocks;
1894 	return size;
1895 }
1896 
1897 /*
1898  * Acquire and initialize shared memory cache. This should be called
1899  * only once from pgpool main process at the process staring up time.
1900  */
1901 static void *shmem;
pool_init_memory_cache(size_t size)1902 int pool_init_memory_cache(size_t size)
1903 {
1904 	ereport(DEBUG1,
1905 		(errmsg("memory cache request size : %zd",size)));
1906 
1907 	shmem = pool_shared_memory_create(size);
1908 	return 0;
1909 }
1910 
1911 /*
1912  * Clear all the shared memory cache and reset FSMM and hash table.
1913  */
1914 void
pool_clear_memory_cache(void)1915 pool_clear_memory_cache(void)
1916 {
1917 	size_t size;
1918 	pool_sigset_t oldmask;
1919 
1920 	POOL_SETMASK2(&BlockSig, &oldmask);
1921 	pool_shmem_lock();
1922 
1923     PG_TRY();
1924     {
1925         size = pool_shared_memory_cache_size();
1926         memset(shmem, 0, size);
1927 
1928         size = pool_shared_memory_fsmm_size();
1929         pool_reset_fsmm(size);
1930 
1931         pool_discard_oid_maps();
1932 
1933         pool_hash_reset(pool_config->memqcache_max_num_cache);
1934     }
1935     PG_CATCH();
1936     {
1937         pool_shmem_unlock();
1938         POOL_SETMASK(&oldmask);
1939         PG_RE_THROW();
1940     }
1941     PG_END_TRY();
1942 
1943 	pool_shmem_unlock();
1944 	POOL_SETMASK(&oldmask);
1945 }
1946 
1947 /*
1948  * Return shared memory cache address
1949  */
pool_memory_cache_address(void)1950 static void *pool_memory_cache_address(void)
1951 {
1952 	return shmem;
1953 }
1954 
1955 /*
1956  * Initialize new block
1957  */
1958 
/*
 * Free space management map
 *
 * Free space management map (FSMM) consists of bytes. Each byte
 * corresponds to a block id. For example, if you have a 1GB cache and
 * the block size is 8KB, the number of blocks is 131,072, thus the
 * total size of FSMM is 128KB.  Each FSMM entry has a value from 0 to
 * 255. These values describe the total free space in each block.
 * For example, if the value is 2, the free space can be between 64
 * bytes and 95 bytes.
 *
 * value free space(in bytes)
 * 0     0-31
 * 1     32-63
 * 2     64-95
 * 3     96-127
 * :
 * 255   8160-8192
 */
1978 
/*
 * Calculate necessary shared memory size for FSMM. Should be called after
 * pool_shared_memory_cache_size, because the result is derived from the
 * remembered number of cache blocks: one char per block.
 */
size_t
pool_shared_memory_fsmm_size(void)
{
	return pool_get_memqcache_blocks() * sizeof(char);
}
1990 
1991 /*
1992  * Acquire and initialize shared memory cache for FSMM. This should be
1993  * called after pool_shared_memory_cache_size only once from pgpool
1994  * main process at the process staring up time.
1995  */
1996 static void *fsmm;
pool_init_fsmm(size_t size)1997 int pool_init_fsmm(size_t size)
1998 {
1999 	int maxblock = 	pool_get_memqcache_blocks();
2000 	int encode_value;
2001 
2002 	fsmm = pool_shared_memory_create(size);
2003 	encode_value = POOL_MAX_FREE_SPACE/POOL_FSMM_RATIO;
2004 	memset(fsmm, encode_value, maxblock);
2005 	return 0;
2006 }
2007 
2008 /*
2009  * Return shared memory fsmm address
2010  */
pool_fsmm_address(void)2011 static void *pool_fsmm_address(void)
2012 {
2013 	return fsmm;
2014 }
2015 
2016 /*
2017  * Clock algorithm shared query cache management modules.
2018  */
2019 
/*
 * Clock hand pointing to next victim block
 */
static int *pool_fsmm_clock_hand;

/*
 * Allocate the clock hand through pool_shared_memory_create() and make
 * it point at block 0.
 */
void
pool_allocate_fsmm_clock_hand(void)
{
	pool_fsmm_clock_hand = pool_shared_memory_create(sizeof(*pool_fsmm_clock_hand));

	*pool_fsmm_clock_hand = 0;
}
2033 
2034 /*
2035  * Reset FSMM.
2036  */
2037 static void
pool_reset_fsmm(size_t size)2038 pool_reset_fsmm(size_t size)
2039 {
2040 	int encode_value;
2041 
2042 	encode_value = POOL_MAX_FREE_SPACE/POOL_FSMM_RATIO;
2043 	memset(fsmm, encode_value, size);
2044 
2045 	*pool_fsmm_clock_hand = 0;
2046 }
2047 
2048 /*
2049  * Find victim block using clock algorithm and make it free.
2050  * Returns new free block id.
2051  */
pool_reuse_block(void)2052 static POOL_CACHE_BLOCKID pool_reuse_block(void)
2053 {
2054 	int maxblock = pool_get_memqcache_blocks();
2055 	char *block = block_address(*pool_fsmm_clock_hand);
2056 	POOL_CACHE_BLOCK_HEADER *bh = (POOL_CACHE_BLOCK_HEADER *)block;
2057 	POOL_CACHE_BLOCKID reused_block;
2058 	POOL_CACHE_ITEM_POINTER *cip;
2059 	char *p;
2060 	int i;
2061 
2062 	bh->flags = 0;
2063 	reused_block = *pool_fsmm_clock_hand;
2064 	p = block_address(reused_block);
2065 
2066 	for (i=0;i<bh->num_items;i++)
2067 	{
2068 		cip = item_pointer(p, i);
2069 
2070 		if (!(POOL_ITEM_DELETED & cip->flags))
2071 		{
2072 			pool_hash_delete(&cip->query_hash);
2073 			ereport(DEBUG1,
2074 					(errmsg("pool_reuse_block: blockid: %d item: %d", reused_block, i)));
2075 		}
2076 	}
2077 
2078 	pool_init_cache_block(reused_block);
2079 	pool_update_fsmm(reused_block, POOL_MAX_FREE_SPACE);
2080 
2081 	(*pool_fsmm_clock_hand)++;
2082 	if (*pool_fsmm_clock_hand >= maxblock)
2083 		*pool_fsmm_clock_hand = 0;
2084 
2085 	ereport(LOG,
2086 			(errmsg("pool_reuse_block: blockid: %d", reused_block)));
2087 
2088 	return reused_block;
2089 }
2090 
2091 /*
2092  * Get block id which has enough space
2093  */
pool_get_block(size_t free_space)2094 static POOL_CACHE_BLOCKID pool_get_block(size_t free_space)
2095 {
2096 	int encode_value;
2097 	unsigned char *p = pool_fsmm_address();
2098 	int i;
2099 	int maxblock = pool_get_memqcache_blocks();
2100 	POOL_CACHE_BLOCK_HEADER *bh;
2101 
2102 	if (p == NULL)
2103 	{
2104 		ereport(WARNING,
2105 				(errmsg("memcache: getting block: FSMM is not initialized")));
2106 		return -1;
2107 	}
2108 
2109 	if (free_space > POOL_MAX_FREE_SPACE)
2110 	{
2111 		ereport(WARNING,
2112 			(errmsg("memcache: getting block: invalid free space:%zd", free_space),
2113 				 errdetail("requested free space: %zd is more than maximum allowed space:%lu",free_space,POOL_MAX_FREE_SPACE)));
2114 		return -1;
2115 	}
2116 
2117 	encode_value = free_space/POOL_FSMM_RATIO;
2118 
2119 	for (i=0;i<maxblock;i++)
2120 	{
2121 		if (p[i] >= encode_value)
2122 		{
2123 			/*
2124 			 * This block *may" have enough space.
2125 			 * We need to make sure it actually has enough space.
2126 			 */
2127 			bh = (POOL_CACHE_BLOCK_HEADER *)block_address(i);
2128 			if (bh->free_bytes >= free_space)
2129 			{
2130 				return (POOL_CACHE_BLOCKID)i;
2131 			}
2132 		}
2133 	}
2134 
2135 	/*
2136 	 * No enough space found. Reuse victim block
2137 	 */
2138 	return pool_reuse_block();
2139 }
2140 
2141 /*
2142  * Update free space info for specified block
2143  */
pool_update_fsmm(POOL_CACHE_BLOCKID blockid,size_t free_space)2144 static void pool_update_fsmm(POOL_CACHE_BLOCKID blockid, size_t free_space)
2145 {
2146 	int encode_value;
2147 	char *p = pool_fsmm_address();
2148 
2149 	if (p == NULL)
2150 	{
2151 		ereport(WARNING,
2152 				(errmsg("memcache: updating free space in block: FSMM is not initialized")));
2153 		return;
2154 	}
2155 
2156 	if (blockid >= pool_get_memqcache_blocks())
2157 	{
2158 		ereport(WARNING,
2159 				(errmsg("memcache: updating free space in block: block id:%d in invalid",blockid)));
2160 		return;
2161 	}
2162 
2163 	if (free_space > POOL_MAX_FREE_SPACE)
2164 	{
2165 		ereport(WARNING,
2166 			(errmsg("memcache: updating free space in block: invalid free space:%zd", free_space),
2167 				 errdetail("requested free space: %zd is more than maximum allowed space:%lu",free_space,POOL_MAX_FREE_SPACE)));
2168 
2169 		return;
2170 	}
2171 
2172 	encode_value = free_space/POOL_FSMM_RATIO;
2173 
2174 	p[blockid] = encode_value;
2175 
2176 	return;
2177 }
2178 
2179 /*
2180  * Add item data to shared memory cache.
2181  * On successful registration, returns cache id.
2182  * The cache id is overwritten by the subsequent call to this function.
2183  * On error returns NULL.
2184  */
pool_add_item_shmem_cache(POOL_QUERY_HASH * query_hash,char * data,int size)2185 static POOL_CACHEID *pool_add_item_shmem_cache(POOL_QUERY_HASH *query_hash, char *data, int size)
2186 {
2187 	static POOL_CACHEID cacheid;
2188 	POOL_CACHE_BLOCKID blockid;
2189 	POOL_CACHE_BLOCK_HEADER *bh;
2190 	POOL_CACHE_ITEM_POINTER *cip;
2191 
2192 	POOL_CACHE_ITEM ci;
2193 	POOL_CACHE_ITEM_POINTER cip_body;
2194 	char *item;
2195 
2196 	int request_size;
2197 	char *p;
2198 	int i;
2199 	char *src;
2200 	char *dst;
2201 	int num_deleted;
2202 	char *dcip;
2203 	char *dci;
2204 	bool need_pack;
2205 	char *work_buffer;
2206 	int index;
2207 
2208 	if (query_hash == NULL)
2209 	{
2210 		ereport(LOG,
2211 				(errmsg("error while adding item to shmem cache, query hash is NULL")));
2212 		return NULL;
2213 	}
2214 
2215 	if (data == NULL)
2216 	{
2217 		ereport(LOG,
2218 				(errmsg("error while adding item to shmem cache, data is NULL")));
2219 		return NULL;
2220 	}
2221 
2222 	if (size <= 0)
2223 	{
2224 		ereport(LOG,
2225 				(errmsg("error while adding item to shmem cache, invalid request size: %d", size)));
2226 		return NULL;
2227 	}
2228 
2229 	/* Add overhead */
2230 	request_size = size + sizeof(POOL_CACHE_ITEM_POINTER) + sizeof(POOL_CACHE_ITEM_HEADER);
2231 
2232 	/* Get cache block which has enough space */
2233 	blockid = pool_get_block(request_size);
2234 
2235 	if (blockid == -1)
2236 	{
2237 		return NULL;
2238 	}
2239 
2240 	/*
2241 	 * Initialize the block if necessary. If no live items are
2242 	 * remained, we also initialize the block. If there's contiguous
2243 	 * deleted items, we turn them into free space as well.
2244 	 */
2245 	pool_init_cache_block(blockid);
2246 
2247 	/*
2248 	 * Make sure that we have at least one free hash element.
2249 	 */
2250 	while (!is_free_hash_element())
2251 	{
2252 		/* If not, reuse next victim block */
2253 		blockid = pool_reuse_block();
2254 		pool_init_cache_block(blockid);
2255 	}
2256 
2257 	/* Get block address on shmem */
2258 	p = block_address(blockid);
2259 	bh = (POOL_CACHE_BLOCK_HEADER *)p;
2260 
2261 	/*
2262 	 * Create contiguous free space. We assume that item bodies are
2263 	 * ordered from bottom to top of the block, and corresponding item
2264 	 * pointers are ordered from the youngest to the oldest in the
2265 	 * beginning of the block.
2266 	 */
2267 
2268 	/*
2269 	 * Optimization. If there's no deleted items, we don't need to
2270 	 * pack it to create contiguous free space.
2271 	 */
2272 	need_pack = false;
2273 	for (i=0;i<bh->num_items;i++)
2274 	{
2275 		cip = item_pointer(p, i);
2276 
2277 		if (POOL_ITEM_DELETED & cip->flags)		/* Deleted item? */
2278 		{
2279 			need_pack = true;
2280 			ereport(DEBUG1,
2281 				(errmsg("memcache adding item"),
2282 					 errdetail("start creating contiguous space")));
2283 			break;
2284 		}
2285 	}
2286 
2287 	/*
2288 	 * We disable packing for now.
2289 	 * Revisit and remove following code fragment later.
2290 	 */
2291 	need_pack = false;
2292 
2293 	if (need_pack)
2294 	{
2295 		/*
2296 		 * Pack and create contiguous free space.
2297 		 */
2298 		dcip = calloc(1, pool_config->memqcache_cache_block_size);
2299 		if (!dcip)
2300 		{
2301 			ereport(WARNING,
2302 					(errmsg("memcache: adding item to cache: calloc failed")));
2303 			return NULL;
2304 		}
2305 
2306 		work_buffer = dcip;
2307 		dci = dcip + pool_config->memqcache_cache_block_size;
2308 		num_deleted = 0;
2309 		index = 0;
2310 
2311 		for (i=0;i<bh->num_items;i++)
2312 		{
2313 			int total_length;
2314 			POOL_CACHEID cid;
2315 
2316 			cip = item_pointer(p, i);
2317 
2318 			if (POOL_ITEM_DELETED & cip->flags)		/* Deleted item? */
2319 			{
2320 				num_deleted++;
2321 				continue;
2322 			}
2323 
2324 			/* Copy item body */
2325 			src = p + cip->offset;
2326 			total_length = item_header(p, i)->total_length;
2327 			dst = dci - total_length;
2328 			cip->offset = dst - work_buffer;
2329 			memcpy(dst, src, total_length);
2330 
2331 			dci -= total_length;
2332 
2333 			/* Copy item pointer */
2334 			src = (char *)cip;
2335 			dst = (char *)item_pointer(dcip, index);
2336 			memcpy(dst, src, sizeof(POOL_CACHE_ITEM_POINTER));
2337 
2338 			/* Update hash index */
2339 			cid.blockid = blockid;
2340 			cid.itemid = index;
2341 			pool_hash_insert(&cip->query_hash, &cid, true);
2342 			ereport(DEBUG1,
2343 				(errmsg("memcache adding item"),
2344 					errdetail("item cid updated. old:%d %d new:%d %d",
2345 						   blockid, i, blockid, index)));
2346 			index++;
2347 		}
2348 
2349 		/* All items deleted? */
2350 		if (num_deleted > 0 && num_deleted == bh->num_items)
2351 		{
2352 			ereport(DEBUG1,
2353 				(errmsg("memcache adding item"),
2354 					 errdetail("all items deleted, total deleted:%d", num_deleted)));
2355 			bh->flags = 0;
2356 			pool_init_cache_block(blockid);
2357 			pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
2358 		}
2359 		else
2360 		{
2361 			/* Update number of items */
2362 			bh->num_items -= num_deleted;
2363 
2364 			/* Copy back the packed block except block header */
2365 			memcpy(p+sizeof(POOL_CACHE_BLOCK_HEADER),
2366 				   work_buffer+sizeof(POOL_CACHE_BLOCK_HEADER),
2367 				   pool_config->memqcache_cache_block_size-sizeof(POOL_CACHE_BLOCK_HEADER));
2368 		}
2369 		free(work_buffer);
2370 	}
2371 
2372 	/*
2373 	 * Make sure that we have enough free space
2374 	 */
2375 	if (bh->free_bytes < request_size)
2376 	{
2377 		/* This should not happen */
2378 		ereport(WARNING,
2379 			(errmsg("memcache: adding item to cache: not enough space"),
2380 				 errdetail("free space: %d required: %d block id:%d",
2381 						   bh->free_bytes, request_size, blockid)));
2382 		return NULL;
2383 	}
2384 
2385 	/*
2386 	 * At this point, new item can be located at block_header->num_items
2387 	 */
2388 
2389 	/* Fill in cache item header */
2390 	ci.header.timestamp = time(NULL);
2391 	ci.header.total_length = sizeof(POOL_CACHE_ITEM_HEADER) + size;
2392 
2393 	/* Calculate item body address */
2394 	if (bh->num_items == 0)
2395 	{
2396 		/* This is the #0 item. So address is block_bottom -
2397 		 * data_length */
2398 		item = p + pool_config->memqcache_cache_block_size - ci.header.total_length;
2399 
2400 		/* Mark this block used */
2401 		bh->flags = POOL_BLOCK_USED;
2402 	}
2403 	else
2404 	{
2405 		cip = item_pointer(p, bh->num_items-1);
2406 		item = p + cip->offset - ci.header.total_length;
2407 	}
2408 
2409 	/* Copy item header */
2410 	memcpy(item, &ci, sizeof(POOL_CACHE_ITEM_HEADER));
2411 	bh->free_bytes -= sizeof(POOL_CACHE_ITEM_HEADER);
2412 
2413 	/* Copy item body */
2414 	memcpy(item + sizeof(POOL_CACHE_ITEM_HEADER), data, size);
2415 	bh->free_bytes -= size;
2416 
2417 	/* Copy cache item pointer */
2418 	memcpy(&cip_body.query_hash, query_hash, sizeof(POOL_QUERY_HASH));
2419 	memset(&cip_body.next, 0, sizeof(POOL_CACHEID));
2420 	cip_body.offset = item - p;
2421 	cip_body.flags = POOL_ITEM_USED;
2422 	memcpy(item_pointer(p, bh->num_items), &cip_body, sizeof(POOL_CACHE_ITEM_POINTER));
2423 	bh->free_bytes -= sizeof(POOL_CACHE_ITEM_POINTER);
2424 
2425 	/* Update FSMM */
2426 	pool_update_fsmm(blockid, bh->free_bytes);
2427 
2428 	cacheid.blockid = blockid;
2429 	cacheid.itemid = bh->num_items;
2430 	ereport(DEBUG1,
2431 		(errmsg("memcache adding item"),
2432 			errdetail("new item inserted. blockid: %d itemid:%d",
2433 				   cacheid.blockid, cacheid.itemid)));
2434 
2435 	/* Add up number of items */
2436 	bh->num_items++;
2437 
2438 	/* Update hash table */
2439 	if (pool_hash_insert(query_hash, &cacheid, false) < 0)
2440 	{
2441 		ereport(LOG,
2442 				(errmsg("error while adding item to shmem cache, hash insert failed")));
2443 
2444 		/* Since we have failed to insert hash index entry, we need to
2445 		 * undo the addition of cache entry.
2446 		 */
2447 		pool_delete_item_shmem_cache(&cacheid);
2448 		return NULL;
2449 	}
2450 	ereport(DEBUG1,
2451 		(errmsg("memcache adding item"),
2452 			 errdetail("block: %d item: %d", cacheid.blockid, cacheid.itemid)));
2453 
2454 #ifdef SHMEMCACHE_DEBUG
2455 	dump_shmem_cache(blockid);
2456 #endif
2457 	return &cacheid;
2458 }
2459 
2460 /*
2461  * Returns item data address on shared memory cache specified by query hash.
2462  * Also data length is set to *size.
2463  * On error or data not found case returns NULL.
2464  * Detail is set to *sts. (0: success, 1: not found, -1: error)
2465  */
pool_get_item_shmem_cache(POOL_QUERY_HASH * query_hash,int * size,int * sts)2466 static char *pool_get_item_shmem_cache(POOL_QUERY_HASH *query_hash, int *size, int *sts)
2467 {
2468 	POOL_CACHEID *cacheid;
2469 	POOL_CACHE_ITEM_HEADER *cih;
2470 
2471 	if (sts == NULL)
2472 	{
2473 		ereport(LOG,
2474 				(errmsg("error while getting item from shmem cache, sts is NULL")));
2475 		return NULL;
2476 	}
2477 
2478 	*sts = -1;
2479 
2480 	if (query_hash == NULL)
2481 	{
2482 		ereport(LOG,
2483 				(errmsg("error while getting item from shmem cache, query hash is NULL")));
2484 		return NULL;
2485 	}
2486 
2487 	if (size == NULL)
2488 	{
2489 		ereport(LOG,
2490 				(errmsg("error while getting item from shmem cache, size is NULL")));
2491 		return NULL;
2492 	}
2493 
2494 	/*
2495 	 * Find cache header by using hash table
2496 	 */
2497 	cacheid = pool_find_item_on_shmem_cache(query_hash);
2498 	if (cacheid == NULL)
2499 	{
2500 		/* Not found */
2501 		*sts = 1;
2502 		return NULL;
2503 	}
2504 
2505 	cih = pool_cache_item_header(cacheid);
2506 
2507 	*size = cih->total_length - sizeof(POOL_CACHE_ITEM_HEADER);
2508 	return (char *)cih + sizeof(POOL_CACHE_ITEM_HEADER);
2509 }
2510 
2511 /*
2512  * Find data on shared memory cache specified query hash.
2513  * On success returns cache id.
2514  * The cache id is overwritten by the subsequent call to this function.
2515  */
pool_find_item_on_shmem_cache(POOL_QUERY_HASH * query_hash)2516 static POOL_CACHEID *pool_find_item_on_shmem_cache(POOL_QUERY_HASH *query_hash)
2517 {
2518 	static POOL_CACHEID cacheid;
2519 	POOL_CACHEID *c;
2520 	POOL_CACHE_ITEM_HEADER *cih;
2521 	time_t now;
2522 
2523 	c = pool_hash_search(query_hash);
2524 	if (!c)
2525 	{
2526 		return NULL;
2527 	}
2528 
2529 	cih = item_header(block_address(c->blockid), c->itemid);
2530 
2531 	/* Check cache expiration */
2532 	if (pool_config->memqcache_expire > 0)
2533 	{
2534 		now = time(NULL);
2535 		if (now > (cih->timestamp + pool_config->memqcache_expire))
2536 		{
2537 			ereport(DEBUG1,
2538 				(errmsg("memcache finding item"),
2539 					errdetail("cache expired: now: %ld timestamp: %ld",
2540 						   now, cih->timestamp + pool_config->memqcache_expire)));
2541 			pool_delete_item_shmem_cache(c);
2542 			return NULL;
2543 		}
2544 	}
2545 
2546 	cacheid.blockid = c->blockid;
2547 	cacheid.itemid = c->itemid;
2548 	return &cacheid;
2549 }
2550 
2551 /*
2552  * Delete item data specified cache id from shmem.
2553  * On successful deletion, returns 0.
2554  * Other wise return -1.
2555  * FSMM is also updated.
2556  */
pool_delete_item_shmem_cache(POOL_CACHEID * cacheid)2557 static int pool_delete_item_shmem_cache(POOL_CACHEID *cacheid)
2558 {
2559 	POOL_CACHE_BLOCK_HEADER *bh;
2560 	POOL_CACHE_ITEM_POINTER *cip;
2561 	POOL_CACHE_ITEM_HEADER *cih;
2562 	POOL_QUERY_HASH key;
2563 	int size;
2564 	ereport(DEBUG1,
2565 		(errmsg("memcache deleting item data"),
2566 			errdetail("cacheid:%d itemid:%d",cacheid->blockid, cacheid->itemid)));
2567 
2568 	if (cacheid->blockid >= pool_get_memqcache_blocks())
2569 	{
2570 		ereport(LOG,
2571 				(errmsg("error while deleting item from shmem cache, invalid block: %d",
2572 						cacheid->blockid)));
2573 		return -1;
2574 	}
2575 
2576 	bh = (POOL_CACHE_BLOCK_HEADER *)block_address(cacheid->blockid);
2577 	if (!(bh->flags & POOL_BLOCK_USED))
2578 	{
2579 		ereport(LOG,
2580 				(errmsg("error while deleting item from shmem cache, block: %d is not used",
2581 						cacheid->blockid)));
2582 		return -1;
2583 	}
2584 
2585 	if (cacheid->itemid >= bh->num_items)
2586 	{
2587 		/*
2588 		 * This could happen if the block is reused.  Since contents
2589 		 * of oid map file is not updated when the block is reused.
2590 		 */
2591 		ereport(DEBUG1,
2592 			(errmsg("memcache error deleting item data"),
2593 				errdetail("invalid item id %d in block:%d",
2594 					   cacheid->itemid, cacheid->blockid)));
2595 
2596 		return -1;
2597 	}
2598 
2599 	cip = item_pointer(block_address(cacheid->blockid), cacheid->itemid);
2600 	if (!(cip->flags & POOL_ITEM_USED))
2601 	{
2602 		ereport(LOG,
2603 				(errmsg("error while deleting item from shmem cache, item: %d was not used",
2604 						cacheid->itemid)));
2605 		return -1;
2606 	}
2607 
2608 	if (cip->flags & POOL_ITEM_DELETED)
2609 	{
2610 		ereport(LOG,
2611 				(errmsg("error while deleting item from shmem cache, item: %d was already deleted",
2612 						cacheid->itemid)));
2613 		return -1;
2614 	}
2615 
2616 	/* Save cache key */
2617 	memcpy(&key, &cip->query_hash, sizeof(POOL_QUERY_HASH));
2618 
2619 	cih = pool_cache_item_header(cacheid);
2620 	size = cih->total_length + sizeof(POOL_CACHE_ITEM_POINTER);
2621 
2622 	/* Delete item pointer */
2623 	cip->flags |= POOL_ITEM_DELETED;
2624 
2625 	/*
2626 	 * We do NOT count down bh->num_items here. The deleted space will be recycled
2627 	 * by pool_add_item_shmem_cache(). However, if this is the last item, we can
2628 	 * recycle whole block.
2629 	 *
2630 	 * 2012/4/1: Now we do not pack data in
2631 	 * pool_add_item_shmem_cache() for performance reason. Also we
2632 	 * count down num_items if it is the last one.
2633 	 */
2634 	if ((bh->num_items -1) == 0)
2635 	{
2636 		ereport(DEBUG1,
2637 			(errmsg("memcache deleting item data"),
2638 				 errdetail("no item remains. initialize block")));
2639 		bh->flags = 0;
2640 		pool_init_cache_block(cacheid->blockid);
2641 	}
2642 
2643 	/* Remove hash index */
2644 	pool_hash_delete(&key);
2645 
2646 	/*
2647 	 * If the deleted item is last one in the block, we add it to the free space.
2648 	 */
2649 	if (cacheid->itemid == (bh->num_items -1))
2650 	{
2651 		bh->free_bytes += size;
2652 		ereport(DEBUG1,
2653 			(errmsg("memcache deleting item data"),
2654 				errdetail("deleted %d bytes, freebytes is = %d",
2655 					   size, bh->free_bytes)));
2656 
2657 		bh->num_items--;
2658 	}
2659 
2660 	/* Update FSMM */
2661 	pool_update_fsmm(cacheid->blockid, bh->free_bytes);
2662 
2663 	return 0;
2664 }
2665 
2666 /*
2667  * Returns item header specified by cache id.
2668  */
pool_cache_item_header(POOL_CACHEID * cacheid)2669 static POOL_CACHE_ITEM_HEADER *pool_cache_item_header(POOL_CACHEID *cacheid)
2670 {
2671 	POOL_CACHE_BLOCK_HEADER *bh;
2672 
2673 	if (cacheid->blockid >= pool_get_memqcache_blocks())
2674 	{
2675 		ereport(WARNING,
2676 				(errmsg("error while getting cache item header, invalid block id: %d", cacheid->blockid)));
2677 		return NULL;
2678 	}
2679 
2680 	bh = (POOL_CACHE_BLOCK_HEADER *)block_address(cacheid->blockid);
2681 	if (cacheid->itemid >= bh->num_items)
2682 	{
2683 		ereport(WARNING,
2684 				(errmsg("error while getting cache item header, invalid item id: %d", cacheid->itemid)));
2685 		return NULL;
2686 	}
2687 
2688 	return item_header((char *)bh, cacheid->itemid);
2689 }
2690 
2691 /*
2692  * Initialize specified block.
2693  */
pool_init_cache_block(POOL_CACHE_BLOCKID blockid)2694 static int pool_init_cache_block(POOL_CACHE_BLOCKID blockid)
2695 {
2696 	char *p;
2697 	POOL_CACHE_BLOCK_HEADER *bh;
2698 
2699 	if (blockid >= pool_get_memqcache_blocks())
2700 	{
2701 		ereport(WARNING,
2702 				(errmsg("error while initializing cache block, invalid block id: %d", blockid)));
2703 		return -1;
2704 	}
2705 
2706 	p = block_address(blockid);
2707 	bh = (POOL_CACHE_BLOCK_HEADER *)p;
2708 
2709 	/* Is this block used? */
2710 	if (!(bh->flags & POOL_BLOCK_USED))
2711 	{
2712 		/* Initialize empty block */
2713 		memset(p, 0, pool_config->memqcache_cache_block_size);
2714 		bh->free_bytes = pool_config->memqcache_cache_block_size -
2715 			sizeof(POOL_CACHE_BLOCK_HEADER);
2716 	}
2717 	return 0;
2718 }
2719 
#if NOT_USED
/*
 * Delete every live item of the block, then reset the block and mark it as
 * completely free in the FSMM.
 */
static void
pool_wipe_out_cache_block(POOL_CACHE_BLOCKID blockid)
{
	char	   *block = block_address(blockid);
	POOL_CACHE_BLOCK_HEADER *bh = (POOL_CACHE_BLOCK_HEADER *) block;
	POOL_CACHEID target;
	int			idx = 0;

	target.blockid = blockid;

	/*
	 * num_items is read again on every pass because
	 * pool_delete_item_shmem_cache() may change it.
	 */
	while (idx < bh->num_items)
	{
		/* Items already flagged as deleted need no further work */
		if (!(item_pointer(block, idx)->flags & POOL_ITEM_DELETED))
		{
			target.itemid = idx;
			pool_delete_item_shmem_cache(&target);
		}
		idx++;
	}

	/* Clear the used flag so that the block really gets initialized */
	bh->flags = 0;
	pool_init_cache_block(blockid);
	pool_update_fsmm(blockid, POOL_MAX_FREE_SPACE);
}
#endif
2753 
2754 /*
2755  * Acquire lock: XXX giant lock
2756  */
pool_shmem_lock(void)2757 void pool_shmem_lock(void)
2758 {
2759 	if (pool_is_shmem_cache())
2760 	{
2761 		pool_semaphore_lock(SHM_CACHE_SEM);
2762 
2763 	}
2764 }
2765 
2766 /*
2767  * Release lock
2768  */
pool_shmem_unlock(void)2769 void pool_shmem_unlock(void)
2770 {
2771 	if (pool_is_shmem_cache())
2772 	{
2773 		pool_semaphore_unlock(SHM_CACHE_SEM);
2774 	}
2775 }
2776 
2777 /*
2778  * Returns cache block address specified by block id
2779  */
block_address(int blockid)2780 static char *block_address(int blockid)
2781 {
2782 	char *p;
2783 
2784 	p = pool_memory_cache_address() +
2785 		blockid *	pool_config->memqcache_cache_block_size;
2786 	return p;
2787 }
2788 
2789 /*
2790  * Returns i th item pointer in block address block
2791  */
item_pointer(char * block,int i)2792 static POOL_CACHE_ITEM_POINTER *item_pointer(char *block, int i)
2793 {
2794 	return (POOL_CACHE_ITEM_POINTER *)(block + sizeof(POOL_CACHE_BLOCK_HEADER) +
2795 									   sizeof(POOL_CACHE_ITEM_POINTER) * i);
2796 }
2797 
2798 /*
2799  * Returns i th item header in block address block
2800  */
item_header(char * block,int i)2801 static POOL_CACHE_ITEM_HEADER *item_header(char *block, int i)
2802 {
2803 	POOL_CACHE_ITEM_POINTER *cip;
2804 
2805 	cip = item_pointer(block, i);
2806 	return (POOL_CACHE_ITEM_HEADER *)(block + cip->offset);
2807 }
2808 
#ifdef SHMEMCACHE_DEBUG
/*
 * Print the block header of cache block blockid to stderr, followed by the
 * item pointer and item header of each of its num_items items.
 * Debugging aid only.
 */
static void
dump_shmem_cache(POOL_CACHE_BLOCKID blockid)
{
	POOL_CACHE_BLOCK_HEADER *bh;
	POOL_CACHE_ITEM_POINTER *cip;
	POOL_CACHE_ITEM_HEADER *cih;
	int			i;

	bh = (POOL_CACHE_BLOCK_HEADER *) block_address(blockid);

	/* sizeof yields a size_t, which is printed with %zu */
	fprintf(stderr, "shmem: block header(%zu bytes): flags:%x num_items:%d free_bytes:%d\n",
			sizeof(*bh), bh->flags, bh->num_items, bh->free_bytes);

	for (i = 0; i < bh->num_items; i++)
	{
		cip = item_pointer((char *) bh, i);
		fprintf(stderr, "shmem: block: %d %d th item pointer(%zu bytes): offset:%d flags:%x\n",
				blockid, i, sizeof(*cip), cip->offset, cip->flags);

		cih = item_header((char *) bh, i);

		/* The timestamp is cast so that it matches the %ld conversion */
		fprintf(stderr, "shmem: block: %d %d th item header(%zu bytes): timestamp:%ld length:%d\n",
				blockid, i, sizeof(*cih), (long) cih->timestamp, cih->total_length);
	}
}
#endif
2834 
2835 /*
2836  * SELECT query result array modules
2837  */
pool_create_query_cache_array(void)2838 POOL_QUERY_CACHE_ARRAY *pool_create_query_cache_array(void)
2839 {
2840 #define POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM 128
2841 #define POOL_QUERY_CACHE_ARRAY_HEADER_SIZE (sizeof(int)+sizeof(int))
2842 
2843 	size_t size;
2844 	POOL_QUERY_CACHE_ARRAY *p;
2845 	POOL_SESSION_CONTEXT *session_context =pool_get_session_context(false);
2846 	MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
2847 
2848 	size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM *
2849 		sizeof(POOL_TEMP_QUERY_CACHE *);
2850 	p = palloc(size);
2851 	p->num_caches = 0;
2852 	p->array_size = POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
2853     MemoryContextSwitchTo(old_context);
2854 	return p;
2855 }
2856 
2857 /*
2858  * Discard query cache array
2859  */
pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array)2860 void pool_discard_query_cache_array(POOL_QUERY_CACHE_ARRAY *cache_array)
2861 {
2862 	int i;
2863 
2864 	if (!cache_array)
2865 		return;
2866 
2867 	ereport(DEBUG1,
2868 		(errmsg("memcache discarding query cache array"),
2869 			 errdetail("num_caches: %d", cache_array->num_caches)));
2870 
2871 	for (i=0;i<cache_array->num_caches;i++)
2872 	{
2873 		ereport(DEBUG2,
2874 			(errmsg("memcache discarding query cache array"),
2875 				 errdetail("cache no: %d cache: %p", i, cache_array->caches[i])));
2876 		pool_discard_temp_query_cache(cache_array->caches[i]);
2877 	}
2878 	pfree(cache_array);
2879 }
2880 
2881 /*
2882  * Add query cache array
2883  */
pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY * cache_array,POOL_TEMP_QUERY_CACHE * cache)2884 static POOL_QUERY_CACHE_ARRAY * pool_add_query_cache_array(POOL_QUERY_CACHE_ARRAY *cache_array, POOL_TEMP_QUERY_CACHE *cache)
2885 {
2886 	size_t size;
2887 	POOL_QUERY_CACHE_ARRAY *cp = cache_array;
2888 
2889 	if (!cache_array)
2890 		return cp;
2891 
2892 	ereport(DEBUG2,
2893 		(errmsg("memcache adding query cache array"),
2894 			 errdetail("num_caches: %d cache: %p", cache_array->num_caches, cache)));
2895 	if (cache_array->num_caches >= 	cache_array->array_size)
2896 	{
2897 		cache_array->array_size += POOL_QUERY_CACHE_ARRAY_ALLOCATE_NUM;
2898 		size = POOL_QUERY_CACHE_ARRAY_HEADER_SIZE + cache_array->array_size *
2899 			sizeof(POOL_TEMP_QUERY_CACHE *);
2900 		cache_array = repalloc(cache_array, size);
2901 	}
2902 	cache_array->caches[cache_array->num_caches++] = cache;
2903 	return cache_array;
2904 }
2905 
2906 /*
2907  * SELECT query result temporary cache modules
2908  */
2909 
2910 /*
2911  * Create SELECT result temporary cache
2912  */
pool_create_temp_query_cache(char * query)2913 POOL_TEMP_QUERY_CACHE *pool_create_temp_query_cache(char *query)
2914 {
2915 	POOL_SESSION_CONTEXT *session_context =pool_get_session_context(false);
2916     MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
2917 	POOL_TEMP_QUERY_CACHE *p;
2918 	p = palloc(sizeof(*p));
2919     p->query = pstrdup(query);
2920 
2921     p->buffer = pool_create_buffer();
2922     p->oids = pool_create_buffer();
2923     p->num_oids = 0;
2924     p->is_exceeded = false;
2925     p->is_discarded = false;
2926 
2927     MemoryContextSwitchTo(old_context);
2928 
2929 	ereport(DEBUG1,
2930 			(errmsg("pool_create_temp_query_cache: cache created: %p", p)));
2931 
2932 	return p;
2933 }
2934 
2935 /*
2936  * Discard temp query cache
2937  */
pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache)2938 void pool_discard_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache)
2939 {
2940 	if (!temp_cache)
2941 		return;
2942 
2943 	if (temp_cache->query)
2944 		pfree(temp_cache->query);
2945 	if (temp_cache->buffer)
2946 		pool_discard_buffer(temp_cache->buffer);
2947 	if (temp_cache->oids)
2948 		pool_discard_buffer(temp_cache->oids);
2949 
2950 	ereport(DEBUG1,
2951 			(errmsg("pool_discard_temp_query_cache: cache discarded: %p", temp_cache)));
2952 
2953 	pfree(temp_cache);
2954 }
2955 
2956 /*
2957  * Add data to temp query cache.
2958  * Data must be FE/BE protocol packet.
2959  */
pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,char kind,char * data,int data_len)2960 static void pool_add_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, char kind, char *data, int data_len)
2961 {
2962 	POOL_INTERNAL_BUFFER *buffer;
2963 	size_t buflen;
2964 	int send_len;
2965 
2966 	if (temp_cache == NULL)
2967 	{
2968 		/* This could happen if cache exceeded in previous query
2969 		 * execution in the same unnamed portal.
2970 		 */
2971 		ereport(DEBUG1,
2972 			(errmsg("memcache adding temporary query cache"),
2973 				 errdetail("POOL_TEMP_QUERY_CACHE is NULL")));
2974 		return;
2975 	}
2976 
2977 	if (temp_cache->is_exceeded)
2978 	{
2979 		ereport(DEBUG1,
2980 			(errmsg("memcache adding temporary query cache"),
2981 				 errdetail("memqcache_maxcache exceeds")));
2982 		return;
2983 	}
2984 
2985 	/*
2986 	 * We only store T(Table Description), D(Data row), C(Command Complete),
2987 	 * 1(ParseComplete), 2(BindComplete)
2988 	 */
2989     if (kind != 'T' && kind != 'D' && kind != 'C' && kind != '1' && kind != '2')
2990     {
2991 		return;
2992 	}
2993 
2994 	/* Check data limit */
2995 	buffer = temp_cache->buffer;
2996 	buflen = pool_get_buffer_length(buffer);
2997 
2998 	if ((buflen+data_len+sizeof(int)+1) > pool_config->memqcache_maxcache)
2999 	{
3000 		ereport(DEBUG1,
3001 			(errmsg("memcache adding temporary query cache"),
3002 				errdetail("data size exceeds memqcache_maxcache. current:%zd requested:%zd memq_maxcache:%d",
3003 					   buflen, data_len+sizeof(int)+1, pool_config->memqcache_maxcache)));
3004 		temp_cache->is_exceeded = true;
3005 		return;
3006 	}
3007 
3008 	pool_add_buffer(buffer, &kind, 1);
3009 	send_len = htonl(data_len + sizeof(int));
3010 	pool_add_buffer(buffer, (char *)&send_len, sizeof(int));
3011 	pool_add_buffer(buffer, data, data_len);
3012 
3013 	return;
3014 }
3015 
3016 /*
3017  * Add table oids used by SELECT to temp query cache.
3018  */
pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE * temp_cache,int num_oids,int * oids)3019 static void pool_add_oids_temp_query_cache(POOL_TEMP_QUERY_CACHE *temp_cache, int num_oids, int *oids)
3020 {
3021 	POOL_INTERNAL_BUFFER *buffer;
3022 
3023 	if (!temp_cache || num_oids <= 0)
3024 		return;
3025 
3026 	buffer = temp_cache->oids;
3027 	pool_add_buffer(buffer, oids, num_oids*sizeof(int));
3028 	temp_cache->num_oids = num_oids;
3029 }
3030 
3031 /*
3032  * Internal buffer management modules.
3033  * Usage:
3034  * 1) Create buffer using pool_create_buffer().
3035  * 2) Add data to buffer using pool_add_buffer().
3036  * 3) Extract (copied) data from buffer using pool_get_buffer().
3037  * 4) Optionally you can:
3038  *		Obtain buffer length by using pool_get_buffer_length().
3039  *		Obtain buffer pointer by using pool_get_buffer_pointer().
3040  * 5) Discard buffer using pool_discard_buffer().
3041  */
3042 
3043 /*
3044  * Create and return internal buffer
3045  */
pool_create_buffer(void)3046 static POOL_INTERNAL_BUFFER *pool_create_buffer(void)
3047 {
3048 	POOL_INTERNAL_BUFFER *p;
3049 	p = palloc0(sizeof(*p));
3050 	return p;
3051 }
3052 
3053 /*
3054  * Discard internal buffer
3055  */
pool_discard_buffer(POOL_INTERNAL_BUFFER * buffer)3056 static void pool_discard_buffer(POOL_INTERNAL_BUFFER *buffer)
3057 {
3058 	if (buffer)
3059 	{
3060 		if (buffer->buf)
3061 			pfree(buffer->buf);
3062 		pfree(buffer);
3063 	}
3064 }
3065 
3066 /*
3067  * Add data to internal buffer
3068  */
pool_add_buffer(POOL_INTERNAL_BUFFER * buffer,void * data,size_t len)3069 static void pool_add_buffer(POOL_INTERNAL_BUFFER *buffer, void *data, size_t len)
3070 {
3071 #define POOL_ALLOCATE_UNIT 8192
3072 
3073 	/* Sanity check */
3074 	if (!buffer || !data || len == 0)
3075 		return;
3076 	POOL_SESSION_CONTEXT *session_context =pool_get_session_context(false);
3077 	MemoryContext old_context = MemoryContextSwitchTo(session_context->memory_context);
3078 
3079 	/* Check if we need to increase the buffer size */
3080 	if ((buffer->buflen + len) > buffer->bufsize)
3081 	{
3082 		size_t allocate_size = ((buffer->buflen + len)/POOL_ALLOCATE_UNIT +1)*POOL_ALLOCATE_UNIT;
3083 		ereport(DEBUG2,
3084 			(errmsg("memcache adding data to internal buffer"),
3085 				errdetail("realloc old size:%zd new size:%zd",
3086 					   buffer->bufsize, allocate_size)));
3087 		buffer->bufsize = allocate_size;
3088 		buffer->buf = (char *)repalloc(buffer->buf, buffer->bufsize);
3089 	}
3090 	/* Add data to buffer */
3091 	memcpy(buffer->buf+buffer->buflen, data, len);
3092 	buffer->buflen += len;
3093 	ereport(DEBUG2,
3094 		(errmsg("memcache adding data to internal buffer"),
3095 			errdetail("len:%zd, total:%zd bufsize:%zd",
3096 				   len, buffer->buflen, buffer->bufsize)));
3097 	MemoryContextSwitchTo(old_context);
3098 	return;
3099 }
3100 
3101 /*
3102  * Get data from internal buffer.
3103  * Data is stored in newly malloc memory.
3104  * Data length is returned to len.
3105  */
pool_get_buffer(POOL_INTERNAL_BUFFER * buffer,size_t * len)3106 static void *pool_get_buffer(POOL_INTERNAL_BUFFER *buffer, size_t *len)
3107 {
3108 	void *p;
3109 
3110 	if (buffer->bufsize == 0 || buffer->buflen == 0 ||
3111 		buffer->buf == NULL)
3112 	{
3113 		*len = 0;
3114 		return NULL;
3115 	}
3116 
3117 	p = palloc(buffer->buflen);
3118 	memcpy(p, buffer->buf, buffer->buflen);
3119 	*len = buffer->buflen;
3120 	return p;
3121 }
3122 
3123 /*
3124  * Get internal buffer length.
3125  */
pool_get_buffer_length(POOL_INTERNAL_BUFFER * buffer)3126 static size_t pool_get_buffer_length(POOL_INTERNAL_BUFFER *buffer)
3127 {
3128 	if (buffer == NULL)
3129 		return 0;
3130 
3131 	return buffer->buflen;
3132 }
3133 
#ifdef NOT_USED
/*
 * Return the data area of an internal buffer itself (not a copy), or NULL
 * for a NULL buffer.
 */
static char *
pool_get_buffer_pointer(POOL_INTERNAL_BUFFER *buffer)
{
	return buffer ? buffer->buf : NULL;
}
#endif
3145 /*
3146  * Get query cache buffer struct of current query context
3147  */
pool_get_current_cache(void)3148 POOL_TEMP_QUERY_CACHE *pool_get_current_cache(void)
3149 {
3150 	POOL_SESSION_CONTEXT *session_context;
3151 	POOL_QUERY_CONTEXT *query_context;
3152 	POOL_TEMP_QUERY_CACHE *p = NULL;
3153 
3154 	session_context = pool_get_session_context(true);
3155 	if (session_context)
3156 	{
3157 		query_context = session_context->query_context;
3158 		if (query_context)
3159 		{
3160 			p = query_context->temp_cache;
3161 		}
3162 	}
3163 	return p;
3164 }
3165 
3166 /*
3167  * Get query cache buffer of current query context
3168  */
pool_get_current_cache_buffer(size_t * len)3169 static char *pool_get_current_cache_buffer(size_t *len)
3170 {
3171 	char *p = NULL;
3172 	*len = 0;
3173 	POOL_TEMP_QUERY_CACHE *cache;
3174 
3175 	cache = pool_get_current_cache();
3176 	if (cache)
3177 	{
3178 		p = pool_get_buffer(cache->buffer, len);
3179 	}
3180 	return p;
3181 }
3182 
3183 /*
3184  * Mark this temporary query cache buffer discarded if the SELECT
3185  * uses the table oid specified by oids.
3186  */
pool_check_and_discard_cache_buffer(int num_oids,int * oids)3187 static void pool_check_and_discard_cache_buffer(int num_oids, int *oids)
3188 {
3189 	POOL_SESSION_CONTEXT *session_context;
3190 	POOL_TEMP_QUERY_CACHE *cache;
3191 	int num_caches;
3192 	size_t len;
3193 	int *soids;
3194 	int i, j, k;
3195 
3196 	session_context = pool_get_session_context(true);
3197 
3198 	if (!session_context || !session_context->query_cache_array)
3199 		return;
3200 
3201 	num_caches = session_context->query_cache_array->num_caches;
3202 
3203 	for (i=0;i<num_caches;i++)
3204 	{
3205 		cache = session_context->query_cache_array->caches[i];
3206 		if (!cache || cache->is_discarded)
3207 			continue;
3208 
3209 		soids = (int *)pool_get_buffer(cache->oids, &len);
3210 		if (!soids || !len)
3211 			continue;
3212 
3213 		for(j=0;j<cache->num_oids;j++)
3214 		{
3215 			if (cache->is_discarded)
3216 				break;
3217 
3218 			for (k=0;k<num_oids;k++)
3219 			{
3220 				if (soids[j] == oids[k])
3221 				{
3222 					ereport(DEBUG1,
3223 							(errmsg("discard cache for \"%s\"",cache->query)));
3224 					cache->is_discarded = true;
3225 					break;
3226 				}
3227 			}
3228 		}
3229 		pfree(soids);
3230 	}
3231 }
3232 
3233 /*
3234  * Handle the query cache at Ready for Query or Command Complete.  In streaming
3235  * replication mode with the extended query protocol this is done at Command
3236  * Complete; in all other cases it is done at Ready for Query.
3237  */
pool_handle_query_cache(POOL_CONNECTION_POOL * backend,char * query,Node * node,char state)3238 void pool_handle_query_cache(POOL_CONNECTION_POOL *backend, char *query, Node *node, char state)
3239 {
3240 	POOL_SESSION_CONTEXT *session_context;
3241 	pool_sigset_t oldmask;
3242 	char *cache_buffer;
3243 	size_t len;
3244 	int num_oids;
3245 	int *oids;
3246 	int i;
3247 
3248 	session_context = pool_get_session_context(true);
3249 
3250 	/* Ok to cache SELECT result? */
3251 	if (pool_is_cache_safe())
3252 	{
3253 		SelectContext ctx;
3254 		MemoryContext old_context;
3255 		old_context = MemoryContextSwitchTo(session_context->memory_context);
3256 		num_oids = pool_extract_table_oids_from_select_stmt(node, &ctx);
3257 		MemoryContextSwitchTo(old_context);
3258 		oids = ctx.table_oids;
3259 		ereport(DEBUG2,
3260 			(errmsg("query cache handler for ReadyForQuery"),
3261 				 errdetail("num_oids: %d oid: %d", num_oids, *oids)));
3262 
3263 		if (state == 'I')		/* Not inside a transaction? */
3264 		{
3265 			/*
3266 			 * Make sure that temporary cache is not exceeded.
3267 			 */
3268 			if (!pool_is_cache_exceeded())
3269 			{
3270 				POOL_TEMP_QUERY_CACHE *cache;
3271 				/*
3272 				 * If we are not inside a transaction, we can
3273 				 * immediately register to cache storage.
3274 				 */
3275 				/* Register to memcached or shmem */
3276 				POOL_SETMASK2(&BlockSig, &oldmask);
3277 				pool_shmem_lock();
3278 
3279 				cache_buffer =  pool_get_current_cache_buffer(&len);
3280 				if (cache_buffer)
3281 				{
3282 					if (session_context->query_context->skip_cache_commit == false)
3283 					{
3284 						if (pool_commit_cache(backend, query, cache_buffer, len, num_oids, oids) != 0)
3285 						{
3286 							ereport(WARNING,
3287 									(errmsg("ReadyForQuery: pool_commit_cache failed")));
3288 						}
3289 					}
3290 					/*
3291 					 * Reset temporary query cache buffer. This is
3292 					 * necessary if extended query protocol is used and a
3293 					 * bind/execute message arrives which uses a statement
3294 					 * created by prior parse message. In this case since
3295 					 * the temp_cache is not initialized by a parse
3296 					 * message, messages are added to pre existing temp
3297 					 * cache buffer. The problem was found in bug#152.
3298 					 * http://www.pgpool.net/mantisbt/view.php?id=152
3299 					 */
3300 					cache = pool_get_current_cache();
3301 					ereport(DEBUG1,
3302 							(errmsg("pool_handle_query_cache: temp_cache: %p", cache)));
3303 					pool_discard_temp_query_cache(cache);
3304 
3305 					if (STREAM && pool_is_doing_extended_query_message())
3306 						session_context->query_context->temp_cache = NULL;
3307 					else
3308 						session_context->query_context->temp_cache = pool_create_temp_query_cache(query);
3309 					pfree(cache_buffer);
3310 				}
3311 				pool_shmem_unlock();
3312 				POOL_SETMASK(&oldmask);
3313 			}
3314 
3315 			/* Count up SELECT stats */
3316 			pool_stats_count_up_num_selects(1);
3317 
3318 			/* Reset temp buffer */
3319 			pool_reset_memqcache_buffer(true);
3320 		}
3321 		else
3322 		{
3323 			POOL_TEMP_QUERY_CACHE *cache = pool_get_current_cache();
3324 
3325 			/* In transaction. Keep to temp query cache array */
3326 			pool_add_oids_temp_query_cache(cache, num_oids, oids);
3327 
3328 			/*
3329 			 * If temp cache has been overflowed, just trash the half
3330 			 * baked temp cache.
3331 			 */
3332 			if (pool_is_cache_exceeded())
3333 			{
3334 				POOL_TEMP_QUERY_CACHE *cache;
3335 
3336 				cache = pool_get_current_cache();
3337 				pool_discard_temp_query_cache(cache);
3338 				/*
3339 				 * Reset temp_cache pointer in the current query context
3340 				 * so that we don't double free memory.
3341 				 */
3342 				session_context->query_context->temp_cache = NULL;
3343 
3344 			}
3345 			/*
3346 			 * Otherwise add to the temp cache array.
3347 			 */
3348 			else
3349 			{
3350 				session_context->query_cache_array =
3351 					pool_add_query_cache_array(session_context->query_cache_array, cache);
3352 				/*
3353 				 * Reset temp_cache pointer in the current query
3354 				 * context so that we don't add the same temp cache to
3355 				 * the cache array. This is necessary such that case
3356 				 * when next query is just a "bind message", without
3357 				 * "parse message". In the case the query context is
3358 				 * reused and same cache pointer will be added to the
3359 				 * query_cache_array which we do not want.
3360 				 */
3361 				session_context->query_context->temp_cache = NULL;
3362 			}
3363 
3364 			/* Count up temporary SELECT stats */
3365 			pool_tmp_stats_count_up_num_selects();
3366 		}
3367 	}
3368 	else if (is_rollback_query(node))	/* Rollback? */
3369 	{
3370 		/* Discard buffered data */
3371 		pool_reset_memqcache_buffer(true);
3372 	}
3373 	else if (is_commit_query(node))		/* Commit? */
3374 	{
3375 		int num_caches;
3376 
3377 		POOL_SETMASK2(&BlockSig, &oldmask);
3378 		pool_shmem_lock();
3379 
3380 		/* Invalidate query cache */
3381 		if (pool_config->memqcache_auto_cache_invalidation)
3382 		{
3383 			num_oids = pool_get_dml_table_oid(&oids);
3384 			pool_invalidate_query_cache(num_oids, oids, true, 0);
3385 		}
3386 
3387 		/*
3388 		 * If we have something in the query cache buffer, that means
3389 		 * either:
3390 		 * - We only had SELECTs in the transaction
3391 		 * - We had only SELECTs after last DML
3392 		 * Thus we can register SELECT results to cache storage.
3393 		 */
3394 		num_caches = session_context->query_cache_array->num_caches;
3395 		for (i=0;i<num_caches;i++)
3396 		{
3397 			POOL_TEMP_QUERY_CACHE *cache;
3398 
3399 			cache = session_context->query_cache_array->caches[i];
3400 			if (!cache || cache->is_discarded)
3401 				continue;
3402 
3403 			num_oids = cache->num_oids;
3404 			oids = pool_get_buffer(cache->oids, &len);
3405 			cache_buffer = pool_get_buffer(cache->buffer, &len);
3406 
3407 			if (pool_commit_cache(backend, cache->query, cache_buffer, len, num_oids, oids) != 0)
3408 			{
3409 				ereport(WARNING,
3410 						(errmsg("ReadyForQuery: pool_commit_cache failed")));
3411 			}
3412 			if (oids)
3413 				pfree(oids);
3414 			if (cache_buffer)
3415 				pfree(cache_buffer);
3416 		}
3417 		pool_shmem_unlock();
3418 		POOL_SETMASK(&oldmask);
3419 
3420 		/* Count up number of SELECT stats */
3421 		pool_stats_count_up_num_selects(pool_tmp_stats_get_num_selects());
3422 
3423 		pool_reset_memqcache_buffer(true);
3424 	}
3425 	else		/* Non cache safe queries */
3426 	{
3427 		/* Non cachable SELECT */
3428 		if (node && IsA(node, SelectStmt))
3429 		{
3430 			/* Extract table oids from buffer */
3431 			num_oids = pool_get_dml_table_oid(&oids);
3432 
3433 			if (state == 'I')
3434 			{
3435 				/*
3436 				 * If Data-modifying statements in SELECT's WITH clause,
3437 				 * invalidate query cache.
3438 				 */
3439 				if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3440 				{
3441 					POOL_SETMASK2(&BlockSig, &oldmask);
3442 					pool_shmem_lock();
3443 					pool_invalidate_query_cache(num_oids, oids, true, 0);
3444 					pool_shmem_unlock();
3445 					POOL_SETMASK(&oldmask);
3446 				}
3447 
3448 				/* Count up SELECT stats */
3449 				pool_stats_count_up_num_selects(1);
3450 				pool_reset_memqcache_buffer(true);
3451 			}
3452 			else
3453 			{
3454 				/*
3455 				 * If we are inside a transaction, we cannot invalidate
3456 				 * query cache yet. However we can clear cache buffer, if
3457 				 * DML/DDL modifies the TABLE which SELECT uses.
3458 				 */
3459 				if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3460 				{
3461 					pool_check_and_discard_cache_buffer(num_oids, oids);
3462 					pool_reset_memqcache_buffer(false);
3463 				}
3464 
3465 				/* Count up temporary SELECT stats */
3466 				pool_tmp_stats_count_up_num_selects();
3467 			}
3468 		}
3469 		/*
3470 		 * If the query is DROP DATABASE, discard both of caches in shmem/memcached and
3471 		 * oidmap in memqcache_oiddir.
3472 		 */
3473 		else if (is_drop_database(node) && session_context->query_context->dboid != 0)
3474 		{
3475 			int dboid = session_context->query_context->dboid;
3476 			num_oids = pool_get_dropdb_table_oids(&oids, dboid);
3477 
3478 			if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3479 			{
3480 				pool_shmem_lock();
3481 				pool_invalidate_query_cache(num_oids, oids, true, dboid);
3482 				pool_discard_oid_maps_by_db(dboid);
3483 				pool_shmem_unlock();
3484 				pool_reset_memqcache_buffer(true);
3485 
3486 				pfree(oids);
3487 				ereport(DEBUG2,
3488 					(errmsg("query cache handler for ReadyForQuery"),
3489 						 errdetail("deleted all cache files for the DROPped DB")));
3490 			}
3491 		}
3492 		else
3493 		{
3494 			/*
3495 			 * DML/DCL/DDL case
3496 			 */
3497 
3498 			/* Extract table oids from buffer */
3499 			num_oids = pool_get_dml_table_oid(&oids);
3500 			if (num_oids > 0 && pool_config->memqcache_auto_cache_invalidation)
3501 			{
3502 				/*
3503 				 * If we are not inside a transaction, we can
3504 				 * immediately invalidate query cache.
3505 				 */
3506 				if (state == 'I')
3507 				{
3508 					POOL_SETMASK2(&BlockSig, &oldmask);
3509 					pool_shmem_lock();
3510 					pool_invalidate_query_cache(num_oids, oids, true, 0);
3511 					pool_shmem_unlock();
3512 					POOL_SETMASK(&oldmask);
3513 					pool_reset_memqcache_buffer(true);
3514 				}
3515 				else
3516 				{
3517 					/*
3518 					 * If we are inside a transaction, we
3519 					 * cannot invalidate query cache
3520 					 * yet. However we can clear cache buffer,
3521 					 * if DML/DDL modifies the TABLE which SELECT uses.
3522 					 */
3523 					pool_check_and_discard_cache_buffer(num_oids, oids);
3524 					pool_reset_memqcache_buffer(false);
3525 				}
3526 			}
3527 			else if (num_oids == 0)
3528 			{
3529 				/*
3530 				 * It is also necessary to clear cache buffers in case of
3531 				 * no oid queries (like BEGIN, CHECKPOINT, VACUUM, etc) too.
3532 				 */
3533 				pool_reset_memqcache_buffer(true);
3534 			}
3535 		}
3536 	}
3537 }
3538 
3539 /*
3540  * Create and initialize query cache stats
3541  */
3542 static POOL_QUERY_CACHE_STATS *stats;
pool_init_memqcache_stats(void)3543 int pool_init_memqcache_stats(void)
3544 {
3545 	stats = pool_shared_memory_create(sizeof(POOL_QUERY_CACHE_STATS));
3546 	pool_reset_memqcache_stats();
3547 	return 0;
3548 }
3549 
3550 /*
3551  * Returns copy of stats area. The copy is in static area and will be
3552  * overwritten by next call to this function.
3553  */
pool_get_memqcache_stats(void)3554 POOL_QUERY_CACHE_STATS *pool_get_memqcache_stats(void)
3555 {
3556 	static POOL_QUERY_CACHE_STATS mystats;
3557 	pool_sigset_t oldmask;
3558 
3559 	memset(&mystats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3560 
3561 	if (stats)
3562 	{
3563 		POOL_SETMASK2(&BlockSig, &oldmask);
3564 		pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3565 		memcpy(&mystats, stats, sizeof(POOL_QUERY_CACHE_STATS));
3566 		pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3567 		POOL_SETMASK(&oldmask);
3568 	}
3569 
3570 	return &mystats;
3571 }
3572 
3573 /*
3574  * Reset query cache stats. Caller must lock QUERY_CACHE_STATS_SEM if
3575  * necessary.
3576  */
pool_reset_memqcache_stats(void)3577 void pool_reset_memqcache_stats(void)
3578 {
3579 	memset(stats, 0, sizeof(POOL_QUERY_CACHE_STATS));
3580 	stats->start_time = time(NULL);
3581 }
3582 
3583 /*
3584  * Count up number of successful SELECTs and returns the number.
3585  * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3586  */
pool_stats_count_up_num_selects(long long int num)3587 long long int pool_stats_count_up_num_selects(long long int num)
3588 {
3589 	pool_sigset_t oldmask;
3590 
3591 	POOL_SETMASK2(&BlockSig, &oldmask);
3592 	pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3593 	stats->num_selects += num;
3594 	pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3595 	POOL_SETMASK(&oldmask);
3596 	return stats->num_selects;
3597 }
3598 
3599 /*
3600  * Count up number of successful SELECTs in temporary area and returns
3601  * the number.
3602  */
pool_tmp_stats_count_up_num_selects(void)3603 long long int pool_tmp_stats_count_up_num_selects(void)
3604 {
3605 	POOL_SESSION_CONTEXT *session_context;
3606 
3607 	session_context = pool_get_session_context(false);
3608 	session_context->num_selects++;
3609 	return 	session_context->num_selects;
3610 }
3611 
3612 /*
3613  * Return number of successful SELECTs in temporary area.
3614  */
pool_tmp_stats_get_num_selects(void)3615 long long int pool_tmp_stats_get_num_selects(void)
3616 {
3617 	POOL_SESSION_CONTEXT *session_context;
3618 
3619 	session_context = pool_get_session_context(false);
3620 	return session_context->num_selects;
3621 }
3622 
3623 /*
3624  * Reset number of successful SELECTs in temporary area.
3625  */
pool_tmp_stats_reset_num_selects(void)3626 void pool_tmp_stats_reset_num_selects(void)
3627 {
3628 	POOL_SESSION_CONTEXT *session_context;
3629 
3630 	session_context = pool_get_session_context(false);
3631 	session_context->num_selects = 0;
3632 }
3633 
3634 /*
3635  * Count up number of SELECTs extracted from cache returns the number.
3636  * QUERY_CACHE_STATS_SEM lock is acquired in this function.
3637  */
pool_stats_count_up_num_cache_hits(void)3638 long long int pool_stats_count_up_num_cache_hits(void)
3639 {
3640 	pool_sigset_t oldmask;
3641 
3642 	POOL_SETMASK2(&BlockSig, &oldmask);
3643 	pool_semaphore_lock(QUERY_CACHE_STATS_SEM);
3644 	stats->num_cache_hits++;
3645 	pool_semaphore_unlock(QUERY_CACHE_STATS_SEM);
3646 	POOL_SETMASK(&oldmask);
3647 	return stats->num_cache_hits;
3648 }
3649 
/*
 * On shared memory hash table implementation.  We use sub part of md5
 * hash key as hash function.  The experiment has shown that hash_any()
 * of PostgreSQL is a little bit better than the method using part of
 * md5 hash value, but it seems adding some cpu cycles to call
 * hash_any() is not worth the trouble.
 */

/* Hash header: nhash, mask and the bucket array; set up by pool_hash_init() */
static volatile POOL_HASH_HEADER *hash_header;
/* Array of hash elements handed out to the buckets; set up by pool_hash_init() */
static volatile POOL_HASH_ELEMENT *hash_elements;
/*
 * Head of the free element list.  Free elements are taken from and put back
 * to hash_free->next, so the head element itself is never handed out.
 */
static volatile POOL_HASH_ELEMENT *hash_free;
3661 
3662 /*
3663  * Initialize hash table on shared memory "nelements" is max number of
3664  * hash keys. The actual number of hash key is rounded up to power of
3665  * 2.
3666  */
3667 #undef POOL_HASH_DEBUG
3668 
pool_hash_init(int nelements)3669 int pool_hash_init(int nelements)
3670 {
3671 	size_t size;
3672 	int nelements2;		/* number of rounded up hash keys */
3673 	int shift;
3674 	uint32 mask;
3675 	POOL_HASH_HEADER hh;
3676 	int i;
3677 
3678 	if (nelements <= 0)
3679 		ereport(ERROR,
3680 			(errmsg("initializing hash table on shared memory, invalid number of elements: %d",nelements)));
3681 
3682 	/* Round up to power of 2 */
3683 	shift = 32;
3684 	nelements2 = 1;
3685 	do
3686 	{
3687 		nelements2 <<= 1;
3688 		shift--;
3689 	} while (nelements2 < nelements);
3690 
3691 	mask = ~0;
3692 	mask >>= shift;
3693 	size = (char *)&hh.elements - (char *)&hh + sizeof(POOL_HEADER_ELEMENT)*nelements2;
3694 	hash_header = pool_shared_memory_create(size);
3695 	hash_header->nhash = nelements2;
3696     hash_header->mask = mask;
3697 
3698 #ifdef POOL_HASH_DEBUG
3699 	ereport(LOG,
3700 		(errmsg("initializing hash table on shared memory"),
3701 			 errdetail("size:%zd nelements2:%d", size, nelements2)));
3702 
3703 #endif
3704 
3705 	size = sizeof(POOL_HASH_ELEMENT)*nelements2;
3706 	hash_elements = pool_shared_memory_create(size);
3707 
3708 #ifdef POOL_HASH_DEBUG
3709 	ereport(LOG,
3710 		(errmsg("initializing hash table on shared memory"),
3711 			 errdetail("size:%zd nelements2:%d", size, nelements2)));
3712 #endif
3713 
3714 	for (i=0;i<nelements2-1;i++)
3715 	{
3716 		hash_elements[i].next = (POOL_HASH_ELEMENT *)&hash_elements[i+1];
3717 	}
3718 	hash_elements[nelements2-1].next = NULL;
3719 	hash_free = hash_elements;
3720 
3721 	return 0;
3722 }
3723 
3724 /*
3725  * Reset hash table on shared memory "nelements" is max number of
3726  * hash keys. The actual number of hash key is rounded up to power of
3727  * 2.
3728  */
3729 static int
pool_hash_reset(int nelements)3730 pool_hash_reset(int nelements)
3731 {
3732 	size_t size;
3733 	int nelements2;		/* number of rounded up hash keys */
3734 	int shift;
3735 	uint32 mask;
3736 	POOL_HASH_HEADER hh;
3737 	int i;
3738 
3739 	if (nelements <= 0)
3740 		ereport(ERROR,
3741 				(errmsg("clearing hash table on shared memory, invalid number of elements: %d",nelements)));
3742 
3743 	/* Round up to power of 2 */
3744 	shift = 32;
3745 	nelements2 = 1;
3746 	do
3747 	{
3748 		nelements2 <<= 1;
3749 		shift--;
3750 	} while (nelements2 < nelements);
3751 
3752 	mask = ~0;
3753 	mask >>= shift;
3754 
3755 	size = (char *)&hh.elements - (char *)&hh + sizeof(POOL_HEADER_ELEMENT)*nelements2;
3756 	memset((void *)hash_header, 0, size);
3757 
3758 	hash_header->nhash = nelements2;
3759     	hash_header->mask = mask;
3760 
3761 	size = sizeof(POOL_HASH_ELEMENT)*nelements2;
3762 	memset((void *)hash_elements, 0, size);
3763 
3764 	for (i=0;i<nelements2-1;i++)
3765 	{
3766 		hash_elements[i].next = (POOL_HASH_ELEMENT *)&hash_elements[i+1];
3767 	}
3768 	hash_elements[nelements2-1].next = NULL;
3769 	hash_free = hash_elements;
3770 
3771 	return 0;
3772 }
3773 
3774 /*
3775  * Search cacheid by MD5 hash key string
3776  * If found, returns cache id, otherwise NULL.
3777  */
pool_hash_search(POOL_QUERY_HASH * key)3778 POOL_CACHEID *pool_hash_search(POOL_QUERY_HASH *key)
3779 {
3780 	volatile POOL_HASH_ELEMENT *element;
3781 
3782 	uint32 hash_key = create_hash_key(key);
3783 
3784 	if (hash_key >= hash_header->nhash)
3785 	{
3786 		ereport(WARNING,
3787 			(errmsg("memcache: searching cacheid from hash. invalid hash key"),
3788 				errdetail("invalid hash key: %uld nhash: %ld",
3789 					   hash_key, hash_header->nhash)));
3790 		return NULL;
3791 	}
3792 
3793 	{
3794 		char md5[POOL_MD5_HASHKEYLEN+1];
3795 		memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3796 		md5[POOL_MD5_HASHKEYLEN] = '\0';
3797 #ifdef POOL_HASH_DEBUG
3798 		ereport(LOG,
3799 			(errmsg("searching hash table"),
3800 				 errdetail("hash_key:%d md5:%s", hash_key, md5)));
3801 #endif
3802 	}
3803 
3804 	element = hash_header->elements[hash_key].element;
3805 	while (element)
3806 	{
3807 		{
3808 			char md5[POOL_MD5_HASHKEYLEN+1];
3809 			memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3810 			md5[POOL_MD5_HASHKEYLEN] = '\0';
3811 #ifdef POOL_HASH_DEBUG
3812 			ereport(LOG,
3813 				(errmsg("searching hash table"),
3814 					 errdetail("element md5:%s", md5)));
3815 #endif
3816 		}
3817 
3818 		if (memcmp((const void *)element->hashkey.query_hash,
3819 				   (const void *)key->query_hash, sizeof(key->query_hash)) == 0)
3820 		{
3821 			return (POOL_CACHEID *)&element->cacheid;
3822 		}
3823 		element = element->next;
3824 	}
3825 	return NULL;
3826 }
3827 
3828 /*
3829  * Insert MD5 key and associated cache id into shmem hash table.  If
3830  * "update" is true, replace cacheid associated with the MD5 key,
3831  * rather than throw an error.
3832  */
pool_hash_insert(POOL_QUERY_HASH * key,POOL_CACHEID * cacheid,bool update)3833 static int pool_hash_insert(POOL_QUERY_HASH *key, POOL_CACHEID *cacheid, bool update)
3834 {
3835 	POOL_HASH_ELEMENT *element;
3836 	POOL_HASH_ELEMENT *new_element;
3837 
3838 	uint32 hash_key = create_hash_key(key);
3839 
3840 	if (hash_key >= hash_header->nhash)
3841 	{
3842 		ereport(WARNING,
3843 			(errmsg("memcache: adding cacheid to hash. invalid hash key"),
3844 				 errdetail("invalid hash key: %uld nhash: %ld",
3845 						   hash_key, hash_header->nhash)));
3846 		return -1;
3847 	}
3848 
3849 	{
3850 		char md5[POOL_MD5_HASHKEYLEN+1];
3851 		memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3852 		md5[POOL_MD5_HASHKEYLEN] = '\0';
3853 #ifdef POOL_HASH_DEBUG
3854 		ereport(LOG,
3855 			(errmsg("searching hash table"),
3856 				 errdetail("hash_key:%d md5:%s block:%d item:%d", hash_key, md5, cacheid->blockid, cacheid->itemid)));
3857 #endif
3858 	}
3859 
3860 	/*
3861 	 * Look for hash key.
3862 	 */
3863 	element = hash_header->elements[hash_key].element;
3864 
3865 	while (element)
3866 	{
3867 		if (memcmp((const void *)element->hashkey.query_hash,
3868 				   (const void *)key->query_hash, sizeof(key->query_hash)) == 0)
3869 		{
3870 			/* Hash key found. If "update" is false, just throw an error. */
3871 			char md5[POOL_MD5_HASHKEYLEN+1];
3872 
3873 			if (!update)
3874 			{
3875 				memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3876 				md5[POOL_MD5_HASHKEYLEN] = '\0';
3877 				ereport(LOG,
3878 					(errmsg("memcache: adding cacheid to hash. hash key:\"%s\" already exists",md5)));
3879 				return -1;
3880 			}
3881 			else
3882 			{
3883 				/* Update cache id */
3884 				memcpy((void *)&element->cacheid, cacheid, sizeof(POOL_CACHEID));
3885 				return 0;
3886 			}
3887 		}
3888 		element = element->next;
3889 	}
3890 
3891 	/*
3892 	 * Ok, same key did not exist. Just insert new hash key.
3893 	 */
3894 	new_element = (POOL_HASH_ELEMENT *)get_new_hash_element();
3895 	if (!new_element)
3896 	{
3897 		ereport(LOG,
3898 				(errmsg("memcache: adding cacheid to hash. failed to get new element")));
3899 		return -1;
3900 	}
3901 
3902 	element = hash_header->elements[hash_key].element;
3903 
3904 	hash_header->elements[hash_key].element = new_element;
3905 	new_element->next = element;
3906 
3907 	memcpy((void *)new_element->hashkey.query_hash, key->query_hash, POOL_MD5_HASHKEYLEN);
3908 	memcpy((void *)&new_element->cacheid, cacheid, sizeof(POOL_CACHEID));
3909 
3910 	return 0;
3911 }
3912 
3913 /*
3914  * Delete MD5 key and associated cache id from shmem hash table.
3915  */
pool_hash_delete(POOL_QUERY_HASH * key)3916 int pool_hash_delete(POOL_QUERY_HASH *key)
3917 {
3918 	POOL_HASH_ELEMENT *element;
3919 	POOL_HASH_ELEMENT **delete_point;
3920 	bool found;
3921 
3922 	uint32 hash_key = create_hash_key(key);
3923 
3924 	if (hash_key >= hash_header->nhash)
3925 	{
3926 		ereport(LOG,
3927 			(errmsg("memcache: deleting key from hash. invalid key"),
3928 				 errdetail("invalid hash key: %uld nhash: %ld",
3929 						   hash_key, hash_header->nhash)));
3930 		return -1;
3931 	}
3932 
3933 	/*
3934 	 * Look for delete location
3935 	 */
3936 	found = false;
3937 	delete_point = (POOL_HASH_ELEMENT **)&(hash_header->elements[hash_key].element);
3938 	element = hash_header->elements[hash_key].element;
3939 
3940 	while (element)
3941 	{
3942 		if (memcmp(element->hashkey.query_hash, key->query_hash, sizeof(key->query_hash)) == 0)
3943 		{
3944 			found = true;
3945 			break;
3946 		}
3947 		delete_point = &element->next;
3948 		element = element->next;
3949 	}
3950 
3951 	if (!found)
3952 	{
3953 		char md5[POOL_MD5_HASHKEYLEN+1];
3954 
3955 		memcpy(md5, key->query_hash, POOL_MD5_HASHKEYLEN);
3956 		md5[POOL_MD5_HASHKEYLEN] = '\0';
3957 		ereport(LOG,
3958 			(errmsg("memcache: deleting key from hash. key:\"%s\" not found",md5)));
3959 		return -1;
3960 	}
3961 
3962 	/*
3963 	 * Put back the element to free list
3964 	 */
3965 	*delete_point = element->next;
3966 	put_back_hash_element(element);
3967 
3968 	return 0;
3969 }
3970 
3971 /*
3972  * Calculate 32bit binary hash key (i.e. location in hash header) from MD5
3973  * string. We use top most 8 characters of MD5 string for calculation.
3974 */
create_hash_key(POOL_QUERY_HASH * key)3975 static uint32 create_hash_key(POOL_QUERY_HASH *key)
3976 {
3977 #define POOL_HASH_NCHARS 8
3978 
3979 	char md5[POOL_HASH_NCHARS+1];
3980 	uint32 mask;
3981 
3982 	memcpy(md5, key->query_hash, POOL_HASH_NCHARS);
3983 	md5[POOL_HASH_NCHARS] = '\0';
3984 	mask = strtoul(md5, NULL, 16);
3985 	mask &= hash_header->mask;
3986 	return mask;
3987 }
3988 
3989 /*
3990  * Get new free hash element from free list.
3991  */
get_new_hash_element(void)3992 static volatile POOL_HASH_ELEMENT *get_new_hash_element(void)
3993 {
3994 	volatile POOL_HASH_ELEMENT *elm;
3995 
3996 	if (!hash_free->next)
3997 	{
3998 		/* No free element */
3999 		return NULL;
4000 	}
4001 
4002 #ifdef POOL_HASH_DEBUG
4003 	ereport(LOG,
4004 		(errmsg("getting new hash element"),
4005 			errdetail("hash_free->next:%p hash_free->next->next:%p",
4006 				   hash_free->next, hash_free->next->next)));
4007 #endif
4008 
4009 	elm = hash_free->next;
4010 	hash_free->next = elm->next;
4011 
4012 	return elm;
4013 }
4014 
4015 /*
4016  * Put back hash element to free list.
4017  */
put_back_hash_element(volatile POOL_HASH_ELEMENT * element)4018 static void put_back_hash_element(volatile POOL_HASH_ELEMENT *element)
4019 {
4020 	POOL_HASH_ELEMENT *elm;
4021 
4022 #ifdef POOL_HASH_DEBUG
4023 	ereport(LOG,
4024 		(errmsg("getting new hash element"),
4025 			errdetail("hash_free->next:%p hash_free->next->next:%p",
4026 				   hash_free->next, hash_free->next->next)));
4027 #endif
4028 
4029 	elm = hash_free->next;
4030 	hash_free->next = (POOL_HASH_ELEMENT *)element;
4031 	element->next = elm;
4032 }
4033 
4034 /*
4035  * Return true if there's a free hash element.
4036  */
is_free_hash_element(void)4037 static bool is_free_hash_element(void)
4038 {
4039 	return hash_free->next != NULL;
4040 }
4041 
4042 /*
4043  * Returns shared memory cache stats.
4044  * Subsequent call to this function will break return value
4045  * because its in static memory.
4046  * Caller must hold shmem_lock before calling this function.
4047  * If in memory query cache is not enabled, all stats are 0.
4048  */
pool_get_shmem_storage_stats(void)4049 POOL_SHMEM_STATS *pool_get_shmem_storage_stats(void)
4050 {
4051 	static POOL_SHMEM_STATS mystats;
4052 	POOL_HASH_ELEMENT *element;
4053 	int nblocks;
4054 	int i;
4055 
4056 	memset(&mystats, 0, sizeof(POOL_SHMEM_STATS));
4057 
4058 	if (!pool_config-> memory_cache_enabled)
4059 		return &mystats;
4060 
4061 	/*
4062 	 * Copy cache hit data
4063 	 */
4064 	mystats.cache_stats.num_selects = stats->num_selects;
4065 	mystats.cache_stats.num_cache_hits = stats->num_cache_hits;
4066 
4067 	if (pool_config->memqcache_method != SHMEM_CACHE)
4068 		return &mystats;
4069 
4070 	/* number of total hash entries */
4071 	mystats.num_hash_entries = hash_header->nhash;
4072 
4073 	/* number of used hash entries */
4074 	for (i=0;i<hash_header->nhash;i++)
4075 	{
4076 		element = hash_header->elements[i].element;
4077 		while (element)
4078 		{
4079 			mystats.used_hash_entries++;
4080 			element = element->next;
4081 		}
4082 	}
4083 
4084 	nblocks = pool_get_memqcache_blocks();
4085 
4086 	for (i=0;i<nblocks;i++)
4087 	{
4088 		POOL_CACHE_BLOCK_HEADER *bh;
4089 		POOL_CACHE_ITEM_POINTER *cip;
4090 		char *p = block_address(i);
4091 		bh = (POOL_CACHE_BLOCK_HEADER *)p;
4092 		int j;
4093 
4094 		if (bh->flags & POOL_BLOCK_USED)
4095 		{
4096 			for (j=0;j<bh->num_items;j++)
4097 			{
4098 				cip = item_pointer(p, j);
4099 				if (POOL_ITEM_DELETED & cip->flags)
4100 				{
4101 					mystats.fragment_cache_entries_size += item_header(p, j)->total_length;
4102 				}
4103 				else
4104 				{
4105 					/* number of used cache entries */
4106 					mystats.num_cache_entries++;
4107 					/* total size of used cache entries */
4108 					mystats.used_cache_entries_size += (item_header(p, j)->total_length + sizeof(POOL_CACHE_ITEM_POINTER));
4109 				}
4110 			}
4111 			mystats.used_cache_entries_size += sizeof(POOL_CACHE_BLOCK_HEADER);
4112 			/* total size of free(usable) cache entries */
4113 			mystats.free_cache_entries_size += bh->free_bytes;
4114 		}
4115 		else
4116 		{
4117 			mystats.free_cache_entries_size += pool_config->memqcache_cache_block_size;
4118 		}
4119 	}
4120 
4121 	/*
4122 	 * Copy POOL_QUERY_CACHE_STATS
4123 	 */
4124 	memcpy(&mystats.cache_stats, stats, sizeof(mystats.cache_stats));
4125 
4126 	return &mystats;
4127 }
4128 
4129 /*
4130  * Inject cached message to the target backend buffer to pretend as if backend
4131  * actually replies with Data row and Command Complete message.
4132  */
inject_cached_message(POOL_CONNECTION * backend,char * qcache,int qcachelen)4133 static void inject_cached_message(POOL_CONNECTION *backend, char *qcache, int qcachelen)
4134 {
4135 	char kind;
4136 	int len;
4137 	char *buf;
4138 	int timeout;
4139 	int i = 0;
4140 	bool is_prepared_stmt = false;
4141 	POOL_SESSION_CONTEXT *session_context;
4142 	POOL_QUERY_CONTEXT* query_context;
4143 	POOL_PENDING_MESSAGE *msg;
4144 
4145 	session_context = pool_get_session_context(false);
4146 	query_context = session_context->query_context;
4147 	msg = pool_pending_message_find_lastest_by_query_context(query_context);
4148 
4149 	if (msg)
4150 	{
4151 		/*
4152 		 * If pending message found, we should extract target backend from it
4153 		 */
4154 		int backend_id;
4155 
4156 		backend_id = pool_pending_message_get_target_backend_id(msg);
4157 		backend = CONNECTION(session_context->backend, backend_id);
4158 		timeout = -1;
4159 	}
4160 	else
4161 		timeout = 0;
4162 
4163 	/* Send flush messsage to backend to retrieve response of backend */
4164 	pool_write(backend, "H", 1);
4165 	len = htonl(sizeof(len));
4166 	pool_write_and_flush(backend, &len, sizeof(len));
4167 
4168 	/*
4169 	 * Push any response from backend
4170 	 */
4171 	for(;;)
4172 	{
4173 		pool_read(backend, &kind, 1);
4174 		ereport(DEBUG1,
4175 				(errmsg("inject_cached_message: push message kind: '%c'", kind)));
4176 		if (msg &&
4177 			((kind == 'T' && msg->type == POOL_DESCRIBE) ||
4178 			 (kind == '2' && msg->type == POOL_BIND)))
4179 		{
4180 			/* Pending message seen. Now it is likely to end of pending data */
4181 			timeout = 0;
4182 		}
4183 		pool_push(backend, &kind, sizeof(kind));
4184 		pool_read(backend, &len, sizeof(len));
4185 		pool_push(backend, &len, sizeof(len));
4186 		if ((ntohl(len)-sizeof(len)) > 0)
4187 		{
4188 			buf = pool_read2(backend, ntohl(len)-sizeof(len));
4189 			pool_push(backend, buf, ntohl(len)-sizeof(len));
4190 		}
4191 
4192 		/* check if there's any pending data */
4193 		if (!pool_ssl_pending(backend) && pool_read_buffer_is_empty(backend))
4194 		{
4195 			pool_set_timeout(timeout);
4196 			if (pool_check_fd(backend) != 0)
4197 			{
4198 				ereport(DEBUG1,
4199 						(errmsg("inject_cached_message: select shows no pending data")));
4200 				pool_set_timeout(-1);
4201 				break;
4202 			}
4203 			pool_set_timeout(-1);
4204 		}
4205 	}
4206 
4207 	/*
4208 	 * Inject row data and command complete
4209 	 */
4210 	while (i < qcachelen)
4211 	{
4212 		char tmpkind;
4213 		int tmplen;
4214 		char *p;
4215 
4216 		tmpkind = qcache[i];
4217 		i++;
4218 
4219 		memcpy(&tmplen, qcache+i, sizeof(tmplen));
4220 		i += sizeof(tmplen);
4221 		len = ntohl(tmplen);
4222 		p = qcache + i;
4223 		i += len - sizeof(tmplen);
4224 
4225 		/* No need to cache PARSE and BIND responses */
4226 		if (tmpkind == '1' || tmpkind == '2')
4227 		{
4228 			is_prepared_stmt = true;
4229 			continue;
4230 		}
4231 
4232 		/*
4233 		 * In the prepared statement execution, there is no need to send
4234 		 * 'T' response to the frontend.
4235 		 */
4236 		if (is_prepared_stmt && tmpkind == 'T')
4237 		{
4238 			continue;
4239 		}
4240 
4241 		/* push message */
4242 		ereport(DEBUG1,
4243 			(errmsg("inject_cached_message: push cached messages: '%c' len: %d", tmpkind, len)));
4244 		pool_push(backend, &tmpkind, 1);
4245 		pool_push(backend, &tmplen, sizeof(tmplen));
4246 		if (len > 0)
4247 			pool_push(backend, p, len - sizeof(tmplen));
4248 	}
4249 
4250 	/*
4251 	 * Pop data.
4252 	 */
4253 	pool_pop(backend, &len);
4254 }
4255