1 /* -*-pgsql-c-*- */
2 /*
3  *
4  * $Header$
5  *
6  * pgpool: a language independent connection pool server for PostgreSQL
7  * written by Tatsuo Ishii
8  *
9  * Copyright (c) 2003-2017	PgPool Global Development Group
10  *
11  * Permission to use, copy, modify, and distribute this software and
12  * its documentation for any purpose and without fee is hereby
13  * granted, provided that the above copyright notice appear in all
14  * copies and that both that copyright notice and this permission
15  * notice appear in supporting documentation, and that the name of the
16  * author not be used in advertising or publicity pertaining to
17  * distribution of the software without specific, written prior
18  * permission. The author makes no representations about the
19  * suitability of this software for any purpose.  It is provided "as
20  * is" without express or implied warranty.
21  *
22  */
23 #include "pool.h"
24 #include "pool_config.h"
25 #include "protocol/pool_proto_modules.h"
26 #include "utils/palloc.h"
27 #include "utils/memutils.h"
28 #include "utils/elog.h"
29 #include "utils/pool_select_walker.h"
30 #include "utils/pool_stream.h"
31 #include "context/pool_session_context.h"
32 #include "context/pool_query_context.h"
33 #include "parser/nodes.h"
34 
35 #include <string.h>
36 #include <netinet/in.h>
37 #include <stdlib.h>
38 
/*
 * Where to send query.
 *
 * Classification returned by send_to_where().  pool_where_to_send() sends
 * POOL_PRIMARY statements to the primary only, POOL_BOTH statements to all
 * eligible nodes, and treats every other value (POOL_STANDBY, POOL_EITHER)
 * as a candidate for load balancing.
 */
typedef enum {
	POOL_PRIMARY,
	POOL_STANDBY,
	POOL_EITHER,
	POOL_BOTH
} POOL_DEST;

/*
 * Raise an ERROR if the variable named "query_context" visible at the
 * expansion site is NULL.  The macro takes no argument: it relies on the
 * caller having a parameter or local called exactly query_context.
 */
#define CHECK_QUERY_CONTEXT_IS_VALID \
						do { \
							if (!query_context) \
								ereport(ERROR, \
									(errmsg("setting db node for query to be sent, no query context")));\
						} while (0)

/* Forward declarations of file-local helpers used below. */
static POOL_DEST send_to_where(Node *node, char *query);
static void where_to_send_deallocate(POOL_QUERY_CONTEXT *query_context, Node *node);
static char* remove_read_write(int len, const char *contents, int *rewritten_len);
59 
60 /*
61  * Create and initialize per query session context
62  */
pool_init_query_context(void)63 POOL_QUERY_CONTEXT *pool_init_query_context(void)
64 {
65 	MemoryContext memory_context = AllocSetContextCreate(QueryContext,
66 															"QueryContextMemoryContext",
67 															ALLOCSET_SMALL_MINSIZE,
68 															ALLOCSET_SMALL_INITSIZE,
69 															ALLOCSET_SMALL_MAXSIZE);
70 
71 	MemoryContext oldcontext = MemoryContextSwitchTo(memory_context);
72 	POOL_QUERY_CONTEXT *qc;
73 	qc = palloc0(sizeof(*qc));
74 	qc->memory_context = memory_context;
75 	MemoryContextSwitchTo(oldcontext);
76 	return qc;
77 }
78 
79 /*
80  * Destroy query context
81  */
pool_query_context_destroy(POOL_QUERY_CONTEXT * query_context)82 void pool_query_context_destroy(POOL_QUERY_CONTEXT *query_context)
83 {
84 	POOL_SESSION_CONTEXT *session_context;
85 
86 	if (query_context)
87 	{
88 		MemoryContext memory_context = query_context->memory_context;
89 
90 		ereport(DEBUG1,
91 				(errmsg("pool_query_context_destroy: query context:%p", query_context)));
92 
93 		session_context = pool_get_session_context(false);
94 		pool_unset_query_in_progress();
95 		if (!pool_is_command_success() && query_context->pg_terminate_backend_conn)
96 		{
97 			ereport(DEBUG1,
98 				 (errmsg("clearing the connection flag for pg_terminate_backend")));
99 			pool_unset_connection_will_be_terminated(query_context->pg_terminate_backend_conn);
100 		}
101 		query_context->pg_terminate_backend_conn = NULL;
102 		query_context->original_query = NULL;
103 		session_context->query_context = NULL;
104 		pfree(query_context);
105 		MemoryContextDelete(memory_context);
106 	}
107 }
108 
109 /*
110  * Perform shallow copy of given query context. Used in parse_before_bind.
111  */
pool_query_context_shallow_copy(POOL_QUERY_CONTEXT * query_context)112 POOL_QUERY_CONTEXT *pool_query_context_shallow_copy(POOL_QUERY_CONTEXT *query_context)
113 {
114 	POOL_QUERY_CONTEXT *qc;
115 	MemoryContext memory_context;
116 
117 	qc = pool_init_query_context();
118 	memory_context = qc->memory_context;
119 	memcpy(qc, query_context, sizeof(POOL_QUERY_CONTEXT));
120 	qc->memory_context = memory_context;
121 	return qc;
122 }
123 
124 /*
125  * Start query
126  */
pool_start_query(POOL_QUERY_CONTEXT * query_context,char * query,int len,Node * node)127 void pool_start_query(POOL_QUERY_CONTEXT *query_context, char *query, int len, Node *node)
128 {
129 	POOL_SESSION_CONTEXT *session_context;
130 
131 	if (query_context)
132 	{
133 		MemoryContext old_context;
134 		session_context = pool_get_session_context(false);
135 		old_context = MemoryContextSwitchTo(query_context->memory_context);
136 		query_context->original_length = len;
137 		query_context->rewritten_length = -1;
138 		query_context->original_query = pstrdup(query);
139 		query_context->rewritten_query = NULL;
140 		query_context->parse_tree = node;
141 		query_context->virtual_master_node_id = my_master_node_id;
142 		query_context->is_cache_safe = false;
143 		query_context->num_original_params = -1;
144 		if (pool_config->memory_cache_enabled)
145 			query_context->temp_cache = pool_create_temp_query_cache(query);
146 		pool_set_query_in_progress();
147 		query_context->skip_cache_commit = false;
148 		session_context->query_context = query_context;
149 		MemoryContextSwitchTo(old_context);
150 	}
151 }
152 
153 /*
154  * Specify DB node to send query
155  */
pool_set_node_to_be_sent(POOL_QUERY_CONTEXT * query_context,int node_id)156 void pool_set_node_to_be_sent(POOL_QUERY_CONTEXT *query_context, int node_id)
157 {
158 	CHECK_QUERY_CONTEXT_IS_VALID;
159 
160 	if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
161 		ereport(ERROR,
162 			(errmsg("setting db node for query to be sent, invalid node id:%d",node_id),
163 				 errdetail("backend node id: %d out of range, node id can be between 0 and %d",node_id,MAX_NUM_BACKENDS)));
164 
165 	query_context->where_to_send[node_id] = true;
166 
167 	return;
168 }
169 
170 /*
171  * Unspecify DB node to send query
172  */
pool_unset_node_to_be_sent(POOL_QUERY_CONTEXT * query_context,int node_id)173 void pool_unset_node_to_be_sent(POOL_QUERY_CONTEXT *query_context, int node_id)
174 {
175 	CHECK_QUERY_CONTEXT_IS_VALID;
176 
177 	if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
178 		ereport(ERROR,
179 			(errmsg("un setting db node for query to be sent, invalid node id:%d",node_id),
180 				 errdetail("backend node id: %d out of range, node id can be between 0 and %d",node_id,MAX_NUM_BACKENDS)));
181 
182 	query_context->where_to_send[node_id] = false;
183 
184 	return;
185 }
186 
187 /*
188  * Clear DB node map
189  */
pool_clear_node_to_be_sent(POOL_QUERY_CONTEXT * query_context)190 void pool_clear_node_to_be_sent(POOL_QUERY_CONTEXT *query_context)
191 {
192 	CHECK_QUERY_CONTEXT_IS_VALID;
193 
194 	memset(query_context->where_to_send, false, sizeof(query_context->where_to_send));
195 	return;
196 }
197 
198 /*
199  * Set all DB node map entry
200  */
pool_setall_node_to_be_sent(POOL_QUERY_CONTEXT * query_context)201 void pool_setall_node_to_be_sent(POOL_QUERY_CONTEXT *query_context)
202 {
203 	int i;
204 	POOL_SESSION_CONTEXT *sc;
205 
206 	sc = pool_get_session_context(false);
207 
208 	CHECK_QUERY_CONTEXT_IS_VALID;
209 
210 	for (i=0;i<NUM_BACKENDS;i++)
211 	{
212 		if (private_backend_status[i] == CON_UP ||
213 			(private_backend_status[i] == CON_CONNECT_WAIT))
214 		{
215 			/*
216 			 * In streaming replication mode, if the node is not
217 			 * primary node nor load balance node, there's no point to
218 			 * send query.
219 			 */
220 			if (pool_config->master_slave_mode &&
221 				pool_config->master_slave_sub_mode == STREAM_MODE &&
222 				i != PRIMARY_NODE_ID && i != sc->load_balance_node_id)
223 			{
224 				continue;
225 			}
226 			query_context->where_to_send[i] = true;
227 		}
228 	}
229 	return;
230 }
231 
232 /*
233  * Return true if multiple nodes are targets
234  */
pool_multi_node_to_be_sent(POOL_QUERY_CONTEXT * query_context)235 bool pool_multi_node_to_be_sent(POOL_QUERY_CONTEXT *query_context)
236 {
237 	int i;
238 	int cnt = 0;
239 
240 	CHECK_QUERY_CONTEXT_IS_VALID;
241 
242 	for (i=0;i<NUM_BACKENDS;i++)
243 	{
244 		if (((BACKEND_INFO(i)).backend_status == CON_UP ||
245 			 BACKEND_INFO((i)).backend_status == CON_CONNECT_WAIT) &&
246 			query_context->where_to_send[i])
247 		{
248 			cnt++;
249 			if (cnt > 1)
250 			{
251 				return true;
252 			}
253 		}
254 	}
255 	return false;
256 }
257 
258 /*
259  * Return if the DB node is needed to send query
260  */
pool_is_node_to_be_sent(POOL_QUERY_CONTEXT * query_context,int node_id)261 bool pool_is_node_to_be_sent(POOL_QUERY_CONTEXT *query_context, int node_id)
262 {
263 	CHECK_QUERY_CONTEXT_IS_VALID;
264 
265 	if (node_id < 0 || node_id >= MAX_NUM_BACKENDS)
266 		ereport(ERROR,
267 			(errmsg("checking if db node is needed to be sent, invalid node id:%d",node_id),
268 				 errdetail("backend node id: %d out of range, node id can be between 0 and %d",node_id,MAX_NUM_BACKENDS)));
269 
270 	return query_context->where_to_send[node_id];
271 }
272 
273 /*
274  * Returns true if the DB node is needed to send query.
275  * Intended to be called from VALID_BACKEND
276  */
pool_is_node_to_be_sent_in_current_query(int node_id)277 bool pool_is_node_to_be_sent_in_current_query(int node_id)
278 {
279 	POOL_SESSION_CONTEXT *sc;
280 
281 	if (RAW_MODE)
282 		return node_id == REAL_MASTER_NODE_ID;
283 
284 	sc = pool_get_session_context(true);
285 	if (!sc)
286 		return true;
287 
288 	if (pool_is_query_in_progress() && sc->query_context)
289 	{
290 		return pool_is_node_to_be_sent(sc->query_context, node_id);
291 	}
292 	return true;
293 }
294 
295 /*
296  * Returns virtual master DB node id,
297  */
pool_virtual_master_db_node_id(void)298 int pool_virtual_master_db_node_id(void)
299 {
300 	POOL_SESSION_CONTEXT *sc;
301 
302 	/*
303 	 * Check whether failover is in progress. If so, just abort this session.
304 	 */
305 	if (Req_info->switching)
306 	{
307 #ifdef NOT_USED
308 		POOL_SETMASK(&BlockSig);
309 		ereport(WARNING,
310 				(errmsg("failover/failback is in progress"),
311 						errdetail("executing failover or failback on backend"),
312 				 errhint("In a moment you should be able to reconnect to the database")));
313 		POOL_SETMASK(&UnBlockSig);
314 #endif
315 		child_exit(POOL_EXIT_AND_RESTART);
316 	}
317 
318 	sc = pool_get_session_context(true);
319 	if (!sc)
320 	{
321 		return REAL_MASTER_NODE_ID;
322 	}
323 
324 	if (sc->in_progress && sc->query_context)
325 	{
326 		int node_id = sc->query_context->virtual_master_node_id;
327 
328 		if (STREAM)
329 		{
330 			 /*
331 			  * Make sure that virtual_master_node_id is either primary node
332 			  * id or load balance node id.  If not, it is likely that
333 			  * virtual_master_node_id is not set up yet. Let's use the
334 			  * primary node id. except for the special case where we need
335 			  * to send the query to the node which is not primary nor the
336 			  * load balance node. Currently there is only one special such
337 			  * case that is handling of pg_terminate_backend() function, which
338 			  * may refer to the backend connection that is neither hosted by
339 			  * the primary or load balance node for current child process, but
340 			  * the query must be forwarded to that node. Since only that backend
341 			  * node can handle that pg_terminate_backend query
342 			  *
343 			  */
344 
345 			ereport(DEBUG1,
346 					(errmsg("pool_virtual_master_db_node_id: virtual_master_node_id:%d load_balance_node_id:%d PRIMARY_NODE_ID:%d",
347 							node_id, sc->load_balance_node_id, PRIMARY_NODE_ID)));
348 
349 			if (node_id != sc->load_balance_node_id && node_id != PRIMARY_NODE_ID)
350 			{
351 				/*
352 				 * Only return the primary node id if we are not processing the
353 				 * pg_terminate_backend query
354 				 */
355 				if (sc->query_context->pg_terminate_backend_conn == NULL)
356 					node_id = PRIMARY_NODE_ID;
357 			}
358 		}
359 
360 		return node_id;
361 	}
362 
363 	/*
364 	 * No query context exists.  If in master/slave mode, returns
365 	 * primary node if exists.  Otherwise returns my_master_node_id,
366 	 * which represents the last REAL_MASTER_NODE_ID.
367 	 */
368 	if (MASTER_SLAVE)
369 	{
370 		int node_id;
371 
372 		node_id = pool_get_preferred_master_node_id();
373 		if (node_id >= 0)
374 			return node_id;
375 
376 		return PRIMARY_NODE_ID;
377 	}
378 	return my_master_node_id;
379 }
380 
381 /*
382  * The function sets the destination for the current query to the specific backend node
383  */
pool_force_query_node_to_backend(POOL_QUERY_CONTEXT * query_context,int backend_id)384 void pool_force_query_node_to_backend(POOL_QUERY_CONTEXT *query_context, int backend_id)
385 {
386 	int i;
387 	CHECK_QUERY_CONTEXT_IS_VALID;
388 
389 	ereport(DEBUG1,
390 		(errmsg("forcing query destination node to backend node:%d",backend_id)));
391 
392 	pool_set_node_to_be_sent(query_context,backend_id);
393 	for (i=0;i<NUM_BACKENDS;i++)
394 	{
395 		if (query_context->where_to_send[i])
396 		{
397 			query_context->virtual_master_node_id = i;
398 			break;
399 		}
400 	}
401 }
402 
/*
 * Decide where to send queries (thus expecting response).
 *
 * Clears and then fills query_context->where_to_send for the statement
 * "query" whose parse tree is "node":
 *
 * - A query starting with the NO_LOAD_BALANCE comment goes only to the
 *   primary (master/slave mode) or REAL_MASTER_NODE_ID (other modes), and
 *   the EXECUTE/DEALLOCATE handling at the end is skipped.
 * - Raw mode: REAL_MASTER_NODE_ID only.
 * - Master/slave mode: a multi-statement query goes to the primary only.
 *   Otherwise send_to_where() classifies the statement: POOL_PRIMARY goes
 *   to the primary, POOL_BOTH to all eligible nodes, and anything else may
 *   be load balanced, subject to the checks below.
 * - Replication mode: SELECTs may be load balanced or sent to the master
 *   node; statements that are not SELECTs, SELECTs with function calls, or
 *   any SELECT when replicate_select is on, go to all nodes.
 * - Unknown mode: a WARNING is emitted and no node is selected.
 *
 * For EXECUTE, the destination map recorded for the matching prepared
 * statement (if found) is copied via pool_copy_prep_where().  DEALLOCATE
 * is delegated to where_to_send_deallocate().  Finally,
 * virtual_master_node_id is set to the lowest-numbered destination.
 *
 * NOTE(review): node is dereferenced by send_to_where()/IsA() below, so
 * callers presumably never pass NULL -- confirm.
 */
void pool_where_to_send(POOL_QUERY_CONTEXT *query_context, char *query, Node *node)
{
	POOL_SESSION_CONTEXT *session_context;
	POOL_CONNECTION_POOL *backend;
	int i;

	CHECK_QUERY_CONTEXT_IS_VALID;

	session_context = pool_get_session_context(false);
	backend = session_context->backend;

	/*
	 * Zap out DB node map
	 */
	pool_clear_node_to_be_sent(query_context);

	/*
	 * If there is "NO LOAD BALANCE" comment, we send only to master node
	 * (the primary in master/slave mode) and return immediately.
	 */
	if (!strncasecmp(query, NO_LOAD_BALANCE, NO_LOAD_BALANCE_COMMENT_SZ))
	{
		pool_set_node_to_be_sent(query_context,
								 MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID);
		/* The virtual master is the first (lowest) destination node. */
		for (i=0;i<NUM_BACKENDS;i++)
		{
			if (query_context->where_to_send[i])
			{
				query_context->virtual_master_node_id = i;
				break;
			}
		}
		return;
	}

	/*
	 * In raw mode, we send only to master node. Simple enough.
	 */
	if (RAW_MODE)
	{
		pool_set_node_to_be_sent(query_context, REAL_MASTER_NODE_ID);
	}
	else if (MASTER_SLAVE && query_context->is_multi_statement)
	{
		/*
		 * If we are in master/slave mode and we have multi statement
		 * query, we should send it to primary server only. Otherwise
		 * it is possible to send a write query to standby servers
		 * because we only use the first element of the multi
		 * statement query and don't care about the rest.  Typical
		 * situation where we are bugged by this is, "BEGIN;DELETE
		 * FROM table;END". Note that from pgpool-II 3.1.0
		 * transactional statements such as "BEGIN" is unconditionally
		 * sent to all nodes(see send_to_where() for more details).
		 * Someday we might be able to understand all part of multi
		 * statement queries, but until that day we need this band
		 * aid.
		 */
		pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
	}
	else if (MASTER_SLAVE)
	{
		POOL_DEST dest;

		/* Classify the statement from its parse tree. */
		dest = send_to_where(node, query);

		ereport(DEBUG1,
			(errmsg("decide where to send the query"),
				 errdetail("destination = %d for query= \"%s\"", dest, query)));

		/* Should be sent to primary only? */
		if (dest == POOL_PRIMARY)
		{
			pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
		}
		/* Should be sent to both primary and standby? */
		else if (dest == POOL_BOTH)
		{
			pool_setall_node_to_be_sent(query_context);
		}

		/*
		 * Ok, we might be able to load balance the SELECT query.
		 * (dest is POOL_STANDBY or POOL_EITHER here.)
		 */
		else
		{
			/* Load balancing also requires protocol version 3. */
			if (pool_config->load_balance_mode &&
				is_select_query(node, query) &&
				MAJOR(backend) == PROTO_MAJOR_V3)
			{
				/*
				 * If (we are outside of an explicit transaction) OR
				 * (the transaction has not issued a write query yet, AND
				 *	transaction isolation level is not SERIALIZABLE)
				 * we might be able to load balance.
				 */

				ereport(DEBUG1,
						(errmsg("checking load balance precondtions. TSTATE:%c wrting_trancation:%d failed_transaction:%d isolation:%d",
								TSTATE(backend, PRIMARY_NODE_ID),
								pool_is_writing_transaction(),
								pool_is_failed_transaction(),
								pool_get_transaction_isolation()),
						 errdetail("destination = %d for query= \"%s\"", dest, query)));

				if (TSTATE(backend, PRIMARY_NODE_ID) == 'I' ||
					(!pool_is_writing_transaction() &&
					 !pool_is_failed_transaction() &&
					 pool_get_transaction_isolation() != POOL_SERIALIZABLE))
				{
					/* Info of the candidate node, used for the delay check. */
					BackendInfo *bkinfo = pool_get_node_info(session_context->load_balance_node_id);

					/*
					 * Load balance if possible.  Each check below that
					 * fails routes the SELECT to the primary instead.
					 */

					/*
					 * If replication delay is too much, we prefer to send to the primary.
					 */
					if (pool_config->master_slave_sub_mode == STREAM_MODE &&
						pool_config->delay_threshold &&
						bkinfo->standby_delay > pool_config->delay_threshold)
					{
						ereport(DEBUG1,
								(errmsg("could not load balance because of too much replication delay"),
								 errdetail("destination = %d for query= \"%s\"", dest, query)));

						pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
					}

					/*
					 * If a writing function call is used,
					 * we prefer to send to the primary.
					 */
					else if (pool_has_function_call(node))
					{
						ereport(DEBUG1,
								(errmsg("could not load balance because writing functions are used"),
								 errdetail("destination = %d for query= \"%s\"", dest, query)));

						pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
					}

					/*
					 * If system catalog is used in the SELECT, we
					 * prefer to send to the primary. Example: SELECT
					 * * FROM pg_class WHERE relname = 't1'; Because
					 * 't1' is a constant, it's hard to recognize as
					 * table name.  Most use case such query is
					 * against system catalog, and the table name can
					 * be a temporary table, it's best to query
					 * against primary system catalog.
					 * Please note that this test must be done *before*
					 * test using pool_has_temp_table.
					 */
					else if (pool_has_system_catalog(node))
					{
						ereport(DEBUG1,
								(errmsg("could not load balance because systems catalogs are used"),
								 errdetail("destination = %d for query= \"%s\"", dest, query)));

						pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
					}

					/*
					 * If temporary table is used in the SELECT,
					 * we prefer to send to the primary.
					 */
					else if (pool_config->check_temp_table && pool_has_temp_table(node))
					{
						ereport(DEBUG1,
								(errmsg("could not load balance because temporary tables are used"),
								 errdetail("destination = %d for query= \"%s\"", dest, query)));

						pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
					}

					/*
					 * If unlogged table is used in the SELECT,
					 * we prefer to send to the primary.
					 */
					else if (pool_config->check_unlogged_table && pool_has_unlogged_table(node))
					{
						ereport(DEBUG1,
								(errmsg("could not load balance because unlogged tables are used"),
								 errdetail("destination = %d for query= \"%s\"", dest, query)));

						pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
					}

					/* All checks passed: use the session's load balance node. */
					else
					{
						pool_set_node_to_be_sent(query_context,
												 session_context->load_balance_node_id);
					}
				}
				else
				{
					/* Send to the primary only */
					pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
				}
			}
			else
			{
				/* Send to the primary only */
				pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
			}
		}
	}
	else if (REPLICATION)
	{
		if (pool_config->load_balance_mode &&
			is_select_query(node, query) &&
			MAJOR(backend) == PROTO_MAJOR_V3)
		{
			/*
			 * If a writing function call is used or replicate_select is true,
			 * we prefer to send to all nodes.
			 */
			if (pool_has_function_call(node) || pool_config->replicate_select)
			{
				pool_setall_node_to_be_sent(query_context);
			}
			/*
			 * If (we are outside of an explicit transaction) OR
			 * (the transaction has not issued a write query yet, AND
			 *	transaction isolation level is not SERIALIZABLE)
			 * we might be able to load balance.
			 */
			else if (TSTATE(backend, MASTER_NODE_ID) == 'I' ||
					 (!pool_is_writing_transaction() &&
					  !pool_is_failed_transaction() &&
					  pool_get_transaction_isolation() != POOL_SERIALIZABLE))
			{
				/* load balance */
				pool_set_node_to_be_sent(query_context,
										 session_context->load_balance_node_id);
			}
			else
			{
				/* only send to master node */
				pool_set_node_to_be_sent(query_context, REAL_MASTER_NODE_ID);
			}
		}
		else
		{
			/* No load balancing: plain SELECTs need only the master node. */
			if (is_select_query(node, query) && !pool_config->replicate_select &&
				!pool_has_function_call(node))
			{
				/* only send to master node */
				pool_set_node_to_be_sent(query_context, REAL_MASTER_NODE_ID);
			}
			else
			{
				/* send to all nodes */
				pool_setall_node_to_be_sent(query_context);
			}
		}
	}
	else
	{
		/* Leaves the (already cleared) destination map empty. */
		ereport(WARNING,
				(errmsg("unknown pgpool-II mode while deciding for where to send query")));
		return;
	}

	/*
	 * EXECUTE?  Copy the destinations of the prepared statement created by
	 * a simple query ('Q') or, failing that, a Parse message ('P').
	 */
	if (IsA(node, ExecuteStmt))
	{
		POOL_SENT_MESSAGE *msg;

		msg = pool_get_sent_message('Q', ((ExecuteStmt *)node)->name, POOL_SENT_MESSAGE_CREATED);
		if (!msg)
			msg = pool_get_sent_message('P', ((ExecuteStmt *)node)->name, POOL_SENT_MESSAGE_CREATED);
		if (msg)
			pool_copy_prep_where(msg->query_context->where_to_send,
								 query_context->where_to_send);
	}

	/*
	 * DEALLOCATE?
	 */
	else if (IsA(node, DeallocateStmt))
	{
		where_to_send_deallocate(query_context, node);
	}

	/* The virtual master is the first (lowest) destination node. */
	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (query_context->where_to_send[i])
		{
			query_context->virtual_master_node_id = i;
			break;
		}
	}

	return;
}
705 
/*
 * Send a simple query (the query held in query_context) to backends and
 * wait for the responses.
 *
 * send_type:
 *  negative: send to every node except node_id
 *  0:        send to all nodes
 *  positive: send to node_id only
 *
 * Only nodes for which VALID_BACKEND() is true are considered.  During the
 * send phase, COMMIT/ROLLBACK is not sent to an idle ('I') non-master node
 * in master/slave mode, nor to an idle node in reset context.  Such nodes
 * are removed from the destination map with pool_unset_node_to_be_sent(),
 * presumably so that VALID_BACKEND() skips them in the wait phase too.
 *
 * Always returns POOL_CONTINUE.
 */
POOL_STATUS pool_send_and_wait(POOL_QUERY_CONTEXT *query_context,
							   int send_type, int node_id)
{
	POOL_SESSION_CONTEXT *session_context;
	POOL_CONNECTION *frontend;
	POOL_CONNECTION_POOL *backend;
	bool is_commit;
	bool is_begin_read_write;
	int i;
	int len;
	char *string;

	session_context = pool_get_session_context(false);
	frontend = session_context->frontend;
	backend = session_context->backend;
	is_commit = is_commit_or_rollback_query(query_context->parse_tree);
	is_begin_read_write = false;
	len = 0;
	string = NULL;

	/*
	 * If the query is BEGIN READ WRITE or
	 * BEGIN ... SERIALIZABLE in master/slave mode,
	 * we send BEGIN to slaves/standbys instead.
	 * original_query which is BEGIN READ WRITE is sent to primary.
	 * rewritten_query which is BEGIN is sent to standbys.
	 * In that case len/string are chosen per node inside the loops below;
	 * otherwise the rewritten query is preferred when one exists.
	 */
	if (pool_need_to_treat_as_if_default_transaction(query_context))
	{
		is_begin_read_write = true;
	}
	else
	{
		if (query_context->rewritten_query)
		{
			len = query_context->rewritten_length;
			string = query_context->rewritten_query;
		}
		else
		{
			len = query_context->original_length;
			string = query_context->original_query;
		}
	}

	/* Send query */
	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (!VALID_BACKEND(i))
			continue;
		else if (send_type < 0 && i == node_id)
			continue;
		else if (send_type > 0 && i != node_id)
			continue;

		/*
		 * If in master/slave mode, we do not send COMMIT/ABORT to
		 * slaves/standbys if it's in I(idle) state.
		 */
		if (is_commit && MASTER_SLAVE && !IS_MASTER_NODE_ID(i) && TSTATE(backend, i) == 'I')
		{
			pool_unset_node_to_be_sent(query_context, i);
			continue;
		}

		/*
		 * If in reset context, we send COMMIT/ABORT only to nodes that
		 * are not in I(idle) state.  This will ensure that
		 * transactions are closed.
		 */
		if (is_commit && session_context->reset_context && TSTATE(backend, i) == 'I')
		{
			pool_unset_node_to_be_sent(query_context, i);
			continue;
		}

		/* Primary gets the original BEGIN, the others the rewritten one. */
		if (is_begin_read_write)
		{
			if (REAL_PRIMARY_NODE_ID == i)
			{
				len = query_context->original_length;
				string = query_context->original_query;
			}
			else
			{
				len = query_context->rewritten_length;
				string = query_context->rewritten_query;
			}
		}

		per_node_statement_log(backend, i, string);
		stat_count_up(i, query_context->parse_tree);
		send_simplequery_message(CONNECTION(backend, i), len, string, MAJOR(backend));
	}

	/* Wait for response */
	for (i=0;i<NUM_BACKENDS;i++)
	{
		if (!VALID_BACKEND(i))
			continue;
		else if (send_type < 0 && i == node_id)
			continue;
		else if (send_type > 0 && i != node_id)
			continue;

#ifdef NOT_USED
		/*
		 * If in master/slave mode, we do not send COMMIT/ABORT to
		 * slaves/standbys if it's in I(idle) state.
		 */
		if (is_commit && MASTER_SLAVE && !IS_MASTER_NODE_ID(i) && TSTATE(backend, i) == 'I')
		{
			continue;
		}
#endif

		/* Pick the statement text this node received, for the error log. */
		if (is_begin_read_write)
		{
			if(REAL_PRIMARY_NODE_ID == i)
				string = query_context->original_query;
			else
				string = query_context->rewritten_query;
		}

		wait_for_query_response_with_trans_cleanup(frontend,
												   CONNECTION(backend, i),
												   MAJOR(backend),
												   MASTER_CONNECTION(backend)->pid,
												   MASTER_CONNECTION(backend)->key);

		/*
		 * Check if some error detected.  If so, emit
		 * log. This is useful when invalid encoding error
		 * occurs. In this case, PostgreSQL does not report
		 * what statement caused that error and make users
		 * confused.
		 */
		per_node_error_log(backend, i, string, "pool_send_and_wait: Error or notice message from backend: ", true);
	}

	return POOL_CONTINUE;
}
855 
856 /*
857  * Send extended query and wait for response
858  * send_type:
859  *  -1: do not send this node_id
860  *   0: send to all nodes
861  *  >0: send to this node_id
862  */
pool_extended_send_and_wait(POOL_QUERY_CONTEXT * query_context,char * kind,int len,char * contents,int send_type,int node_id,bool nowait)863 POOL_STATUS pool_extended_send_and_wait(POOL_QUERY_CONTEXT *query_context,
864 										char *kind, int len, char *contents,
865 										int send_type, int node_id, bool nowait)
866 {
867 	POOL_SESSION_CONTEXT *session_context;
868 	POOL_CONNECTION *frontend;
869 	POOL_CONNECTION_POOL *backend;
870 	bool is_commit;
871 	bool is_begin_read_write;
872 	int i;
873 	int str_len;
874 	int rewritten_len;
875 	char *str;
876 	char *rewritten_begin;
877 
878 	session_context = pool_get_session_context(false);
879 	frontend = session_context->frontend;
880 	backend = session_context->backend;
881 	is_commit = is_commit_or_rollback_query(query_context->parse_tree);
882 	is_begin_read_write = false;
883 	str_len = 0;
884 	rewritten_len = 0;
885 	str = NULL;
886 	rewritten_begin = NULL;
887 
888 	/*
889 	 * If the query is BEGIN READ WRITE or
890 	 * BEGIN ... SERIALIZABLE in master/slave mode,
891 	 * we send BEGIN to slaves/standbys instead.
892 	 * original_query which is BEGIN READ WRITE is sent to primary.
893 	 * rewritten_query which is BEGIN is sent to standbys.
894 	 */
895 	if (pool_need_to_treat_as_if_default_transaction(query_context))
896 	{
897 		is_begin_read_write = true;
898 
899 		if (*kind == 'P')
900 			rewritten_begin = remove_read_write(len, contents, &rewritten_len);
901 	}
902 
903 	if (!rewritten_begin)
904 	{
905 		str_len = len;
906 		str = contents;
907 	}
908 
909 	/* Send query */
910 	for (i=0;i<NUM_BACKENDS;i++)
911 	{
912 		if (!VALID_BACKEND(i))
913 			continue;
914 		else if (send_type < 0 && i == node_id)
915 			continue;
916 		else if (send_type > 0 && i != node_id)
917 			continue;
918 
919 		/*
920 		 * If in reset context, we send COMMIT/ABORT to nodes those
921 		 * are not in I(idle) state.  This will ensure that
922 		 * transactions are closed.
923 		 */
924 		if (is_commit && session_context->reset_context && TSTATE(backend, i) == 'I')
925 		{
926 			pool_unset_node_to_be_sent(query_context, i);
927 			continue;
928 		}
929 
930 		if (rewritten_begin)
931 		{
932 			if (REAL_PRIMARY_NODE_ID == i)
933 			{
934 				str = contents;
935 				str_len = len;
936 			}
937 			else
938 			{
939 				str = rewritten_begin;
940 				str_len = rewritten_len;
941 			}
942 		}
943 
944 		if (pool_config->log_per_node_statement)
945 		{
946 			char msgbuf[QUERY_STRING_BUFFER_LEN];
947 			char *stmt;
948 
949 			if (*kind == 'P' || *kind == 'E')
950 			{
951 				if (query_context->rewritten_query)
952 				{
953 					if (is_begin_read_write)
954 					{
955 						if (REAL_PRIMARY_NODE_ID == i)
956 							stmt = query_context->original_query;
957 						else
958 							stmt = query_context->rewritten_query;
959 					}
960 					else
961 					{
962 						stmt = query_context->rewritten_query;
963 					}
964 				}
965 				else
966 				{
967 					stmt = query_context->original_query;
968 				}
969 
970 				if (*kind == 'P')
971 					snprintf(msgbuf, sizeof(msgbuf), "Parse: %s", stmt);
972 				else
973 					snprintf(msgbuf, sizeof(msgbuf), "Execute: %s", stmt);
974 			}
975 			else
976 			{
977 				snprintf(msgbuf, sizeof(msgbuf), "%c message", *kind);
978 			}
979 
980 			per_node_statement_log(backend, i, msgbuf);
981 		}
982 
983 		/* if Execute message, count up stats count */
984 		if (*kind == 'E')
985 		{
986 			stat_count_up(i, query_context->parse_tree);
987 		}
988 
989 		send_extended_protocol_message(backend, i, kind, str_len, str);
990 
991 		if ((*kind == 'P' || *kind == 'E' || *kind == 'C') && STREAM)
992 		{
993 			/*
994 			 * Send flush message to backend to make sure that we get any response
995 			 * from backend in Streaming replication mode.
996 			 */
997 
998 			POOL_CONNECTION *cp = CONNECTION(backend, i);
999 			int len;
1000 
1001 			pool_write(cp, "H", 1);
1002 			len = htonl(sizeof(len));
1003 			pool_write_and_flush(cp, &len, sizeof(len));
1004 
1005 			ereport(DEBUG1,
1006 					(errmsg("pool_send_and_wait: send flush message to %d", i)));
1007 		}
1008 	}
1009 
1010 	if (!is_begin_read_write)
1011 	{
1012 		if (query_context->rewritten_query)
1013 			str = query_context->rewritten_query;
1014 		else
1015 			str = query_context->original_query;
1016 	}
1017 
1018 	if (!nowait)
1019 	{
1020 		/* Wait for response */
1021 		for (i=0;i<NUM_BACKENDS;i++)
1022 		{
1023 			if (!VALID_BACKEND(i))
1024 				continue;
1025 			else if (send_type < 0 && i == node_id)
1026 				continue;
1027 			else if (send_type > 0 && i != node_id)
1028 				continue;
1029 
1030 			/*
1031 			 * If in master/slave mode, we do not send COMMIT/ABORT to
1032 			 * slaves/standbys if it's in I(idle) state.
1033 			 */
1034 			if (is_commit && MASTER_SLAVE && !IS_MASTER_NODE_ID(i) && TSTATE(backend, i) == 'I')
1035 			{
1036 				continue;
1037 			}
1038 
1039 			if (is_begin_read_write)
1040 			{
1041 				if (REAL_PRIMARY_NODE_ID == i)
1042 					str = query_context->original_query;
1043 				else
1044 					str = query_context->rewritten_query;
1045 			}
1046 
1047 			wait_for_query_response_with_trans_cleanup(frontend,
1048 													   CONNECTION(backend, i),
1049 													   MAJOR(backend),
1050 													   MASTER_CONNECTION(backend)->pid,
1051 													   MASTER_CONNECTION(backend)->key);
1052 
1053 			/*
1054 			 * Check if some error detected.  If so, emit
1055 			 * log. This is useful when invalid encoding error
1056 			 * occurs. In this case, PostgreSQL does not report
1057 			 * what statement caused that error and make users
1058 			 * confused.
1059 			 */
1060 			per_node_error_log(backend, i, str, "pool_send_and_wait: Error or notice message from backend: ", true);
1061 		}
1062 	}
1063 
1064 	if(rewritten_begin)
1065         pfree(rewritten_begin);
1066 	return POOL_CONTINUE;
1067 }
1068 
1069 /*
1070  * From syntactically analysis decide the statement to be sent to the
1071  * primary, the standby or either or both in master/slave+HR/SR mode.
1072  */
send_to_where(Node * node,char * query)1073 static POOL_DEST send_to_where(Node *node, char *query)
1074 
1075 {
1076 /* From storage/lock.h */
1077 #define NoLock					0
1078 #define AccessShareLock			1		/* SELECT */
1079 #define RowShareLock			2		/* SELECT FOR UPDATE/FOR SHARE */
1080 #define RowExclusiveLock		3		/* INSERT, UPDATE, DELETE */
1081 #define ShareUpdateExclusiveLock 4		/* VACUUM (non-FULL),ANALYZE, CREATE
1082 										 * INDEX CONCURRENTLY */
1083 #define ShareLock				5		/* CREATE INDEX (WITHOUT CONCURRENTLY) */
1084 #define ShareRowExclusiveLock	6		/* like EXCLUSIVE MODE, but allows ROW
1085 										 * SHARE */
1086 #define ExclusiveLock			7		/* blocks ROW SHARE/SELECT...FOR
1087 										 * UPDATE */
1088 #define AccessExclusiveLock		8		/* ALTER TABLE, DROP TABLE, VACUUM
1089 										 * FULL, and unqualified LOCK TABLE */
1090 
1091 /* From 9.5 include/nodes/node.h ("TAGS FOR STATEMENT NODES" part) */
1092 	static NodeTag nodemap[] = {
1093 		T_PlannedStmt,
1094 		T_InsertStmt,
1095 		T_DeleteStmt,
1096 		T_UpdateStmt,
1097 		T_SelectStmt,
1098 		T_AlterTableStmt,
1099 		T_AlterTableCmd,
1100 		T_AlterDomainStmt,
1101 		T_SetOperationStmt,
1102 		T_GrantStmt,
1103 		T_GrantRoleStmt,
1104 		T_AlterDefaultPrivilegesStmt,
1105 		T_ClosePortalStmt,
1106 		T_ClusterStmt,
1107 		T_CopyStmt,
1108 		T_CreateStmt,	/* CREATE TABLE */
1109 		T_DefineStmt,	/* CREATE AGGREGATE, OPERATOR, TYPE */
1110 		T_DropStmt,		/* DROP TABLE etc. */
1111 		T_TruncateStmt,
1112 		T_CommentStmt,
1113 		T_FetchStmt,
1114 		T_IndexStmt,	/* CREATE INDEX */
1115 		T_CreateFunctionStmt,
1116 		T_AlterFunctionStmt,
1117 		T_DoStmt,
1118 		T_RenameStmt,	/* ALTER AGGREGATE etc. */
1119 		T_RuleStmt,		/* CREATE RULE */
1120 		T_NotifyStmt,
1121 		T_ListenStmt,
1122 		T_UnlistenStmt,
1123 		T_TransactionStmt,
1124 		T_ViewStmt,		/* CREATE VIEW */
1125 		T_LoadStmt,
1126 		T_CreateDomainStmt,
1127 		T_CreatedbStmt,
1128 		T_DropdbStmt,
1129 		T_VacuumStmt,
1130 		T_ExplainStmt,
1131 		T_CreateTableAsStmt,
1132 		T_CreateSeqStmt,
1133 		T_AlterSeqStmt,
1134 		T_VariableSetStmt,		/* SET */
1135 		T_VariableShowStmt,
1136 		T_DiscardStmt,
1137 		T_CreateTrigStmt,
1138 		T_CreatePLangStmt,
1139 		T_CreateRoleStmt,
1140 		T_AlterRoleStmt,
1141 		T_DropRoleStmt,
1142 		T_LockStmt,
1143 		T_ConstraintsSetStmt,
1144 		T_ReindexStmt,
1145 		T_CheckPointStmt,
1146 		T_CreateSchemaStmt,
1147 		T_AlterDatabaseStmt,
1148 		T_AlterDatabaseSetStmt,
1149 		T_AlterRoleSetStmt,
1150 		T_CreateConversionStmt,
1151 		T_CreateCastStmt,
1152 		T_CreateOpClassStmt,
1153 		T_CreateOpFamilyStmt,
1154 		T_AlterOpFamilyStmt,
1155 		T_PrepareStmt,
1156 		T_ExecuteStmt,
1157 		T_DeallocateStmt,		/* DEALLOCATE */
1158 		T_DeclareCursorStmt,	/* DECLARE */
1159 		T_CreateTableSpaceStmt,
1160 		T_DropTableSpaceStmt,
1161 		T_AlterObjectSchemaStmt,
1162 		T_AlterOwnerStmt,
1163 		T_DropOwnedStmt,
1164 		T_ReassignOwnedStmt,
1165 		T_CompositeTypeStmt,	/* CREATE TYPE */
1166 		T_CreateEnumStmt,
1167 		T_CreateRangeStmt,
1168 		T_AlterEnumStmt,
1169 		T_AlterTSDictionaryStmt,
1170 		T_AlterTSConfigurationStmt,
1171 		T_CreateFdwStmt,
1172 		T_AlterFdwStmt,
1173 		T_CreateForeignServerStmt,
1174 		T_AlterForeignServerStmt,
1175 		T_CreateUserMappingStmt,
1176 		T_AlterUserMappingStmt,
1177 		T_DropUserMappingStmt,
1178 		T_AlterTableSpaceOptionsStmt,
1179 		T_AlterTableMoveAllStmt,
1180 		T_SecLabelStmt,
1181 		T_CreateForeignTableStmt,
1182 		T_ImportForeignSchemaStmt,
1183 		T_CreateExtensionStmt,
1184 		T_AlterExtensionStmt,
1185 		T_AlterExtensionContentsStmt,
1186 		T_CreateEventTrigStmt,
1187 		T_AlterEventTrigStmt,
1188 		T_RefreshMatViewStmt,
1189 		T_ReplicaIdentityStmt,
1190 		T_AlterSystemStmt,
1191 		T_CreatePolicyStmt,
1192 		T_AlterPolicyStmt,
1193 		T_CreateTransformStmt,
1194 	};
1195 
1196 	if (bsearch(&nodeTag(node), nodemap, sizeof(nodemap)/sizeof(nodemap[0]),
1197 				sizeof(NodeTag), compare) != NULL)
1198 	{
1199 		/*
1200 		 * SELECT INTO
1201 		 * SELECT FOR SHARE or UPDATE
1202 		 */
1203 		if (IsA(node, SelectStmt))
1204 		{
1205 			/* SELECT INTO or SELECT FOR SHARE or UPDATE ? */
1206 			if (pool_has_insertinto_or_locking_clause(node))
1207 				return POOL_PRIMARY;
1208 
1209 			/* non-SELECT query in WITH clause ? */
1210 			if (((SelectStmt *)node)->withClause)
1211 			{
1212 				List *ctes = ((SelectStmt *)node)->withClause->ctes;
1213 				ListCell   *cte_item;
1214 				foreach(cte_item, ctes)
1215 				{
1216 					CommonTableExpr *cte = (CommonTableExpr *)lfirst(cte_item);
1217 					if (!IsA(cte->ctequery, SelectStmt))
1218 						return POOL_PRIMARY;
1219 				}
1220 			}
1221 
1222 			return POOL_EITHER;
1223 		}
1224 
1225 		/*
1226 		 * COPY
1227 		 */
1228 		else if (IsA(node, CopyStmt))
1229 		{
1230 			if (((CopyStmt *)node)->is_from)
1231 				return POOL_PRIMARY;
1232 			else
1233 			{
1234 				if (((CopyStmt *)node)->query == NULL)
1235 					return POOL_EITHER;
1236 				else
1237 					return (IsA(((CopyStmt *)node)->query, SelectStmt))?POOL_EITHER:POOL_PRIMARY;
1238 			}
1239 		}
1240 
1241 		/*
1242 		 * LOCK
1243 		 */
1244 		else if (IsA(node, LockStmt))
1245 		{
1246 			return (((LockStmt *)node)->mode >= RowExclusiveLock)?POOL_PRIMARY:POOL_BOTH;
1247 		}
1248 
1249 		/*
1250 		 * Transaction commands
1251 		 */
1252 		else if (IsA(node, TransactionStmt))
1253 		{
1254 			/*
1255 			 * Check "BEGIN READ WRITE" "START TRANSACTION READ WRITE"
1256 			 */
1257 			if (is_start_transaction_query(node))
1258 			{
1259 				/* But actually, we send BEGIN to standby if it's
1260 				   BEGIN READ WRITE or START TRANSACTION READ WRITE */
1261 				if (is_read_write((TransactionStmt *)node))
1262 					return POOL_BOTH;
1263 				/* Other TRANSACTION start commands are sent to both primary
1264 				   and standby */
1265 				else
1266 					return POOL_BOTH;
1267 			}
1268 			/* SAVEPOINT related commands are sent to both primary and standby */
1269 			else if (is_savepoint_query(node))
1270 				return POOL_BOTH;
1271 			/*
1272 			 * 2PC commands
1273 			 */
1274 			else if (is_2pc_transaction_query(node))
1275 				return POOL_PRIMARY;
1276 			else
1277 				/* COMMIT etc. */
1278 				return POOL_BOTH;
1279 		}
1280 
1281 		/*
1282 		 * SET
1283 		 */
1284 		else if (IsA(node, VariableSetStmt))
1285 		{
1286 			ListCell   *list_item;
1287 			bool ret = POOL_BOTH;
1288 
1289 			/*
1290 			 * SET transaction_read_only TO off
1291 			 */
1292 			if (((VariableSetStmt *)node)->kind == VAR_SET_VALUE &&
1293 				!strcmp(((VariableSetStmt *)node)->name, "transaction_read_only"))
1294 			{
1295 				List *options = ((VariableSetStmt *)node)->args;
1296 				foreach(list_item, options)
1297 				{
1298 					A_Const *v = (A_Const *)lfirst(list_item);
1299 
1300 					switch (v->val.type)
1301 					{
1302 						case T_String:
1303 							if (!strcasecmp(v->val.val.str, "off") ||
1304 								!strcasecmp(v->val.val.str, "f") ||
1305 								!strcasecmp(v->val.val.str, "false"))
1306 								ret = POOL_PRIMARY;
1307 							break;
1308 						case T_Integer:
1309 							if (v->val.val.ival)
1310 								ret = POOL_PRIMARY;
1311 						default:
1312 							break;
1313 					}
1314 				}
1315 				return ret;
1316 			}
1317 
1318 			/* SET TRANSACTION ISOLATION LEVEL SERIALIZABLE or
1319 			 * SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE or
1320 			 * SET transaction_isolation TO 'serializable'
1321 			 * SET default_transaction_isolation TO 'serializable'
1322 			 */
1323 			else if (is_set_transaction_serializable(node))
1324 			{
1325 				return POOL_PRIMARY;
1326 			}
1327 
1328 			/*
1329 			 * Check "SET TRANSACTION READ WRITE" "SET SESSION
1330 			 * CHARACTERISTICS AS TRANSACTION READ WRITE"
1331 			 */
1332 			else if (((VariableSetStmt *)node)->kind == VAR_SET_MULTI &&
1333 				(!strcmp(((VariableSetStmt *)node)->name, "TRANSACTION") ||
1334 				 !strcmp(((VariableSetStmt *)node)->name, "SESSION CHARACTERISTICS")))
1335 			{
1336 				List *options = ((VariableSetStmt *)node)->args;
1337 				foreach(list_item, options)
1338 				{
1339 					DefElem *opt = (DefElem *) lfirst(list_item);
1340 
1341 					if (!strcmp("transaction_read_only", opt->defname))
1342 					{
1343 						bool read_only;
1344 
1345 						read_only = ((A_Const *)opt->arg)->val.val.ival;
1346 						if (!read_only)
1347 							return POOL_PRIMARY;
1348 					}
1349 				}
1350 				return POOL_BOTH;
1351 			}
1352 			else
1353 			{
1354 				/*
1355 				 * All other SET command sent to both primary and
1356 				 * standby
1357 				 */
1358 				return POOL_BOTH;
1359 			}
1360 		}
1361 
1362 		/*
1363 		 * DISCARD
1364 		 */
1365 		else if (IsA(node, DiscardStmt))
1366 		{
1367 			return POOL_BOTH;
1368 		}
1369 
1370 		/*
1371 		 * PREPARE
1372 		 */
1373 		else if (IsA(node, PrepareStmt))
1374 		{
1375 			PrepareStmt *prepare_statement = (PrepareStmt *)node;
1376 
1377 			char *string = nodeToString(prepare_statement->query);
1378 
1379 			/* Note that this is a recursive call */
1380 			return send_to_where((Node *)(prepare_statement->query), string);
1381 		}
1382 
1383 		/*
1384 		 * EXECUTE
1385 		 */
1386 		else if (IsA(node, ExecuteStmt))
1387 		{
1388 			/* This is temporary decision. where_to_send will inherit
1389 			 *  same destination AS PREPARE.
1390 			 */
1391 			return POOL_PRIMARY;
1392 		}
1393 
1394 		/*
1395 		 * DEALLOCATE
1396 		 */
1397 		else if (IsA(node, DeallocateStmt))
1398 		{
1399 			/* This is temporary decision. where_to_send will inherit
1400 			 *  same destination AS PREPARE.
1401 			 */
1402 			return POOL_PRIMARY;
1403 		}
1404 		/*
1405 		 * SHOW
1406 		 */
1407 		else if (IsA(node, VariableShowStmt))
1408 		{
1409 			return POOL_EITHER;
1410 		}
1411 
1412 		/*
1413 		 * Other statements are sent to primary
1414 		 */
1415 		return POOL_PRIMARY;
1416 	}
1417 
1418 	/*
1419 	 * All unknown statements are sent to primary
1420 	 */
1421 	return POOL_PRIMARY;
1422 }
1423 
1424 static
where_to_send_deallocate(POOL_QUERY_CONTEXT * query_context,Node * node)1425 void where_to_send_deallocate(POOL_QUERY_CONTEXT *query_context, Node *node)
1426 {
1427 	DeallocateStmt *d = (DeallocateStmt *)node;
1428 	POOL_SENT_MESSAGE *msg;
1429 
1430 	/* DEALLOCATE ALL? */
1431 	if (d->name == NULL)
1432 	{
1433 		pool_setall_node_to_be_sent(query_context);
1434 	}
1435 	else
1436 	{
1437 		msg = pool_get_sent_message('Q', d->name, POOL_SENT_MESSAGE_CREATED);
1438 		if (!msg)
1439 			msg = pool_get_sent_message('P', d->name, POOL_SENT_MESSAGE_CREATED);
1440 		if (msg)
1441 		{
1442 			/* Inherit same map from PREPARE or PARSE */
1443 			pool_copy_prep_where(msg->query_context->where_to_send,
1444 								 query_context->where_to_send);
1445 			return;
1446 		}
1447 		/* prepared statement was not found */
1448 		pool_setall_node_to_be_sent(query_context);
1449 	}
1450 }
1451 
1452 /*
1453  * Returns parse tree for current query.
1454  * Precondition: the query is in progress state.
1455  */
pool_get_parse_tree(void)1456 Node *pool_get_parse_tree(void)
1457 {
1458 	POOL_SESSION_CONTEXT *sc;
1459 
1460 	sc = pool_get_session_context(true);
1461 	if (!sc)
1462 		return NULL;
1463 
1464 	if (pool_is_query_in_progress() && sc->query_context)
1465 	{
1466 		return sc->query_context->parse_tree;
1467 	}
1468 	return NULL;
1469 }
1470 
1471 /*
1472  * Returns raw query string for current query.
1473  * Precondition: the query is in progress state.
1474  */
pool_get_query_string(void)1475 char *pool_get_query_string(void)
1476 {
1477 	POOL_SESSION_CONTEXT *sc;
1478 
1479 	sc = pool_get_session_context(true);
1480 	if (!sc)
1481 		return NULL;
1482 
1483 	if (pool_is_query_in_progress() && sc->query_context)
1484 	{
1485 		return sc->query_context->original_query;
1486 	}
1487 	return NULL;
1488 }
1489 
1490 /*
1491  * Returns true if the query is one of:
1492  *
1493  * SET TRANSACTION ISOLATION LEVEL SERIALIZABLE or
1494  * SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE or
1495  * SET transaction_isolation TO 'serializable'
1496  * SET default_transaction_isolation TO 'serializable'
1497  */
is_set_transaction_serializable(Node * node)1498 bool is_set_transaction_serializable(Node *node)
1499 {
1500 	ListCell   *list_item;
1501 
1502 	if (!IsA(node, VariableSetStmt))
1503 		return false;
1504 
1505 	if (((VariableSetStmt *)node)->kind == VAR_SET_VALUE &&
1506 		(!strcmp(((VariableSetStmt *)node)->name, "transaction_isolation") ||
1507                  !strcmp(((VariableSetStmt *)node)->name, "default_transaction_isolation")))
1508 	{
1509 		List *options = ((VariableSetStmt *)node)->args;
1510 		foreach(list_item, options)
1511 		{
1512 			A_Const *v = (A_Const *)lfirst(list_item);
1513 
1514 			switch (v->val.type)
1515 			{
1516 				case T_String:
1517 					if (!strcasecmp(v->val.val.str, "serializable"))
1518 						return true;
1519 					break;
1520 				default:
1521 					break;
1522 			}
1523 		}
1524 		return false;
1525 	}
1526 
1527 	else if (((VariableSetStmt *)node)->kind == VAR_SET_MULTI &&
1528 			 (!strcmp(((VariableSetStmt *)node)->name, "TRANSACTION") ||
1529 			  !strcmp(((VariableSetStmt *)node)->name, "SESSION CHARACTERISTICS")))
1530 	{
1531 		List *options = ((VariableSetStmt *)node)->args;
1532 		foreach(list_item, options)
1533 		{
1534 			DefElem *opt = (DefElem *) lfirst(list_item);
1535 			if (!strcmp("transaction_isolation", opt->defname) ||
1536 				!strcmp("default_transaction_isolation", opt->defname))
1537 			{
1538 				A_Const *v = (A_Const *)opt->arg;
1539 
1540 				if (!strcasecmp(v->val.val.str, "serializable"))
1541 					return true;
1542 			}
1543 		}
1544 	}
1545 	return false;
1546 }
1547 
1548 /*
1549  * Returns true if SQL is transaction starting command (START
1550  * TRANSACTION or BEGIN)
1551  */
is_start_transaction_query(Node * node)1552 bool is_start_transaction_query(Node *node)
1553 {
1554 	TransactionStmt *stmt;
1555 
1556 	if (node == NULL || !IsA(node, TransactionStmt))
1557 		return false;
1558 
1559 	stmt = (TransactionStmt *)node;
1560 	return stmt->kind == TRANS_STMT_START || stmt->kind == TRANS_STMT_BEGIN;
1561 }
1562 
1563 /*
1564  * Return true if start transaction query with "READ WRITE" option.
1565  */
is_read_write(TransactionStmt * node)1566 bool is_read_write(TransactionStmt *node)
1567 {
1568 	ListCell   *list_item;
1569 
1570 	List *options = node->options;
1571 	foreach(list_item, options)
1572 	{
1573 		DefElem *opt = (DefElem *) lfirst(list_item);
1574 
1575 		if (!strcmp("transaction_read_only", opt->defname))
1576 		{
1577 			bool read_only;
1578 
1579 			read_only = ((A_Const *)opt->arg)->val.val.ival;
1580 			if (read_only)
1581 				return false;	/* TRANSACTION READ ONLY */
1582 			else
1583 				/*
1584 				 * TRANSACTION READ WRITE specified. This sounds a little bit strange,
1585 				 * but actually the parse code works in the way.
1586 				 */
1587 				return true;
1588 		}
1589 	}
1590 
1591 	/*
1592 	 * No TRANSACTION READ ONLY/READ WRITE clause specified.
1593 	 */
1594 	return false;
1595 }
1596 
1597 /*
1598  * Return true if start transaction query with "SERIALIZABLE" option.
1599  */
is_serializable(TransactionStmt * node)1600 bool is_serializable(TransactionStmt *node)
1601 {
1602 	ListCell   *list_item;
1603 
1604 	List *options = node->options;
1605 	foreach(list_item, options)
1606 	{
1607 		DefElem *opt = (DefElem *) lfirst(list_item);
1608 
1609 		if (!strcmp("transaction_isolation", opt->defname) &&
1610 			IsA(opt->arg, A_Const) &&
1611 			((A_Const *)opt->arg)->val.type == T_String &&
1612 			!strcmp("serializable", ((A_Const *)opt->arg)->val.val.str))
1613 				return true;
1614 	}
1615 	return false;
1616 }
1617 
1618 /*
1619  * If the query is BEGIN READ WRITE or
1620  * BEGIN ... SERIALIZABLE in master/slave mode,
1621  * we send BEGIN to slaves/standbys instead.
1622  * original_query which is BEGIN READ WRITE is sent to primary.
1623  * rewritten_query which is BEGIN is sent to standbys.
1624  */
pool_need_to_treat_as_if_default_transaction(POOL_QUERY_CONTEXT * query_context)1625 bool pool_need_to_treat_as_if_default_transaction(POOL_QUERY_CONTEXT *query_context)
1626 {
1627 	return (MASTER_SLAVE &&
1628 			is_start_transaction_query(query_context->parse_tree) &&
1629 			(is_read_write((TransactionStmt *)query_context->parse_tree) ||
1630 			 is_serializable((TransactionStmt *)query_context->parse_tree)));
1631 }
1632 
1633 /*
1634  * Return true if the query is SAVEPOINT related query.
1635  */
is_savepoint_query(Node * node)1636 bool is_savepoint_query(Node *node)
1637 {
1638 	if (((TransactionStmt *)node)->kind == TRANS_STMT_SAVEPOINT ||
1639 		((TransactionStmt *)node)->kind == TRANS_STMT_ROLLBACK_TO ||
1640 		((TransactionStmt *)node)->kind == TRANS_STMT_RELEASE)
1641 		return true;
1642 
1643 	return false;
1644 }
1645 
1646 /*
1647  * Return true if the query is 2PC transaction query.
1648  */
is_2pc_transaction_query(Node * node)1649 bool is_2pc_transaction_query(Node *node)
1650 {
1651 	if (((TransactionStmt *)node)->kind == TRANS_STMT_PREPARE ||
1652 		((TransactionStmt *)node)->kind == TRANS_STMT_COMMIT_PREPARED ||
1653 		((TransactionStmt *)node)->kind == TRANS_STMT_ROLLBACK_PREPARED)
1654 		return true;
1655 
1656 	return false;
1657 }
1658 
1659 /*
1660  * Set query state, if a current state is before it than the specified state.
1661  */
pool_set_query_state(POOL_QUERY_CONTEXT * query_context,POOL_QUERY_STATE state)1662 void pool_set_query_state(POOL_QUERY_CONTEXT *query_context, POOL_QUERY_STATE state)
1663 {
1664 	int i;
1665 
1666 	CHECK_QUERY_CONTEXT_IS_VALID;
1667 
1668 	for (i = 0; i < NUM_BACKENDS; i++)
1669 	{
1670 		if (query_context->where_to_send[i] &&
1671 			statecmp(query_context->query_state[i], state) < 0)
1672 			query_context->query_state[i] = state;
1673 	}
1674 }
1675 
1676 /*
1677  * Return -1, 0 or 1 according to s1 is "before, equal or after" s2 in terms of state
1678  * transition order.
1679  * The State transition order is defined as: UNPARSED < PARSE_COMPLETE < BIND_COMPLETE < EXECUTE_COMPLETE
1680  */
statecmp(POOL_QUERY_STATE s1,POOL_QUERY_STATE s2)1681 int statecmp(POOL_QUERY_STATE s1, POOL_QUERY_STATE s2)
1682 {
1683 	int ret;
1684 
1685 	switch (s2) {
1686 		case POOL_UNPARSED:
1687 			ret = (s1 == s2) ? 0 : 1;
1688 			break;
1689 		case POOL_PARSE_COMPLETE:
1690 			if (s1 == POOL_UNPARSED)
1691 				ret = -1;
1692 			else
1693 				ret = (s1 == s2) ? 0 : 1;
1694 			break;
1695 		case POOL_BIND_COMPLETE:
1696 			if (s1 == POOL_UNPARSED || s1 == POOL_PARSE_COMPLETE)
1697 				ret = -1;
1698 			else
1699 				ret = (s1 == s2) ? 0 : 1;
1700 			break;
1701 		case POOL_EXECUTE_COMPLETE:
1702 			ret = (s1 == s2) ? 0 : -1;
1703 			break;
1704 		default:
1705 			ret = -2;
1706 			break;
1707 	}
1708 
1709 	return ret;
1710 }
1711 
1712 /*
1713  * Remove READ WRITE option from the packet of START TRANSACTION command.
1714  * To free the return value is required.
1715  */
1716 static
remove_read_write(int len,const char * contents,int * rewritten_len)1717 char* remove_read_write(int len, const char* contents, int *rewritten_len)
1718 {
1719 	char *rewritten_query;
1720 	char *rewritten_contents;
1721 	const char *name;
1722 	const char *stmt;
1723 
1724 	rewritten_query = "BEGIN";
1725 	name = contents;
1726 	stmt = contents + strlen(name) + 1;
1727 
1728 	*rewritten_len = len - strlen(stmt) + strlen(rewritten_query);
1729 	if (len < *rewritten_len)
1730 	{
1731         ereport(ERROR,
1732             (errmsg("invalid message length of transaction packet")));
1733 	}
1734 
1735 	rewritten_contents = palloc(*rewritten_len);
1736 
1737 	strcpy(rewritten_contents, name);
1738 	strcpy(rewritten_contents + strlen(name) + 1, rewritten_query);
1739 	memcpy(rewritten_contents + strlen(name) + strlen(rewritten_query) + 2,
1740 		   stmt + strlen(stmt) + 1,
1741 		   len - (strlen(name) + strlen(stmt) + 2));
1742 
1743 	return rewritten_contents;
1744 }
1745 
1746 /*
1747  * Return true if current query is safe to cache.
1748  */
pool_is_cache_safe(void)1749 bool pool_is_cache_safe(void)
1750 {
1751 	POOL_SESSION_CONTEXT *sc;
1752 
1753 	sc = pool_get_session_context(true);
1754 	if (!sc)
1755 		return false;
1756 
1757 	if (pool_is_query_in_progress() && sc->query_context)
1758 	{
1759 		return sc->query_context->is_cache_safe;
1760 	}
1761 	return false;
1762 }
1763 
1764 /*
1765  * Set safe to cache.
1766  */
pool_set_cache_safe(void)1767 void pool_set_cache_safe(void)
1768 {
1769 	POOL_SESSION_CONTEXT *sc;
1770 
1771 	sc = pool_get_session_context(true);
1772 	if (!sc)
1773 		return;
1774 
1775 	if (sc->query_context)
1776 	{
1777 		sc->query_context->is_cache_safe = true;
1778 	}
1779 }
1780 
1781 /*
1782  * Unset safe to cache.
1783  */
pool_unset_cache_safe(void)1784 void pool_unset_cache_safe(void)
1785 {
1786 	POOL_SESSION_CONTEXT *sc;
1787 
1788 	sc = pool_get_session_context(true);
1789 	if (!sc)
1790 		return;
1791 
1792 	if (sc->query_context)
1793 	{
1794 		sc->query_context->is_cache_safe = false;
1795 	}
1796 }
1797 
1798 /*
1799  * Return true if current temporary query cache is exceeded
1800  */
pool_is_cache_exceeded(void)1801 bool pool_is_cache_exceeded(void)
1802 {
1803 	POOL_SESSION_CONTEXT *sc;
1804 
1805 	sc = pool_get_session_context(true);
1806 	if (!sc)
1807 		return false;
1808 
1809 	if (pool_is_query_in_progress() && sc->query_context)
1810 	{
1811 		if (sc->query_context->temp_cache)
1812 			return sc->query_context->temp_cache->is_exceeded;
1813 		return true;
1814 	}
1815 	return false;
1816 }
1817 
1818 /*
1819  * Set current temporary query cache is exceeded
1820  */
pool_set_cache_exceeded(void)1821 void pool_set_cache_exceeded(void)
1822 {
1823 	POOL_SESSION_CONTEXT *sc;
1824 
1825 	sc = pool_get_session_context(true);
1826 	if (!sc)
1827 		return;
1828 
1829 	if (sc->query_context && sc->query_context->temp_cache)
1830 	{
1831 		sc->query_context->temp_cache->is_exceeded = true;
1832 	}
1833 }
1834 
1835 /*
1836  * Unset current temporary query cache is exceeded
1837  */
pool_unset_cache_exceeded(void)1838 void pool_unset_cache_exceeded(void)
1839 {
1840 	POOL_SESSION_CONTEXT *sc;
1841 
1842 	sc = pool_get_session_context(true);
1843 	if (!sc)
1844 		return;
1845 
1846 	if (sc->query_context && sc->query_context->temp_cache)
1847 	{
1848 		sc->query_context->temp_cache->is_exceeded = false;
1849 	}
1850 }
1851 
1852 /*
1853  * Return true if one of followings is true
1854  *
1855  * SET transaction_read_only TO on
1856  * SET TRANSACTION READ ONLY
1857  * SET TRANSACTION CHARACTERISTICS AS TRANSACTION READ ONLY
1858  *
1859  * Note that if the node is not a variable statement, returns false.
1860  */
pool_is_transaction_read_only(Node * node)1861 bool pool_is_transaction_read_only(Node *node)
1862 {
1863 	ListCell   *list_item;
1864 	bool ret = false;
1865 
1866 	if (!IsA(node, VariableSetStmt))
1867 		return ret;
1868 
1869 	/*
1870 	 * SET transaction_read_only TO on
1871 	 */
1872 	if (((VariableSetStmt *)node)->kind == VAR_SET_VALUE &&
1873 		!strcmp(((VariableSetStmt *)node)->name, "transaction_read_only"))
1874 	{
1875 		List *options = ((VariableSetStmt *)node)->args;
1876 		foreach(list_item, options)
1877 		{
1878 			A_Const *v = (A_Const *)lfirst(list_item);
1879 
1880 			switch (v->val.type)
1881 			{
1882 				case T_String:
1883 					if (!strcasecmp(v->val.val.str, "on") ||
1884 						!strcasecmp(v->val.val.str, "t") ||
1885 						!strcasecmp(v->val.val.str, "true"))
1886 						ret = true;
1887 					break;
1888 				case T_Integer:
1889 					if (v->val.val.ival)
1890 						ret = true;
1891 				default:
1892 					break;
1893 			}
1894 		}
1895 	}
1896 
1897 	/*
1898 	 * SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY
1899 	 * SET TRANSACTION READ ONLY
1900 	 */
1901 	else if (((VariableSetStmt *)node)->kind == VAR_SET_MULTI &&
1902 			 (!strcmp(((VariableSetStmt *)node)->name, "TRANSACTION") ||
1903 			  !strcmp(((VariableSetStmt *)node)->name, "SESSION CHARACTERISTICS")))
1904 	{
1905 		List *options = ((VariableSetStmt *)node)->args;
1906 		foreach(list_item, options)
1907 		{
1908 			DefElem *opt = (DefElem *) lfirst(list_item);
1909 
1910 			if (!strcmp("transaction_read_only", opt->defname))
1911 			{
1912 				bool read_only;
1913 
1914 				read_only = ((A_Const *)opt->arg)->val.val.ival;
1915 				if (read_only)
1916 				{
1917 					ret = true;
1918 					break;
1919 				}
1920 			}
1921 		}
1922 	}
1923 	return ret;
1924 }
1925