1 /* -*-pgsql-c-*- */
2 /*
3  * $Header$
4  *
5  * pgpool: a language independent connection pool server for PostgreSQL
6  * written by Tatsuo Ishii
7  *
8  * Copyright (c) 2003-2021	PgPool Global Development Group
9  *
10  * Permission to use, copy, modify, and distribute this software and
11  * its documentation for any purpose and without fee is hereby
12  * granted, provided that the above copyright notice appear in all
13  * copies and that both that copyright notice and this permission
14  * notice appear in supporting documentation, and that the name of the
15  * author not be used in advertising or publicity pertaining to
16  * distribution of the software without specific, written prior
17  * permission. The author makes no representations about the
18  * suitability of this software for any purpose.  It is provided "as
19  * is" without express or implied warranty.
20  *
21  *---------------------------------------------------------------------
22  * pool_proto_modules.c: modules corresponding to message protocols.
23  * used by pool_process_query()
24  *---------------------------------------------------------------------
25  */
26 #include "config.h"
27 #include <errno.h>
28 
29 #ifdef HAVE_SYS_TYPES_H
30 #include <sys/types.h>
31 #endif
32 #ifdef HAVE_SYS_TIME_H
33 #include <sys/time.h>
34 #endif
35 #ifdef HAVE_SYS_SELECT_H
36 #include <sys/select.h>
37 #endif
38 
39 
40 #include <stdlib.h>
41 #include <unistd.h>
42 #include <string.h>
43 #include <netinet/in.h>
44 #include <ctype.h>
45 
46 #include "pool.h"
47 #include "rewrite/pool_timestamp.h"
48 #include "rewrite/pool_lobj.h"
49 #include "protocol/pool_proto_modules.h"
50 #include "pool_config.h"
51 #include "parser/pool_string.h"
52 #include "context/pool_session_context.h"
53 #include "context/pool_query_context.h"
54 #include "utils/elog.h"
55 #include "utils/pool_select_walker.h"
56 #include "utils/pool_relcache.h"
57 #include "utils/pool_stream.h"
58 #include "query_cache/pool_memqcache.h"
59 #include "utils/pool_signal.h"
60 #include "utils/palloc.h"
61 #include "utils/memutils.h"
62 #include "pool_config_variables.h"
63 
64 char *copy_table = NULL;  /* copy table name */
65 char *copy_schema = NULL;  /* copy schema name */
66 char copy_delimiter; /* copy delimiter char */
67 char *copy_null = NULL; /* copy null string */
68 
69 /*
70  * Non-zero if closing an internal transaction is allowed.  This variable
71  * was introduced on 2008/4/3 so that an internal transaction is not closed
72  * when a Sync message is received after a Parse message. This hack
73  * is for PHP-PDO.
74  */
75 static int allow_close_transaction = 1;
76 
77 int is_select_pgcatalog = 0;
78 int is_select_for_update = 0; /* 1 if SELECT INTO or SELECT FOR UPDATE */
79 
80 /*
81  * last query string handled by SimpleQuery() or Execute(), saved for logging
82  */
83 char query_string_buffer[QUERY_STRING_BUFFER_LEN];
84 
85 static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id);
86 static void generate_error_message(char *prefix, int specific_error, char *query);
87 static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
88 									 POOL_CONNECTION_POOL *backend,
89 									 POOL_SENT_MESSAGE *message,
90 									 POOL_SENT_MESSAGE *bind_message);
91 static int* find_victim_nodes(int *ntuples, int nmembers, int master_node, int *number_of_nodes);
92 static POOL_STATUS close_standby_transactions(POOL_CONNECTION *frontend,
93 											  POOL_CONNECTION_POOL *backend);
94 
95 static char *
96 flatten_set_variable_args(const char *name, List *args);
97 static bool
98 process_pg_terminate_backend_func(POOL_QUERY_CONTEXT *query_context);
99 static void pool_wait_till_ready_for_query(POOL_CONNECTION_POOL *backend);
100 static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION *frontend,
101 														 POOL_CONNECTION_POOL *backend);
102 
103 /*
104  * This is the workhorse of processing the pg_terminate_backend function to
105  * make sure that the use of function should not trigger the backend node failover.
106  *
107  * The function searches for the pg_terminate_backend() function call in the
108  * query parse tree and if the search comes out to be successful,
109  * the next step is to locate the pgpool-II child process and a backend node
110  * of that connection whose PID is specified in pg_terminate_backend argument.
111  *
112  * Once the connection is identified, we set the swallow_termination flag of
113  * that connection (in shared memory) and also sets the query destination to
114  * the same backend that hosts the connection.
115  *
116  * The function returns true on success, i.e. when the query contains the
117  * pg_terminate_backend call and that call refers to the backend
118  * connection that belongs to pgpool-II.
119  *
120  * Note:  Since upon successful return this function has already
121  * set the destination backend node for the current query,
122  * so for that case pool_where_to_send() should not be called.
123  *
124  */
process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context)125 static bool process_pg_terminate_backend_func(POOL_QUERY_CONTEXT *query_context)
126 {
127 	/*
128 	 * locate pg_terminate_backend and get the pid argument, if pg_terminate_backend
129 	 * is present in the query
130 	 */
131 	int backend_pid = pool_get_terminate_backend_pid(query_context->parse_tree);
132 	if (backend_pid > 0)
133 	{
134 		int backend_node = 0;
135 		ConnectionInfo* conn = pool_coninfo_backend_pid(backend_pid, &backend_node);
136 		if(conn == NULL)
137 		{
138 			ereport(LOG,
139 				(errmsg("found the pg_terminate_backend request for backend pid:%d, but the backend connection does not belong to pgpool-II",backend_pid)));
140 			/* we are not able to find the backend connection with the pid
141 			 * so there is not much left for us to do here
142 			 */
143 			return false;
144 		}
145 		ereport(LOG,
146 			(errmsg("found the pg_terminate_backend request for backend pid:%d on backend node:%d",backend_pid,backend_node),
147 				 errdetail("setting the connection flag")));
148 
149 		pool_set_connection_will_be_terminated(conn);
150 		/* It was the pg_terminate_backend call so send the query to appropriate backend */
151 		query_context->pg_terminate_backend_conn = conn;
152 		pool_force_query_node_to_backend(query_context, backend_node);
153 		return true;
154 	}
155 	return false;
156 }
157 
158 /*
159  * Process Query('Q') message
160  * Query messages include an SQL string.
161  */
SimpleQuery(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)162 POOL_STATUS SimpleQuery(POOL_CONNECTION *frontend,
163 						POOL_CONNECTION_POOL *backend, int len, char *contents)
164 {
165 	static char *sq_config = "pool_status";
166 	static char *sq_pools = "pool_pools";
167 	static char *sq_processes = "pool_processes";
168  	static char *sq_nodes = "pool_nodes";
169  	static char *sq_version = "pool_version";
170  	static char *sq_cache = "pool_cache";
171 	int commit;
172 	List *parse_tree_list;
173 	Node *node = NULL;
174 	POOL_STATUS status;
175 	int lock_kind;
176 	bool is_likely_select = false;
177 	int specific_error = 0;
178 
179 	POOL_SESSION_CONTEXT *session_context;
180 	POOL_QUERY_CONTEXT *query_context;
181 
182 	bool error;
183 
184 	/* Get session context */
185 	session_context = pool_get_session_context(false);
186 
187 	/* save last query string for logging purpose */
188 	strlcpy(query_string_buffer, contents, sizeof(query_string_buffer));
189 
190 	/* show ps status */
191 	query_ps_status(contents, backend);
192 
193 	/* log query to log file if necessary */
194 	if (pool_config->log_statement)
195         ereport(pool_config->log_statement? LOG: DEBUG1,(errmsg("statement: %s", contents)));
196 
197 	/*
198 	 * Fetch memory cache if possible
199 	 */
200 	is_likely_select = pool_is_likely_select(contents);
201 
202 	/*
203 	 * If memory query cache enabled and the query seems to be a
204 	 * SELECT use query cache if possible. However if we are in an
205 	 * explicit transaction and we had writing query before, we should
206 	 * not use query cache. This means that even the writing query is
207 	 * not anything related to the table which is used the SELECT, we
208 	 * do not use cache. Of course we could analyze the SELECT to see
209 	 * if it uses the table modified in the transaction, but it will
210 	 * need parsing query and accessing to system catalog, which will
211 	 * add significant overhead. Moreover if we are in aborted
212 	 * transaction, commands should be ignored, so we should not use
213 	 * query cache.
214 	 */
215 	if (pool_config->memory_cache_enabled && is_likely_select &&
216 		!pool_is_writing_transaction() &&
217 		TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID) != 'E')
218 	{
219 		bool foundp;
220 
221 		/* If the query is SELECT from table to cache, try to fetch cached result. */
222 		status = pool_fetch_from_memory_cache(frontend, backend, contents, &foundp);
223 
224 		if (status != POOL_CONTINUE)
225 			return status;
226 
227 		if (foundp)
228 		{
229 			pool_ps_idle_display(backend);
230 			pool_set_skip_reading_from_backends();
231 			pool_stats_count_up_num_cache_hits();
232 			return POOL_CONTINUE;
233 		}
234 	}
235 
236 	/* Create query context */
237 	query_context = pool_init_query_context();
238 	MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);
239 
240 	/* parse SQL string */
241 	parse_tree_list = raw_parser(contents, &error);
242 
243 	if (parse_tree_list == NIL)
244 	{
245 		/* is the query empty? */
246 		if (*contents == '\0' || *contents == ';' || error == false)
247 		{
248 			/*
249 			 * JBoss sends empty queries for checking connections.
250 			 * We replace the empty query with SELECT command not
251 			 * to affect load balance.
252 			 * [Pgpool-general] Confused about JDBC and load balancing
253 			 */
254 			parse_tree_list = raw_parser(POOL_DUMMY_READ_QUERY, &error);
255 		}
256 		else
257 		{
258 			/*
259 			 * Unable to parse the query. Probably syntax error or the
260 			 * query is too new and our parser cannot understand. Treat as
261 			 * if it were an DELETE command. Note that the DELETE command
262 			 * does not execute, instead the original query will be sent
263 			 * to backends, which may or may not cause an actual syntax errors.
264 			 * The command will be sent to all backends in replication mode
265 			 * or master/primary in master/slave mode.
266 			 */
267 			if (!strcmp(remote_host, "[local]"))
268 			{
269 				ereport(LOG,
270 					(errmsg("Unable to parse the query: \"%s\" from local client", contents)));
271 			}
272 			else
273 			{
274 				ereport(LOG,
275 						(errmsg("Unable to parse the query: \"%s\" from client %s(%s)", contents, remote_host, remote_port)));
276 			}
277 			parse_tree_list = raw_parser(POOL_DUMMY_WRITE_QUERY, &error);
278 			query_context->is_parse_error = true;
279 		}
280 	}
281 	MemoryContextSwitchTo(old_context);
282 
283 	if (parse_tree_list != NIL)
284 	{
285 		/*
286 		 * XXX: Currently we only process the first element of the parse tree.
287 		 * rest of multiple statements are silently discarded.
288 		 */
289 		node = (Node *) lfirst(list_head(parse_tree_list));
290 		/*
291 		 * Start query context
292 		 */
293 		pool_start_query(query_context, contents, len, node);
294 
295 		/*
296 		 * If the query is DROP DATABASE, after executing it, cache files directory must be discarded.
297 		 * So we have to get the DB's oid before it will be DROPped.
298 		 */
299 		if (pool_config->memory_cache_enabled && is_drop_database(node))
300 		{
301 			DropdbStmt *stmt = (DropdbStmt *)node;
302 			query_context->dboid = pool_get_database_oid_from_dbname(stmt->dbname);
303 			if (query_context->dboid != 0)
304 			{
305 				ereport(DEBUG1,
306 						(errmsg("DB's oid to discard its cache directory: dboid = %d", query_context->dboid)));
307 			}
308 		}
309 
310 		/*
311 		 * Check if multi statement query
312 		 */
313 		if (parse_tree_list && list_length(parse_tree_list) > 1)
314 		{
315 			query_context->is_multi_statement = true;
316 		}
317 		else
318 		{
319 			query_context->is_multi_statement = false;
320 		}
321 
322 		/* check COPY FROM STDIN
323 		 * if true, set copy_* variable
324 		 */
325 		check_copy_from_stdin(node);
326 
327 		if (IsA(node, PgpoolVariableShowStmt))
328 		{
329 			VariableShowStmt *vnode = (VariableShowStmt *)node;
330 
331 			report_config_variable(frontend, backend, vnode->name);
332 
333 			pool_ps_idle_display(backend);
334 			pool_query_context_destroy(query_context);
335 			pool_set_skip_reading_from_backends();
336 			return POOL_CONTINUE;
337 		}
338 
339 		if (IsA(node, PgpoolVariableSetStmt))
340 		{
341 			VariableSetStmt *vnode = (VariableSetStmt *)node;
342 			const char *value = NULL;
343 			if (vnode->kind == VAR_SET_VALUE)
344 			{
345 				value = flatten_set_variable_args("name", vnode->args);
346 			}
347 			if (vnode->kind == VAR_RESET_ALL)
348 			{
349 				reset_all_variables(frontend, backend);
350 			}
351 			else
352 			{
353 				set_config_option_for_session(frontend, backend, vnode->name, value);
354 			}
355 
356 			pool_ps_idle_display(backend);
357 			pool_query_context_destroy(query_context);
358 			pool_set_skip_reading_from_backends();
359 			return POOL_CONTINUE;
360 		}
361 
362 		/* status reporting? */
363 		if (IsA(node, VariableShowStmt))
364 		{
365 			bool is_valid_show_command = false;
366 			VariableShowStmt *vnode = (VariableShowStmt *)node;
367 
368 			if (!strcmp(sq_config, vnode->name))
369             {
370 				is_valid_show_command = true;
371 				ereport(DEBUG1,
372 					(errmsg("SimpleQuery"),
373 						 errdetail("config reporting")));
374                 config_reporting(frontend, backend);
375             }
376 			else if (!strcmp(sq_pools, vnode->name))
377             {
378 				is_valid_show_command = true;
379 				ereport(DEBUG1,
380 					(errmsg("SimpleQuery"),
381 						 errdetail("pools reporting")));
382 
383                 pools_reporting(frontend, backend);
384             }
385 			else if (!strcmp(sq_processes, vnode->name))
386             {
387 				is_valid_show_command = true;
388 				ereport(DEBUG1,
389 					(errmsg("SimpleQuery"),
390 						 errdetail("processes reporting")));
391                 processes_reporting(frontend, backend);
392             }
393 			else if (!strcmp(sq_nodes, vnode->name))
394             {
395 				is_valid_show_command = true;
396 				ereport(DEBUG1,
397 					(errmsg("SimpleQuery"),
398 						 errdetail("nodes reporting")));
399                 nodes_reporting(frontend, backend);
400             }
401 			else if (!strcmp(sq_version, vnode->name))
402             {
403 				is_valid_show_command = true;
404 				ereport(DEBUG1,
405 					(errmsg("SimpleQuery"),
406 						 errdetail("version reporting")));
407                 version_reporting(frontend, backend);
408             }
409 			else if (!strcmp(sq_cache, vnode->name))
410             {
411 				is_valid_show_command = true;
412 				ereport(DEBUG1,
413 					(errmsg("SimpleQuery"),
414 						 errdetail("cache reporting")));
415                 cache_reporting(frontend, backend);
416             }
417 
418 			if (is_valid_show_command)
419 			{
420 				pool_ps_idle_display(backend);
421 				pool_query_context_destroy(query_context);
422 				pool_set_skip_reading_from_backends();
423 				return POOL_CONTINUE;
424 			}
425 		}
426 
427 		/*
428 		 * If the table is to be cached, set is_cache_safe TRUE and register table oids.
429 		 */
430 		if (pool_config->memory_cache_enabled && query_context->is_multi_statement == false)
431 		{
432 			bool is_select_query = false;
433 			int num_oids;
434 			int *oids;
435 			int i;
436 
437 			/* Check if the query is actually SELECT */
438 			if (is_likely_select && IsA(node, SelectStmt))
439 			{
440 				is_select_query = true;
441 			}
442 
443 			/* Switch the flag of is_cache_safe in session_context */
444 			if (is_select_query && !query_context->is_parse_error &&
445 				pool_is_allow_to_cache(query_context->parse_tree,
446 									   query_context->original_query))
447 			{
448 				pool_set_cache_safe();
449 			}
450 			else
451 			{
452 				pool_unset_cache_safe();
453 			}
454 
455 			/*
456 			 * If table is to be cached and the query is DML, save the table
457 			 * oid
458 			 */
459 			if (!query_context->is_parse_error)
460 			{
461 				num_oids = pool_extract_table_oids(node, &oids);
462 
463 				if (num_oids > 0)
464 				{
465 					/* Save to oid buffer */
466 					for (i=0;i<num_oids;i++)
467 					{
468 						pool_add_dml_table_oid(oids[i]);
469 					}
470 				}
471 			}
472 		}
473 
474 		/*
475 		 * pg_terminate function needs special handling, process it if the query
476 		 * contains one, otherwise use pool_where_to_send() to decide destination
477 		 * backend node for the query
478 		 */
479 		if (process_pg_terminate_backend_func(query_context) == false)
480 		{
481 			/*
482 			 * Decide where to send query
483 			 */
484 			pool_where_to_send(query_context, query_context->original_query,
485 							   query_context->parse_tree);
486 		}
487 		/*
488 		 * if this is DROP DATABASE command, send USR1 signal to parent and
489 		 * ask it to close all idle connections.
490 		 * XXX This is overkill. It would be better to close the idle
491 		 * connection for the database which DROP DATABASE command tries
492 		 * to drop. This is impossible at this point, since we have no way
493 		 * to pass such info to other processes.
494 		 */
495 		if (is_drop_database(node))
496 		{
497 			struct timeval stime;
498 
499 			stime.tv_usec = 0;
500 			stime.tv_sec = 5;	/* XXX give arbitrary time to allow
501 									 * closing idle connections */
502 			ereport(DEBUG1,
503 					(errmsg("Query: sending SIGUSR1 signal to parent")));
504 
505 			ignore_sigusr1 = 1;	/* disable SIGUSR1 handler */
506 			register_node_operation_request(CLOSE_IDLE_REQUEST, NULL, 0, false, 0);
507 
508 			/*
509 			 * We need to loop over here since we might get some signals while
510 			 * sleeping
511 			 */
512 			for (;;)
513 			{
514 				int sts;
515 
516 				errno = 0;
517 				sts = select(0, NULL, NULL, NULL, &stime);
518 				if (stime.tv_usec == 0 && stime.tv_sec == 0)
519 					break;
520 				if (sts != 0 && errno != EINTR)
521 				{
522 					elog(DEBUG1, "select(2) returns error: %s", strerror(errno));
523 					break;
524 				}
525 			}
526 
527 			ignore_sigusr1 = 0;
528 		}
529 
530 		/*
531 		 * determine if we need to lock the table
532 		 * to keep SERIAL data consistency among servers
533 		 * conditions:
534 		 * - replication is enabled
535 		 * - protocol is V3
536 		 * - statement is INSERT
537 		 * - either "INSERT LOCK" comment exists or insert_lock directive specified
538 		 */
539 		if (!RAW_MODE && !STREAM)
540 		{
541 			/*
542 			 * If there's only one node to send the command, there's no
543 			 * point to start a transaction.
544 			 */
545 			if (pool_multi_node_to_be_sent(query_context))
546 			{
547 				/* start a transaction if needed */
548 				start_internal_transaction(frontend, backend, (Node *)node);
549 
550 				/* check if need lock */
551 				lock_kind = need_insert_lock(backend, contents, node);
552 				if (lock_kind)
553 				{
554 					/* if so, issue lock command */
555 					status = insert_lock(frontend, backend, contents, (InsertStmt *)node, lock_kind);
556 					if (status != POOL_CONTINUE)
557 					{
558 						pool_query_context_destroy(query_context);
559 						return status;
560 					}
561 				}
562 			}
563 		}
564 		else if (REPLICATION && contents == NULL && start_internal_transaction(frontend, backend, node))
565 		{
566 			pool_query_context_destroy(query_context);
567 			return POOL_ERROR;
568 		}
569 	}
570 
571 	if (MAJOR(backend) == PROTO_MAJOR_V2 && is_start_transaction_query(node))
572 	{
573 		int i;
574 
575 		for (i=0;i<NUM_BACKENDS;i++)
576 		{
577 			if(VALID_BACKEND(i))
578 				TSTATE(backend, i) = 'T';
579 		}
580 	}
581 
582 	if (node)
583 	{
584 		POOL_SENT_MESSAGE *msg = NULL;
585 
586 		if (IsA(node, PrepareStmt))
587 		{
588 			msg = pool_create_sent_message('Q', len, contents, 0,
589 										   ((PrepareStmt *)node)->name,
590 										   query_context);
591 			session_context->uncompleted_message =  msg;
592 		}
593 		else
594 			session_context->uncompleted_message = NULL;
595 	}
596 
597 
598 	if (!RAW_MODE)
599 	{
600 		/* check if query is "COMMIT" or "ROLLBACK" */
601 		commit = is_commit_or_rollback_query(node);
602 
603 		/*
604 		 * Query is not commit/rollback
605 		 */
606 		if (!commit)
607 		{
608 			char	   *rewrite_query = NULL;
609 
610 			if (node)
611 		   	{
612 				POOL_SENT_MESSAGE *msg = NULL;
613 
614 				if (IsA(node, PrepareStmt))
615 				{
616 					msg = session_context->uncompleted_message;
617 				}
618 				else if (IsA(node, ExecuteStmt))
619 				{
620 					msg = pool_get_sent_message('Q', ((ExecuteStmt *)node)->name, POOL_SENT_MESSAGE_CREATED);
621 					if (!msg)
622 						msg = pool_get_sent_message('P', ((ExecuteStmt *)node)->name, POOL_SENT_MESSAGE_CREATED);
623 				}
624 
625 				/* rewrite `now()' to timestamp literal */
626 				if (!is_select_query(node, query_context->original_query) ||
627 					pool_has_function_call(node) || pool_config->replicate_select)
628 					rewrite_query = rewrite_timestamp(backend, query_context->parse_tree, false, msg);
629 
630 				/*
631 				 * If the query is BEGIN READ WRITE or
632 				 * BEGIN ... SERIALIZABLE in master/slave mode,
633 				 * we send BEGIN to slaves/standbys instead.
634 				 * original_query which is BEGIN READ WRITE is sent to primary.
635 				 * rewritten_query which is BEGIN is sent to standbys.
636 				 */
637 				if (pool_need_to_treat_as_if_default_transaction(query_context))
638 				{
639 					rewrite_query = pstrdup("BEGIN");
640 				}
641 
642 				if (rewrite_query != NULL)
643 				{
644 					query_context->rewritten_query = rewrite_query;
645 					query_context->rewritten_length = strlen(rewrite_query) + 1;
646 				}
647 			}
648 
649 			/*
650 			 * Optimization effort: If there's only one session, we do
651 			 * not need to wait for the master node's response, and
652 			 * could execute the query concurrently.
653 			 */
654 			if (pool_config->num_init_children == 1)
655 			{
656 				/* Send query to all DB nodes at once */
657 				status = pool_send_and_wait(query_context, 0, 0);
658 				/* free_parser(); */
659 				return status;
660 			}
661 
662 			/* Send the query to master node */
663 			pool_send_and_wait(query_context, 1, MASTER_NODE_ID);
664 
665 			/* Check specific errors */
666 			specific_error = check_errors(backend, MASTER_NODE_ID);
667 			if (specific_error)
668 			{
669 				/* log error message */
670 				generate_error_message("SimpleQuery: ", specific_error, contents);
671 			}
672 		}
673 
674 		if (specific_error)
675 		{
676 			char msg[1024] = POOL_ERROR_QUERY; /* large enough */
677 			int len = strlen(msg) + 1;
678 
679 			memset(msg + len, 0, sizeof(int));
680 
681 			/* send query to other nodes */
682 			query_context->rewritten_query = msg;
683 			query_context->rewritten_length = len;
684 			pool_send_and_wait(query_context, -1, MASTER_NODE_ID);
685 		}
686 		else
687 		{
688 			/*
689 			 * Send the query to other than master node.
690 			 */
691 			pool_send_and_wait(query_context, -1, MASTER_NODE_ID);
692 		}
693 
694 		/* Send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
695 		if (commit)
696 			pool_send_and_wait(query_context, 1, MASTER_NODE_ID);
697 	}
698 	else
699 	{
700 		pool_send_and_wait(query_context, 1, MASTER_NODE_ID);
701 	}
702 
703 	return POOL_CONTINUE;
704 }
705 
706 /*
707  * process EXECUTE (V3 only)
708  */
Execute(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)709 POOL_STATUS Execute(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
710 					int len, char *contents)
711 {
712 	int commit = 0;
713 	char *query = NULL;
714 	Node *node;
715 	int specific_error = 0;
716 	POOL_SESSION_CONTEXT *session_context;
717 	POOL_QUERY_CONTEXT *query_context;
718 	POOL_SENT_MESSAGE *bind_msg;
719 	bool foundp = false;
720 
721 	/* Get session context */
722 	session_context = pool_get_session_context(false);
723 
724 	ereport(DEBUG2,
725             (errmsg("Execute: portal name <%s>", contents)));
726 
727 	bind_msg = pool_get_sent_message('B', contents, POOL_SENT_MESSAGE_CREATED);
728 	if (!bind_msg)
729         ereport(FATAL,
730             (return_code(2),
731              errmsg("unable to Execute"),
732                 errdetail("unable to get bind message")));
733 
734 	if(!bind_msg->query_context)
735         ereport(FATAL,
736                 (return_code(2),
737                  errmsg("unable to Execute"),
738                  errdetail("unable to get query context")));
739 
740 	if (!bind_msg->query_context->original_query)
741         ereport(FATAL,
742             (return_code(2),
743                 errmsg("unable to Execute"),
744                  errdetail("unable to get original query")));
745 
746 	if (!bind_msg->query_context->parse_tree)
747         ereport(FATAL,
748             (return_code(2),
749                 errmsg("unable to Execute"),
750                  errdetail("unable to get parse tree")));
751 
752 	query_context = bind_msg->query_context;
753 	node = bind_msg->query_context->parse_tree;
754 	query = bind_msg->query_context->original_query;
755 
756 	strlcpy(query_string_buffer, query, sizeof(query_string_buffer));
757 
758 	ereport(DEBUG2,(errmsg("Execute: query string = <%s>", query)));
759 
760 	ereport(DEBUG1,(errmsg("Execute: pool_is_likely_select: %d pool_is_writing_transaction: %d TSTATE: %c",
761 						   pool_is_likely_select(query), pool_is_writing_transaction(),
762 						   TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID))));
763 
764 	/*
765 	 * Fetch memory cache if possible
766 	 */
767 	if (pool_config->memory_cache_enabled && pool_is_likely_select(query) &&
768 		!pool_is_writing_transaction() &&
769 		(TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID) != 'E'))
770 	{
771 		POOL_STATUS status;
772 		char *search_query = NULL;
773 		int len;
774 
775 #define STR_ALLOC_SIZE 1024
776 
777 		len = strlen(query)+1;
778 		search_query = MemoryContextStrdup(query_context->memory_context,query);
779 
780 		ereport(DEBUG1,(errmsg("Execute: checkig cache fetch condition")));
781 
782 		/*
783 		 * Add bind message's info to query to search.
784 		 */
785 		if (query_context->is_cache_safe && bind_msg->param_offset && bind_msg->contents)
786 		{
787 			/* Extract binary contents from bind message */
788 			char *query_in_bind_msg = bind_msg->contents + bind_msg->param_offset;
789 			char hex_str[4];  /* 02X chars + white space + null end */
790 			int i;
791 			int alloc_len;
792 
793 			alloc_len = (len/STR_ALLOC_SIZE+1)*STR_ALLOC_SIZE;
794 			search_query = repalloc(search_query, alloc_len);
795 
796 			for (i = 0; i < bind_msg->len - bind_msg->param_offset; i++)
797 			{
798 				int hexlen;
799 
800 				snprintf(hex_str, sizeof(hex_str), (i == 0) ? " %02X" : "%02X", 0xff & query_in_bind_msg[i]);
801 				hexlen = strlen(hex_str);
802 
803 				if ((len+hexlen) >= alloc_len)
804 				{
805 					alloc_len += STR_ALLOC_SIZE;
806 					search_query = repalloc(search_query, alloc_len);
807 				}
808 				strcat(search_query, hex_str);
809 				len += hexlen;
810 			}
811 
812 			/*
813 			 * If bind message is sent again to an existing prepared statement,
814 			 * it is possible that query_w_hex remains. Before setting newly
815 			 * allocated query_w_hex's pointer to the query context, free the
816 			 * previously allocated memory.
817 			 */
818 			if (query_context->query_w_hex)
819 			{
820 				pfree(query_context->query_w_hex);
821 			}
822 			query_context->query_w_hex = search_query;
823 
824 			/*
825 			 * When a transaction is committed, query_context->temp_cache->query is used
826 			 * to create md5 hash to search for query cache.
827 			 * So overwrite the query text in temp cache to the one with the hex of bind message.
828 			 * If not, md5 hash will be created by the query text without bind message, and
829 			 * it will happen to find cache never or to get a wrong result.
830 			 *
831 			 * However, It is possible that temp_cache does not exist.
832 			 * Consider following scenario:
833 			 * - In the previous execute cache is overflowed, and
834 			 *   temp_cache discarded.
835 			 * - In the subsequent bind/execute uses the same portal
836 			 */
837 			if (query_context->temp_cache)
838 			{
839 				pfree(query_context->temp_cache->query);
840 				query_context->temp_cache->query = MemoryContextStrdup(session_context->memory_context,search_query);
841 			}
842 		}
843 
844 		/* If the query is SELECT from table to cache, try to fetch cached result. */
845 		status = pool_fetch_from_memory_cache(frontend, backend, search_query, &foundp);
846 
847 		if (status != POOL_CONTINUE)
848 			return status;
849 
850 		if (foundp)
851 		{
852 #ifdef DEBUG
853 			extern bool stop_now;
854 #endif
855 			pool_ps_idle_display(backend);
856 			pool_stats_count_up_num_cache_hits();
857 			query_context->skip_cache_commit = true;
858 #ifdef DEBUG
859 			stop_now = true;
860 #endif
861 
862 			if (!STREAM || !pool_is_doing_extended_query_message())
863 			{
864 				pool_set_skip_reading_from_backends();
865 				pool_stats_count_up_num_cache_hits();
866 				pool_unset_query_in_progress();
867 				return POOL_CONTINUE;
868 			}
869 		}
870 		else
871 			query_context->skip_cache_commit = false;
872 	}
873 
874 	session_context->query_context = query_context;
875 	/*
876 	 * Calling pool_where_to_send here is dangerous because the node
877 	 * parse/bind has been sent could be change by
878 	 * pool_where_to_send() and it leads to "portal not found"
879 	 * etc. errors.
880 	 */
881 
882 	/* check if query is "COMMIT" or "ROLLBACK" */
883 	commit = is_commit_or_rollback_query(node);
884 
885 	if (!STREAM)
886 	{
887 		/*
888 		 * Query is not commit/rollback
889 		 */
890 		if (!commit)
891 		{
892 			/* Send the query to master node */
893 			pool_extended_send_and_wait(query_context, "E", len, contents, 1, MASTER_NODE_ID, false);
894 
895 			/* Check specific errors */
896 			specific_error = check_errors(backend, MASTER_NODE_ID);
897 			if (specific_error)
898 			{
899 				/* log error message */
900 				generate_error_message("Execute: ", specific_error, contents);
901 			}
902 		}
903 
904 		if (specific_error)
905 		{
906 			char msg[1024] = "pgpool_error_portal"; /* large enough */
907 			int len = strlen(msg);
908 
909 			memset(msg + len, 0, sizeof(int));
910 
911 			/* send query to other nodes */
912 			pool_extended_send_and_wait(query_context, "E", len, msg, -1, MASTER_NODE_ID, false);
913 		}
914 		else
915 		{
916 			pool_extended_send_and_wait(query_context, "E", len, contents, -1, MASTER_NODE_ID, false);
917 		}
918 
919 		/* send "COMMIT" or "ROLLBACK" to only master node if query is "COMMIT" or "ROLLBACK" */
920 		if (commit)
921 		{
922 			pool_extended_send_and_wait(query_context, "E", len, contents, 1, MASTER_NODE_ID, false);
923 		}
924 	}
925 	else		/* streaming replication mode */
926 	{
927 		POOL_PENDING_MESSAGE *pmsg;
928 
929 		if (!foundp)
930 		{
931 			pool_extended_send_and_wait(query_context, "E", len, contents, 1, MASTER_NODE_ID, true);
932 			pool_extended_send_and_wait(query_context, "E", len, contents, -1, MASTER_NODE_ID, true);
933 		}
934 
935 		/* Add pending message */
936 		pmsg = pool_pending_message_create('E', len, contents);
937 		pool_pending_message_dest_set(pmsg, query_context);
938 		pool_pending_message_query_set(pmsg, query_context);
939 		pool_pending_message_add(pmsg);
940 		pool_pending_message_free_pending_message(pmsg);
941 
942 		/* Various take care at the transaction start */
943 		handle_query_context(backend);
944 
945 		/*
946 		 * Take care of "writing transaction" flag.
947 		 */
948 		if (!is_select_query(node, query) && !is_start_transaction_query(node) &&
949 			!is_commit_or_rollback_query(node))
950 		{
951 			ereport(DEBUG1,
952 					(errmsg("Execute: TSTATE:%c",
953 							TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID))));
954 			/*
955 			 * If the query was not READ SELECT, and we are in an
956 			 * explicit transaction, remember that we had a write
957 			 * query in this transaction.
958 			 */
959 			if (TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID) == 'T')
960 			{
961 				/* However, if the query is "SET TRANSACTION READ ONLY" or its variant,
962 				 * don't set it.
963 				 */
964 				if (!pool_is_transaction_read_only(node))
965 				{
966 					pool_set_writing_transaction();
967 				}
968 			}
969 		}
970 		pool_unset_query_in_progress();
971 	}
972 
973 	return POOL_CONTINUE;
974 }
975 
976 /*
977  * process Parse (V3 only)
978  */
Parse(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)979 POOL_STATUS Parse(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
980 				  int len, char *contents)
981 {
982 	int deadlock_detected = 0;
983 	int insert_stmt_with_lock = 0;
984 	char *name;
985 	char *stmt;
986 	List *parse_tree_list;
987 	Node *node = NULL;
988 	POOL_SENT_MESSAGE *msg;
989 	POOL_STATUS status;
990 	POOL_SESSION_CONTEXT *session_context;
991 	POOL_QUERY_CONTEXT *query_context;
992 
993 	bool error;
994 
995 	/* Get session context */
996 	session_context = pool_get_session_context(false);
997 
998 	/* Create query context */
999 	query_context = pool_init_query_context();
1000 
1001 	ereport(DEBUG1,
1002 			(errmsg("Parse: statement name <%s>", contents)));
1003 
1004 	name = contents;
1005 	stmt = contents + strlen(contents) + 1;
1006 
1007 	/* parse SQL string */
1008 	MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);
1009 	parse_tree_list = raw_parser(stmt, &error);
1010 
1011 	if (parse_tree_list == NIL)
1012 	{
1013 		/* is the query empty? */
1014 		if (*stmt == '\0' || *stmt == ';' || error == false)
1015 		{
1016 			/*
1017 			 * JBoss sends empty queries for checking connections.
1018 			 * We replace the empty query with SELECT command not
1019 			 * to affect load balance.
1020 			 * [Pgpool-general] Confused about JDBC and load balancing
1021 			 */
1022 			parse_tree_list = raw_parser(POOL_DUMMY_READ_QUERY, &error);
1023 		}
1024 		else
1025 		{
1026 			/*
1027 			 * Unable to parse the query. Probably syntax error or the
1028 			 * query is too new and our parser cannot understand. Treat as
1029 			 * if it were an DELETE command. Note that the DELETE command
1030 			 * does not execute, instead the original query will be sent
1031 			 * to backends, which may or may not cause an actual syntax errors.
1032 			 * The command will be sent to all backends in replication mode
1033 			 * or master/primary in master/slave mode.
1034 			 */
1035 			if (!strcmp(remote_host, "[local]"))
1036 			{
1037 				ereport(LOG,
1038 						(errmsg("Unable to parse the query: \"%s\" from local client", stmt)));
1039 			}
1040 			else
1041 			{
1042 				ereport(LOG,
1043 						(errmsg("Unable to parse the query: \"%s\" from client %s(%s)", stmt, remote_host, remote_port)));
1044 			}
1045 			parse_tree_list = raw_parser(POOL_DUMMY_WRITE_QUERY, &error);
1046 			query_context->is_parse_error = true;
1047 		}
1048 	}
1049 	MemoryContextSwitchTo(old_context);
1050 
1051 	if (parse_tree_list != NIL)
1052 	{
1053 		/* Save last query string for logging purpose */
1054 		snprintf(query_string_buffer, sizeof(query_string_buffer), "Parse: %s", stmt);
1055 
1056 		node = (Node *) lfirst(list_head(parse_tree_list));
1057 
1058 		/* If replication mode, check to see what kind of insert lock is
1059 		 * neccessary.
1060 		 */
1061 		if (REPLICATION)
1062 			insert_stmt_with_lock = need_insert_lock(backend, stmt, node);
1063 
1064 		/*
1065 		 * Start query context
1066 		 */
1067 		pool_start_query(query_context, stmt, strlen(stmt) + 1, node);
1068 
1069 		msg = pool_create_sent_message('P', len, contents, 0, name, query_context);
1070 
1071 		session_context->uncompleted_message = msg;
1072 
1073 		/*
1074 		 * If the table is to be cached, set is_cache_safe TRUE and register table oids.
1075 		 */
1076 		if (pool_config->memory_cache_enabled)
1077 		{
1078 			bool is_likely_select = false;
1079 			bool is_select_query = false;
1080 			int num_oids;
1081 			int *oids;
1082 			int i;
1083 
1084 			/* Check if the query is actually SELECT */
1085 			is_likely_select = pool_is_likely_select(query_context->original_query);
1086 			if (is_likely_select && IsA(node, SelectStmt))
1087 			{
1088 				is_select_query = true;
1089 			}
1090 
1091 			/* Switch the flag of is_cache_safe in session_context */
1092 			if (is_select_query && !query_context->is_parse_error &&
1093 				pool_is_allow_to_cache(query_context->parse_tree,
1094 									   query_context->original_query))
1095 			{
1096 				pool_set_cache_safe();
1097 			}
1098 			else
1099 			{
1100 				pool_unset_cache_safe();
1101 			}
1102 
1103 			/* If table is to be cached and the query is DML, save the table oid */
1104 			if (!is_select_query && !query_context->is_parse_error)
1105 			{
1106 				num_oids = pool_extract_table_oids(node, &oids);
1107 
1108 				if (num_oids > 0)
1109 				{
1110 					/* Save to oid buffer */
1111 					for (i=0;i<num_oids;i++)
1112 					{
1113 						pool_add_dml_table_oid(oids[i]);
1114 					}
1115 				}
1116 			}
1117 		}
1118 
1119 		/*
1120 		 * Decide where to send query
1121 		 */
1122 		pool_where_to_send(query_context, query_context->original_query,
1123 						   query_context->parse_tree);
1124 
1125 		if (REPLICATION)
1126 		{
1127 			char *rewrite_query;
1128 			bool rewrite_to_params = true;
1129 
1130 			/*
1131 			 * rewrite `now()'.
1132 			 * if stmt is unnamed, we rewrite `now()' to timestamp constant.
1133 			 * else we rewrite `now()' to params and expand that at Bind
1134 			 * message.
1135 			 */
1136 			if (*name == '\0')
1137 				rewrite_to_params = false;
1138 			msg->num_tsparams = 0;
1139 			rewrite_query = rewrite_timestamp(backend, node, rewrite_to_params, msg);
1140 			if (rewrite_query != NULL)
1141 			{
1142 				int alloc_len = len - strlen(stmt) + strlen(rewrite_query);
1143                 /* switch memory context */
1144 				MemoryContext oldcxt = MemoryContextSwitchTo(session_context->memory_context);
1145 				contents = repalloc(msg->contents,alloc_len);
1146 				MemoryContextSwitchTo(oldcxt);
1147 
1148 				strcpy(contents, name);
1149 				strcpy(contents + strlen(name) + 1, rewrite_query);
1150 				memcpy(contents + strlen(name) + strlen(rewrite_query) + 2,
1151 					   stmt + strlen(stmt) + 1,
1152 					   len - (strlen(name) + strlen(stmt) + 2));
1153 
1154 				len = alloc_len;
1155 				name = contents;
1156 				stmt = contents + strlen(name) + 1;
1157 				ereport(DEBUG1,
1158 						(errmsg("Parse: rewrite query name:\"%s\" statement:\"%s\" len=%d", name, stmt, len)));
1159 
1160 				msg->len = len;
1161 				msg->contents = contents;
1162 
1163 				query_context->rewritten_query = rewrite_query;
1164 			}
1165 		}
1166 
1167 		/*
1168 		 * If the query is BEGIN READ WRITE in master/slave mode,
1169 		 * we send BEGIN instead of it to slaves/standbys.
1170 		 * original_query which is BEGIN READ WRITE is sent to primary.
1171 		 * rewritten_query which is BEGIN is sent to standbys.
1172 		 */
1173 		if (is_start_transaction_query(query_context->parse_tree) &&
1174 			is_read_write((TransactionStmt *)query_context->parse_tree) &&
1175 			MASTER_SLAVE)
1176 		{
1177 			query_context->rewritten_query = pstrdup("BEGIN");
1178 		}
1179 	}
1180 
1181 	/*
1182 	 * If in replication mode, send "SYNC" message if not in a transaction.
1183 	 */
1184 	if (REPLICATION)
1185 	{
1186 		char kind;
1187 
1188 		if (TSTATE(backend, MASTER_NODE_ID) != 'T')
1189 		{
1190 			int i;
1191 
1192 			/* synchronize transaction state */
1193 			for (i = 0; i < NUM_BACKENDS; i++)
1194 			{
1195 				if (!VALID_BACKEND(i))
1196 					continue;
1197 				/* send sync message */
1198 				send_extended_protocol_message(backend, i, "S", 0, "");
1199 			}
1200 
1201 			kind = pool_read_kind(backend);
1202 			if (kind != 'Z')
1203                 ereport(ERROR,
1204                     (errmsg("unable to parse the query"),
1205                          errdetail("invalid read kind")));
1206 
1207 			/*
1208 			 * SYNC message returns "Ready for Query" message.
1209 			 */
1210 			if (ReadyForQuery(frontend, backend, 0, false) != POOL_CONTINUE)
1211 			{
1212 				pool_query_context_destroy(query_context);
1213 				return POOL_END;
1214 			}
1215 
1216 			/*
1217 			 * set in_progress flag, because ReadyForQuery unset it.
1218 			 * in_progress flag influences VALID_BACKEND.
1219 			 */
1220 			if (!pool_is_query_in_progress())
1221 				pool_set_query_in_progress();
1222 		}
1223 
1224 		if (is_strict_query(query_context->parse_tree))
1225 		{
1226 			start_internal_transaction(frontend, backend, query_context->parse_tree);
1227 			allow_close_transaction = 1;
1228 		}
1229 
1230 		if (insert_stmt_with_lock)
1231 		{
1232 			/* start a transaction if needed and lock the table */
1233 			status = insert_lock(frontend, backend, stmt, (InsertStmt *)query_context->parse_tree, insert_stmt_with_lock);
1234 			if (status != POOL_CONTINUE)
1235                 ereport(ERROR,
1236                     (errmsg("unable to parse the query"),
1237                          errdetail("unable to get insert lock")));
1238 
1239 		}
1240 	}
1241 
1242 	if (REPLICATION || SLONY)
1243 	{
1244 		/*
1245 		 * We must synchronize because Parse message acquires table
1246 		 * locks.
1247 		 */
1248 		ereport(DEBUG1,
1249 				(errmsg("Parse: waiting for master completing the query")));
1250         pool_extended_send_and_wait(query_context, "P", len, contents, 1, MASTER_NODE_ID, false);
1251 
1252 
1253 		/*
1254 		 * We must check deadlock error because a aborted transaction
1255 		 * by detecting deadlock isn't same on all nodes.
1256 		 * If a transaction is aborted on master node, pgpool send a
1257 		 * error query to another nodes.
1258 		 */
1259 		deadlock_detected = detect_deadlock_error(MASTER(backend), MAJOR(backend));
1260         /*
1261          * Check if other than deadlock error detected.  If so, emit
1262          * log. This is useful when invalid encoding error occurs. In
1263          * this case, PostgreSQL does not report what statement caused
1264          * that error and make users confused.
1265          */
1266         per_node_error_log(backend, MASTER_NODE_ID, stmt, "Parse: Error or notice message from backend: ", true);
1267 
1268 		if (deadlock_detected)
1269 		{
1270 			POOL_QUERY_CONTEXT *error_qc;
1271 
1272 			error_qc = pool_init_query_context();
1273 
1274 
1275 			pool_start_query(error_qc, POOL_ERROR_QUERY, strlen(POOL_ERROR_QUERY) + 1, node);
1276 			pool_copy_prep_where(query_context->where_to_send, error_qc->where_to_send);
1277 
1278 			ereport(LOG,
1279 					(errmsg("Parse: received deadlock error message from master node")));
1280 
1281 			pool_send_and_wait(error_qc, -1, MASTER_NODE_ID);
1282 
1283 			pool_query_context_destroy(error_qc);
1284 
1285 
1286             pool_set_query_in_progress();
1287 			session_context->query_context = query_context;
1288 		}
1289 		else
1290 		{
1291 			pool_extended_send_and_wait(query_context, "P", len, contents, -1, MASTER_NODE_ID, false);
1292 		}
1293 	}
1294 	else if (STREAM)
1295 	{
1296 		POOL_PENDING_MESSAGE *pmsg;
1297 
1298 		/* XXX fix me:even with streaming replication mode, couldn't we have a deadlock */
1299 		pool_set_query_in_progress();
1300 #ifdef NOT_USED
1301 		pool_clear_sync_map();
1302 #endif
1303 		pool_extended_send_and_wait(query_context, "P", len, contents, 1, MASTER_NODE_ID, true);
1304 		pool_extended_send_and_wait(query_context, "P", len, contents, -1, MASTER_NODE_ID, true);
1305 		pool_add_sent_message(session_context->uncompleted_message);
1306 
1307 		/* Add pending message */
1308 		pmsg = pool_pending_message_create('P', len, contents);
1309 		pool_pending_message_dest_set(pmsg, query_context);
1310 		pool_pending_message_add(pmsg);
1311 		pool_pending_message_free_pending_message(pmsg);
1312 
1313 		pool_unset_query_in_progress();
1314 	}
1315 	else
1316 	{
1317 		pool_extended_send_and_wait(query_context, "P", len, contents, 1, MASTER_NODE_ID, false);
1318 	}
1319 
1320 	return POOL_CONTINUE;
1321 
1322 }
1323 
Bind(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)1324 POOL_STATUS Bind(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
1325 				 int len, char *contents)
1326 {
1327 	char *pstmt_name;
1328 	char *portal_name;
1329 	char *rewrite_msg = NULL;
1330 	POOL_SENT_MESSAGE *parse_msg;
1331 	POOL_SENT_MESSAGE *bind_msg;
1332 	POOL_SESSION_CONTEXT *session_context;
1333 	POOL_QUERY_CONTEXT *query_context;
1334 	int insert_stmt_with_lock = 0;
1335 	bool nowait;
1336 
1337 	/* Get session context */
1338 	session_context = pool_get_session_context(false);
1339 	/*
1340 	 * Rewrite message
1341 	 */
1342 	portal_name = contents;
1343 	pstmt_name = contents + strlen(portal_name) + 1;
1344 
1345 	parse_msg = pool_get_sent_message('Q', pstmt_name, POOL_SENT_MESSAGE_CREATED);
1346 	if (!parse_msg)
1347 		parse_msg = pool_get_sent_message('P', pstmt_name, POOL_SENT_MESSAGE_CREATED);
1348 	if (!parse_msg)
1349 	{
1350         ereport(FATAL,
1351                 (errmsg("unable to bind"),
1352                  errdetail("cannot get parse message \"%s\"", pstmt_name)));
1353 	}
1354 
1355 	bind_msg = pool_create_sent_message('B', len, contents,
1356 										parse_msg->num_tsparams, portal_name,
1357 										parse_msg->query_context);
1358 
1359 	query_context = parse_msg->query_context;
1360 	if (!query_context)
1361 	{
1362         ereport(FATAL,
1363                 (errmsg("unable to bind"),
1364                  errdetail("cannot get the query context")));
1365 	}
1366 
1367 	/*
1368 	 * If the query can be cached, save its offset of query text in bind message's content.
1369 	 */
1370 	if (query_context->is_cache_safe)
1371 	{
1372 		bind_msg->param_offset = sizeof(char) * (strlen(portal_name) + strlen(pstmt_name) + 2);
1373 	}
1374 
1375 	session_context->uncompleted_message = bind_msg;
1376 
1377 	/* rewrite bind message */
1378 	if (REPLICATION && bind_msg->num_tsparams > 0)
1379 	{
1380 		rewrite_msg = bind_rewrite_timestamp(backend, bind_msg, contents, &len);
1381 		if (rewrite_msg != NULL)
1382 			contents = rewrite_msg;
1383 	}
1384 
1385 	session_context->query_context = query_context;
1386 
1387 	/*
1388 	 * Take care the case when the previous parse message has been sent to
1389 	 * other than primary node. In this case, we send a parse message to the
1390 	 * primary node.
1391 	 */
1392 	if (pool_config->load_balance_mode && pool_is_writing_transaction() &&
1393 		TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID) == 'T')
1394 	{
1395 		if (!STREAM)
1396 		{
1397 			pool_where_to_send(query_context, query_context->original_query,
1398 							   query_context->parse_tree);
1399 		}
1400 
1401 		if (parse_before_bind(frontend, backend, parse_msg, bind_msg) != POOL_CONTINUE)
1402 			return POOL_END;
1403 	}
1404 
1405 	/*
1406 	 * Start a transaction if necessary in replication mode
1407 	 */
1408 	if (REPLICATION)
1409 	{
1410 		ereport(DEBUG1,
1411 				(errmsg("Bind: checking strict query")));
1412 		if (is_strict_query(query_context->parse_tree))
1413 		{
1414 			ereport(DEBUG1,
1415 					(errmsg("Bind: strict query")));
1416 			start_internal_transaction(frontend, backend, query_context->parse_tree);
1417 			allow_close_transaction = 1;
1418 		}
1419 
1420 		ereport(DEBUG1,
1421 				(errmsg("Bind: checking insert lock")));
1422 		insert_stmt_with_lock = need_insert_lock(backend, query_context->original_query, query_context->parse_tree);
1423 		if (insert_stmt_with_lock)
1424 		{
1425 			ereport(DEBUG1,
1426 					(errmsg("Bind: issuing insert lock")));
1427 			/* issue a LOCK command to keep consistency */
1428 			if (insert_lock(frontend, backend, query_context->original_query, (InsertStmt *)query_context->parse_tree, insert_stmt_with_lock) != POOL_CONTINUE)
1429 			{
1430 				pool_query_context_destroy(query_context);
1431 				return POOL_END;
1432 			}
1433 		}
1434 	}
1435 
1436 	ereport(DEBUG1,
1437 		(errmsg("Bind: waiting for master completing the query")));
1438 
1439 	pool_set_query_in_progress();
1440 
1441 	if (STREAM)
1442 	{
1443 		nowait = true;
1444 		session_context->query_context = query_context = bind_msg->query_context;
1445 	}
1446 	else
1447 		nowait = false;
1448 
1449 	pool_extended_send_and_wait(query_context, "B", len, contents, 1, MASTER_NODE_ID, nowait);
1450 	pool_extended_send_and_wait(query_context, "B", len, contents, -1, MASTER_NODE_ID, nowait);
1451 
1452 	if (STREAM)
1453 	{
1454 		POOL_PENDING_MESSAGE *pmsg;
1455 
1456 		pool_unset_query_in_progress();
1457 		pool_add_sent_message(session_context->uncompleted_message);
1458 
1459 		/* Add pending message */
1460 		pmsg = pool_pending_message_create('B', len, contents);
1461 		pool_pending_message_dest_set(pmsg, query_context);
1462 		pool_pending_message_query_set(pmsg, query_context);
1463 		pool_pending_message_add(pmsg);
1464 		pool_pending_message_free_pending_message(pmsg);
1465 	}
1466 
1467 	if(rewrite_msg)
1468 		pfree(rewrite_msg);
1469 	return POOL_CONTINUE;
1470 }
1471 
Describe(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)1472 POOL_STATUS Describe(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
1473 							int len, char *contents)
1474 {
1475 	POOL_SENT_MESSAGE *msg;
1476 	POOL_SESSION_CONTEXT *session_context;
1477 	POOL_QUERY_CONTEXT *query_context;
1478 
1479 	bool nowait;
1480 
1481 	/* Get session context */
1482 	session_context = pool_get_session_context(false);
1483 
1484 	/* Prepared Statement */
1485 	if (*contents == 'S')
1486 	{
1487 		msg = pool_get_sent_message('Q', contents+1, POOL_SENT_MESSAGE_CREATED);
1488 		if (!msg)
1489 			msg = pool_get_sent_message('P', contents+1, POOL_SENT_MESSAGE_CREATED);
1490 		if (!msg)
1491             ereport(FATAL,
1492                 (return_code(2),
1493                      errmsg("unable to execute Describe"),
1494                      errdetail("unable to get the parse message")));
1495 	}
1496 	/* Portal */
1497 	else
1498 	{
1499 		msg = pool_get_sent_message('B', contents+1, POOL_SENT_MESSAGE_CREATED);
1500 		if (!msg)
1501             ereport(FATAL,
1502                     (return_code(2),
1503                      errmsg("unable to execute Describe"),
1504                      errdetail("unable to get the bind message")));
1505 	}
1506 
1507 	query_context = msg->query_context;
1508 
1509 	if (query_context == NULL)
1510         ereport(FATAL,
1511                 (return_code(2),
1512                  errmsg("unable to execute Describe"),
1513                  errdetail("unable to get the query context")));
1514 
1515 	session_context->query_context = query_context;
1516 
1517 	/*
1518 	 * Calling pool_where_to_send here is dangerous because the node
1519 	 * parse/bind has been sent could be change by
1520 	 * pool_where_to_send() and it leads to "portal not found"
1521 	 * etc. errors.
1522 	 */
1523     ereport(DEBUG1,
1524             (errmsg("Describe: waiting for master completing the query")));
1525 
1526 	nowait = (STREAM? true: false);
1527 
1528 	pool_set_query_in_progress();
1529 	pool_extended_send_and_wait(query_context, "D", len, contents, 1, MASTER_NODE_ID, nowait);
1530 	pool_extended_send_and_wait(query_context, "D", len, contents, -1, MASTER_NODE_ID, nowait);
1531 
1532 	if (STREAM)
1533 	{
1534 		POOL_PENDING_MESSAGE *pmsg;
1535 
1536 		/* Add pending message */
1537 		pmsg = pool_pending_message_create('D', len, contents);
1538 		pool_pending_message_dest_set(pmsg, query_context);
1539 		pool_pending_message_query_set(pmsg, query_context);
1540 		pool_pending_message_add(pmsg);
1541 		pool_pending_message_free_pending_message(pmsg);
1542 
1543 		pool_unset_query_in_progress();
1544 	}
1545 
1546 	return POOL_CONTINUE;
1547 }
1548 
1549 
Close(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)1550 POOL_STATUS Close(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
1551 				  int len, char *contents)
1552 {
1553 	POOL_SENT_MESSAGE *msg;
1554 	POOL_SESSION_CONTEXT *session_context;
1555 	POOL_QUERY_CONTEXT *query_context;
1556 
1557 	/* Get session context */
1558 	session_context = pool_get_session_context(false);
1559 
1560 	/* Prepared Statement */
1561 	if (*contents == 'S')
1562 	{
1563 		msg = pool_get_sent_message('Q', contents+1, POOL_SENT_MESSAGE_CREATED);
1564 		if (!msg)
1565 			msg = pool_get_sent_message('P', contents+1, POOL_SENT_MESSAGE_CREATED);
1566 	}
1567 	/* Portal */
1568 	else if (*contents == 'P')
1569 	{
1570 		msg = pool_get_sent_message('B', contents+1, POOL_SENT_MESSAGE_CREATED);
1571 	}
1572 	else
1573         ereport(FATAL,
1574                 (return_code(2),
1575                     errmsg("unable to execute close, invalid message")));
1576 	/*
1577 	 * As per the postgresql, calling close on non existing portals or
1578 	 * statements is not an error. So on the same footings we will ignore all
1579 	 * such calls and return the close complete message to clients with out
1580 	 * going to backend
1581 	 */
1582 	if (!msg)
1583 	{
1584 		int len = htonl(4);
1585 		pool_set_command_success();
1586 		pool_unset_query_in_progress();
1587 
1588 		pool_write(frontend, "3", 1);
1589 		pool_write_and_flush(frontend, &len, sizeof(len));
1590 
1591 		return POOL_CONTINUE;
1592 	}
1593 
1594 	session_context->uncompleted_message = msg;
1595 	query_context = msg->query_context;
1596 
1597 	if (!query_context)
1598         ereport(FATAL,
1599 			(return_code(2),
1600                  errmsg("unable to execute close"),
1601 					errdetail("unable to get the query context")));
1602 
1603 	session_context->query_context = query_context;
1604 	/* pool_where_to_send(query_context, query_context->original_query, query_context->parse_tree); */
1605 
1606     ereport(DEBUG1,
1607             (errmsg("Close: waiting for master completing the query")));
1608 
1609 	pool_set_query_in_progress();
1610 
1611 	if (!STREAM)
1612 	{
1613 		pool_extended_send_and_wait(query_context, "C", len, contents, 1, MASTER_NODE_ID, false);
1614 		pool_extended_send_and_wait(query_context, "C", len, contents, -1, MASTER_NODE_ID, false);
1615 	}
1616 	else
1617 	{
1618 		POOL_PENDING_MESSAGE *pmsg;
1619 		bool where_to_send_save[MAX_NUM_BACKENDS];
1620 
1621 		/* Parse_before_bind() may have sent a bind message to the primary
1622 		 * node id. So send the close message to the primary node as well.
1623 		 * Even if not, sending a close message for non existing
1624 		 * statement/portal is harmless. No error will happen.
1625 		 */
1626 		if (session_context->load_balance_node_id != PRIMARY_NODE_ID)
1627 		{
1628 			/* save where_to_send map */
1629 			memcpy(where_to_send_save, query_context->where_to_send, sizeof(where_to_send_save));
1630 
1631 			query_context->where_to_send[PRIMARY_NODE_ID] = true;
1632 			query_context->where_to_send[session_context->load_balance_node_id] = true;
1633 		}
1634 
1635 		pool_extended_send_and_wait(query_context, "C", len, contents, 1, MASTER_NODE_ID, true);
1636 		pool_extended_send_and_wait(query_context, "C", len, contents, -1, MASTER_NODE_ID, true);
1637 
1638 		/* Add pending message */
1639 		pmsg = pool_pending_message_create('C', len, contents);
1640 		pool_pending_message_dest_set(pmsg, query_context);
1641 		pool_pending_message_query_set(pmsg, query_context);
1642 		pool_pending_message_add(pmsg);
1643 		pool_pending_message_free_pending_message(pmsg);
1644 
1645 		if (session_context->load_balance_node_id != PRIMARY_NODE_ID)
1646 		{
1647 			/* Restore where_to_send map */
1648 			memcpy(query_context->where_to_send, where_to_send_save, sizeof(where_to_send_save));
1649 		}
1650 
1651 #ifdef NOT_USED
1652 		dump_pending_message();
1653 #endif
1654 		pool_unset_query_in_progress();
1655 
1656 		/*
1657 		 * Remove sent message
1658 		 */
1659 		ereport(DEBUG1,
1660 				(errmsg("Close: removing sent message %c %s", *contents, contents+1)));
1661 		pool_set_sent_message_state(msg);
1662 	}
1663 
1664 	return POOL_CONTINUE;
1665 }
1666 
1667 
FunctionCall3(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)1668 POOL_STATUS FunctionCall3(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend,
1669 						  int len, char *contents)
1670 {
1671 	/*
1672 	 * If Function call message for lo_creat, rewrite it
1673 	 */
1674 	char *rewrite_lo;
1675 	int rewrite_len;
1676 
1677 	rewrite_lo = pool_rewrite_lo_creat('F', contents, len, frontend,
1678 									   backend, &rewrite_len);
1679 
1680 	if (rewrite_lo != NULL)
1681 	{
1682 		contents = rewrite_lo;
1683 		len = rewrite_len;
1684 	}
1685 	return  SimpleForwardToBackend('F', frontend, backend, len, contents);
1686 }
1687 
1688 /*
1689  * Process ReadyForQuery('Z') message.
1690  * If send_ready is true, send 'Z' message to frontend.
1691  * If cache_commit is true, commit or discard query cache according to
1692  * transaction state.
1693  *
1694  * - if the error status "mismatch_ntuples" is set, send an error query
1695  *	 to all DB nodes to abort transaction or do failover.
1696  * - internal transaction is closed
1697  */
ReadyForQuery(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,bool send_ready,bool cache_commit)1698 POOL_STATUS ReadyForQuery(POOL_CONNECTION *frontend,
1699 						  POOL_CONNECTION_POOL *backend, bool send_ready, bool cache_commit)
1700 {
1701 	int i;
1702 	int len;
1703 	signed char kind;
1704 	signed char state = 0;
1705 	POOL_SESSION_CONTEXT *session_context;
1706 	Node *node = NULL;
1707 	char *query = NULL;
1708 	bool got_estate = false;
1709 
1710 	/*
1711 	 * It is possible that the "ignore until sync is received" flag was set if
1712 	 * we send sync to backend and the backend returns error. Let's reset the
1713 	 * flag unconditionally because we apparently have received a "ready for
1714 	 * query" message from backend.
1715 	 */
1716 	pool_unset_ignore_till_sync();
1717 
1718 	/* Reset previous message */
1719 	pool_pending_message_reset_previous_message();
1720 
1721 	/* Get session context */
1722 	session_context = pool_get_session_context(false);
1723 
1724 	/*
1725 	 * If the numbers of update tuples are differ and
1726 	 * failover_if_affected_tuples_mismatch is false, we abort
1727 	 * transactions by using do_error_command.  If
1728 	 * failover_if_affected_tuples_mismatch is true, trigger failover.
1729 	 * This only works with PROTO_MAJOR_V3.
1730 	 */
1731 	if (session_context->mismatch_ntuples && MAJOR(backend) == PROTO_MAJOR_V3)
1732 	{
1733 		int i;
1734 		char kind;
1735 
1736 		/*
1737 		 * If failover_if_affected_tuples_mismatch, is true, then
1738 		 * decide victim nodes by using find_victim_nodes and
1739 		 * degenerate them.
1740 		 */
1741 		if (pool_config->failover_if_affected_tuples_mismatch)
1742 		{
1743 			int *victim_nodes;
1744 			int number_of_nodes;
1745 			char msgbuf[128];
1746 
1747 			victim_nodes = find_victim_nodes(session_context->ntuples, NUM_BACKENDS,
1748 											 MASTER_NODE_ID, &number_of_nodes);
1749 			if (victim_nodes)
1750 			{
1751 				int i;
1752 				String *msg;
1753 
1754 				msg = init_string("ReadyForQuery: Degenerate backends:");
1755 
1756 				for (i=0;i<number_of_nodes;i++)
1757 				{
1758 					snprintf(msgbuf, sizeof(msgbuf), " %d", victim_nodes[i]);
1759 					string_append_char(msg, msgbuf);
1760 				}
1761 				ereport(LOG,
1762 					(errmsg("processing ready for query message"),
1763 						 errdetail("%s", msg->data)));
1764 
1765 				free_string(msg);
1766 
1767 				msg = init_string("ReadyForQuery: Number of affected tuples are:");
1768 
1769 				for (i=0;i<NUM_BACKENDS;i++)
1770 				{
1771 					snprintf(msgbuf, sizeof(msgbuf), " %d", session_context->ntuples[i]);
1772 					string_append_char(msg, msgbuf);
1773 				}
1774 				ereport(LOG,
1775 					(errmsg("processing ready for query message"),
1776 						 errdetail("%s", msg->data)));
1777 
1778 				free_string(msg);
1779 
1780 				degenerate_backend_set(victim_nodes, number_of_nodes, true, 0);
1781 				child_exit(POOL_EXIT_AND_RESTART);
1782 			}
1783 			else
1784 			{
1785 				ereport(LOG,
1786 					(errmsg("processing ready for query message"),
1787 						 errdetail("find_victim_nodes returned no victim node")));
1788 			}
1789 		}
1790 
1791 		/*
1792 		 * XXX: discard rest of ReadyForQuery packet
1793 		 */
1794 
1795 		if (pool_read_message_length(backend) < 0)
1796 			return POOL_END;
1797 
1798 		state = pool_read_kind(backend);
1799 		if (state < 0)
1800 			return POOL_END;
1801 
1802 		for (i = 0; i < NUM_BACKENDS; i++)
1803 		{
1804 			if (VALID_BACKEND(i))
1805 			{
1806 				/* abort transaction on all nodes. */
1807 				do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
1808 			}
1809 		}
1810 
1811 		/* loop through until we get ReadyForQuery */
1812 		for(;;)
1813 		{
1814 			kind = pool_read_kind(backend);
1815 			if (kind < 0)
1816 				return POOL_END;
1817 
1818 			if (kind == 'Z')
1819 				break;
1820 
1821 
1822 			/* put the message back to read buffer */
1823 			for (i=0;i<NUM_BACKENDS;i++)
1824 			{
1825 				if (VALID_BACKEND(i))
1826 				{
1827 					pool_unread(CONNECTION(backend,i), &kind, 1);
1828 				}
1829 			}
1830 
1831 			/* discard rest of the packet */
1832 			if (pool_discard_packet(backend) != POOL_CONTINUE)
1833 				return POOL_END;
1834 		}
1835 		session_context->mismatch_ntuples = false;
1836 	}
1837 
1838 	/*
1839 	 * if a transaction is started for insert lock, we need to close
1840 	 * the transaction.
1841 	 */
1842 	/* if (pool_is_query_in_progress() && allow_close_transaction) */
1843 	if (allow_close_transaction)
1844 	{
1845 		if (end_internal_transaction(frontend, backend) != POOL_CONTINUE)
1846 			return POOL_END;
1847 	}
1848 
1849 	if (MAJOR(backend) == PROTO_MAJOR_V3)
1850 	{
1851 		if ((len = pool_read_message_length(backend)) < 0)
1852 			return POOL_END;
1853 		/*
1854 		 * Set transaction state for each node
1855 		 */
1856 		state = TSTATE(backend,
1857 					   MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID);
1858 
1859 		for (i=0;i<NUM_BACKENDS;i++)
1860 		{
1861 			if (!VALID_BACKEND(i))
1862 				continue;
1863 
1864 			if (pool_read(CONNECTION(backend, i), &kind, sizeof(kind)))
1865 				return POOL_END;
1866 
1867 			TSTATE(backend, i) = kind;
1868 			ereport(DEBUG1,
1869 				(errmsg("processing ReadyForQuery"),
1870 					 errdetail("transaction state '%c'(%02x)", state,state)));
1871 			/*
1872 			 * The transaction state to be returned to frontend is
1873 			 * master's.
1874 			 */
1875 			if (i == (MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID))
1876 			{
1877 				state = kind;
1878 			}
1879 			/*
1880 			 * However, if the state is 'E', then frontend should had been
1881 			 * already reported an ERROR. So, to match with that, let be the
1882 			 * state to be returned to frontend.
1883 			 */
1884 			if (kind == 'E')
1885 				got_estate = true;
1886 		}
1887 	}
1888 
1889 	/*
1890 	 * Make sure that no message remains in the backend buffer.  If something
1891 	 * remains, it could be an "out of band" ERROR or FATAL error, or a NOTICE
1892 	 * message, which was generated by backend itself for some reasons like
1893 	 * recovery conflict or SIGTERM received. If so, let's consume it and emit
1894 	 * a log message so that next read_kind_from_backend() will not hang in
1895 	 * trying to read from backend which may have not produced such a message.
1896 	 */
1897 	if (pool_is_query_in_progress())
1898 	{
1899 		for (i = 0; i < NUM_BACKENDS; i++)
1900 		{
1901 			if (!VALID_BACKEND(i))
1902 				continue;
1903 			if (!pool_read_buffer_is_empty(CONNECTION(backend, i)))
1904 				per_node_error_log(backend, i,
1905 								   "(out of band message)",
1906 								   "ReadyForQuery: Error or notice message from backend: ", false);
1907 		}
1908 	}
1909 
1910 	if (send_ready)
1911 	{
1912 		pool_write(frontend, "Z", 1);
1913 
1914 		if (MAJOR(backend) == PROTO_MAJOR_V3)
1915 		{
1916 			len = htonl(len);
1917 			pool_write(frontend, &len, sizeof(len));
1918 			if (got_estate)
1919 				state = 'E';
1920 			pool_write(frontend, &state, 1);
1921 		}
1922 		pool_flush(frontend);
1923 	}
1924 
1925 	if (pool_is_query_in_progress())
1926 	{
1927 		node = pool_get_parse_tree();
1928 		query = pool_get_query_string();
1929 
1930 		if (pool_is_command_success())
1931 		{
1932 			if (node)
1933 				pool_at_command_success(frontend, backend);
1934 
1935 			/* Memory cache enabled? */
1936 			if (cache_commit && pool_config->memory_cache_enabled)
1937 			{
1938 
1939 				/* If we are doing extended query and the state is after EXECUTE,
1940 				 * then we can commit cache.
1941 				 * We check latter condition by looking at query_context->query_w_hex.
1942 				 * This check is necessary for certain frame work such as PHP PDO.
1943 				 * It sends Sync message right after PARSE and it produces
1944 				 * "Ready for query" message from backend.
1945 				 */
1946 				if (pool_is_doing_extended_query_message())
1947 				{
1948 					if (session_context->query_context &&
1949 						session_context->query_context->query_state[MASTER_NODE_ID] == POOL_EXECUTE_COMPLETE)
1950 					{
1951 						pool_handle_query_cache(backend, session_context->query_context->query_w_hex, node, state);
1952 						if(session_context->query_context->query_w_hex)
1953                             pfree(session_context->query_context->query_w_hex);
1954 						session_context->query_context->query_w_hex = NULL;
1955 					}
1956 				}
1957 				else
1958 				{
1959 					if (MAJOR(backend) != PROTO_MAJOR_V3)
1960 					{
1961 						state = 'I';	/* XXX I don't think query cache works with PROTO2 protocol */
1962 					}
1963 					pool_handle_query_cache(backend, query, node, state);
1964 				}
1965 			}
1966 		}
1967 		/*
1968 		 * If PREPARE or extended query protocol commands caused error,
1969 		 * remove the temporary saved message.
1970 		 * (except when ReadyForQuery() is called during Parse() of extended queries)
1971 		 */
1972 		else
1973 		{
1974 			if ((pool_is_doing_extended_query_message() &&
1975 				 session_context->query_context &&
1976 				 session_context->query_context->query_state[MASTER_NODE_ID] != POOL_UNPARSED &&
1977 			     session_context->uncompleted_message) ||
1978 			    (!pool_is_doing_extended_query_message() && session_context->uncompleted_message &&
1979 				 session_context->uncompleted_message->kind != 0))
1980 			{
1981 				pool_add_sent_message(session_context->uncompleted_message);
1982 				pool_remove_sent_message(session_context->uncompleted_message->kind,
1983 										 session_context->uncompleted_message->name);
1984 				session_context->uncompleted_message = NULL;
1985 			}
1986 		}
1987 
1988 		pool_unset_query_in_progress();
1989 	}
1990 	if (!pool_is_doing_extended_query_message())
1991 	{
1992 		if (!(node && IsA(node, PrepareStmt)))
1993         {
1994 			pool_query_context_destroy(pool_get_session_context(false)->query_context);
1995         }
1996 	}
1997 
1998 	/*
1999 	 * Show ps idle status
2000 	 */
2001 	pool_ps_idle_display(backend);
2002 
2003 	return POOL_CONTINUE;
2004 }
2005 
2006 /*
2007  * Close running transactions on standbys.
2008  */
close_standby_transactions(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2009 static POOL_STATUS close_standby_transactions(POOL_CONNECTION *frontend,
2010 											  POOL_CONNECTION_POOL *backend)
2011 {
2012 	int i;
2013 
2014 	for (i=0;i<NUM_BACKENDS;i++)
2015 	{
2016 		if (CONNECTION_SLOT(backend, i) &&
2017 			TSTATE(backend, i) == 'T' &&
2018 			BACKEND_INFO(i).backend_status == CON_UP &&
2019 			(MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID) != i)
2020 		{
2021 			per_node_statement_log(backend, i, "COMMIT");
2022 			if (do_command(frontend, CONNECTION(backend, i), "COMMIT", MAJOR(backend),
2023 						   MASTER_CONNECTION(backend)->pid,
2024 						   MASTER_CONNECTION(backend)->key, 0) != POOL_CONTINUE)
2025                 ereport(ERROR,
2026                         (errmsg("unable to close standby transactions"),
2027                          errdetail("do_command returned DEADLOCK status")));
2028 		}
2029 	}
2030 	return POOL_CONTINUE;
2031 }
2032 
ParseComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2033 POOL_STATUS ParseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
2034 {
2035 	POOL_SESSION_CONTEXT *session_context;
2036 
2037 	/* Get session context */
2038 	session_context = pool_get_session_context(false);
2039 
2040 	if (!STREAM && session_context->uncompleted_message)
2041 	{
2042 		POOL_QUERY_CONTEXT *qc;
2043 
2044 		pool_add_sent_message(session_context->uncompleted_message);
2045 
2046 		qc = session_context->uncompleted_message->query_context;
2047 		if (qc)
2048 			pool_set_query_state(qc, POOL_PARSE_COMPLETE);
2049 
2050 		session_context->uncompleted_message = NULL;
2051 	}
2052 
2053 	return SimpleForwardToFrontend('1', frontend, backend);
2054 }
2055 
BindComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2056 POOL_STATUS BindComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
2057 {
2058 	POOL_SESSION_CONTEXT *session_context;
2059 
2060 	/* Get session context */
2061 	session_context = pool_get_session_context(false);
2062 
2063 	if (!STREAM && session_context->uncompleted_message)
2064 	{
2065 		POOL_QUERY_CONTEXT *qc;
2066 
2067 		pool_add_sent_message(session_context->uncompleted_message);
2068 
2069 		qc = session_context->uncompleted_message->query_context;
2070 		if (qc)
2071 			pool_set_query_state(qc, POOL_BIND_COMPLETE);
2072 
2073 		session_context->uncompleted_message = NULL;
2074 	}
2075 
2076 	return SimpleForwardToFrontend('2', frontend, backend);
2077 }
2078 
CloseComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2079 POOL_STATUS CloseComplete(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
2080 {
2081 	POOL_SESSION_CONTEXT *session_context;
2082 	POOL_STATUS status;
2083 	char kind = ' ';
2084 	char *name = "";
2085 
2086 	/* Get session context */
2087 	session_context = pool_get_session_context(false);
2088 
2089 	/* Send CloseComplete(3) to frontend before removing the target message */
2090 	status = SimpleForwardToFrontend('3', frontend, backend);
2091 
2092 	/* Remove the target message */
2093 	if (STREAM)
2094 	{
2095 		POOL_PENDING_MESSAGE *pmsg;
2096 
2097 		pmsg = pool_pending_message_pull_out();
2098 
2099 		if (pmsg)
2100 		{
2101 			/* Sanity check */
2102 			if (pmsg->type != POOL_CLOSE)
2103 			{
2104 				ereport(LOG,
2105 						(errmsg("CloseComplete: pending messge was not Close request: %s", pool_pending_message_type_to_string(pmsg->type))));
2106 			}
2107 			else
2108 			{
2109 				kind = pool_get_close_message_spec(pmsg);
2110 				name = pool_get_close_message_name(pmsg);
2111 				kind = kind=='S'?'P':'B';
2112 			}
2113 			pool_pending_message_free_pending_message(pmsg);
2114 		}
2115 	}
2116 	else
2117 	{
2118 		if (session_context->uncompleted_message)
2119 		{
2120 			kind = session_context->uncompleted_message->kind;
2121 			name = session_context->uncompleted_message->name;
2122 			session_context->uncompleted_message = NULL;
2123 		}
2124 		else
2125 		{
2126 			ereport(ERROR,
2127 					(errmsg("processing CloseComplete, uncompleted message not found")));
2128 		}
2129 	}
2130 
2131 	if (kind != ' ')
2132 	{
2133 		pool_remove_sent_message(kind, name);
2134 		ereport(DEBUG1,
2135 				(errmsg("CloseComplete: remove sent message. kind:%c, name:%s",
2136 						kind, name)));
2137 		if (pool_config->memory_cache_enabled)
2138 		{
2139 			POOL_QUERY_CONTEXT *query_context;
2140 			POOL_TEMP_QUERY_CACHE *temp_cache;
2141 
2142 			query_context = session_context->query_context;
2143 			if (query_context)
2144 			{
2145 				temp_cache = query_context->temp_cache;
2146 				if (temp_cache)
2147 				{
2148 					pool_discard_temp_query_cache(temp_cache);
2149 					query_context->temp_cache = NULL;
2150 				}
2151 			}
2152 		}
2153 	}
2154 
2155 	return status;
2156 }
2157 
ParameterDescription(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2158 POOL_STATUS ParameterDescription(POOL_CONNECTION *frontend,
2159 								 POOL_CONNECTION_POOL *backend)
2160 {
2161 	int len, len1 = 0;
2162 	char *p = NULL;
2163 	char *p1 = NULL;
2164 	int sendlen;
2165 	int i;
2166 
2167 	POOL_SESSION_CONTEXT *session_context;
2168 	int num_params, send_num_params, num_dmy;
2169 	char kind = 't';
2170 
2171 	session_context = pool_get_session_context(false);
2172 
2173 	/* only in replication mode and rewritten query */
2174 	if (!REPLICATION || !session_context->query_context->rewritten_query)
2175 		return SimpleForwardToFrontend('t', frontend, backend);
2176 
2177 	/* get number of parameters in original query */
2178 	num_params = session_context->query_context->num_original_params;
2179 
2180 	pool_read(MASTER(backend), &len, sizeof(len));
2181 
2182 	len = ntohl(len);
2183 	len -= sizeof(int32);
2184 	len1 = len;
2185 
2186 	/* number of parameters in rewritten query is just discarded */
2187 	pool_read(MASTER(backend), &num_dmy, sizeof(int16));
2188 	len -= sizeof(int16);
2189 
2190 	p = pool_read2(MASTER(backend), len);
2191 	if (p == NULL)
2192         ereport(ERROR,
2193 				(errmsg("ParameterDescription. connection error"),
2194                  errdetail("read from backend failed")));
2195 
2196 
2197 	p1 = palloc(len);
2198 	memcpy(p1, p, len);
2199 
2200 	for (i=0;i<NUM_BACKENDS;i++)
2201 	{
2202 		if (VALID_BACKEND(i) && !IS_MASTER_NODE_ID(i))
2203 		{
2204 			pool_read(CONNECTION(backend, i), &len, sizeof(len));
2205 
2206 			len = ntohl(len);
2207 			len -= sizeof(int32);
2208 
2209 			p = pool_read2(CONNECTION(backend, i), len);
2210 			if (p == NULL)
2211 				ereport(ERROR,
2212 						(errmsg("ParameterDescription. connection error"),
2213 						 errdetail("read from backend no %d failed",i)));
2214 
2215 			if (len != len1)
2216 				ereport(DEBUG1,
2217 						(errmsg("ParameterDescription. backends does not match"),
2218 						 errdetail("length does not match between backends master(%d) %d th backend(%d) kind:(%c)",len, i, len1, kind)));
2219 		}
2220 	}
2221 
2222 	pool_write(frontend, &kind, 1);
2223 
2224 	/* send back OIDs of parameters in original query and left are discarded */
2225 	len = sizeof(int16) + num_params * sizeof(int32);
2226 	sendlen = htonl(len + sizeof(int32));
2227 	pool_write(frontend, &sendlen, sizeof(int32));
2228 
2229 	send_num_params = htons(num_params);
2230 	pool_write(frontend, &send_num_params, sizeof(int16));
2231 
2232 	pool_write_and_flush(frontend, p1, num_params * sizeof(int32));
2233 
2234 	pfree(p1);
2235 	return POOL_CONTINUE;
2236 }
2237 
ErrorResponse3(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2238 POOL_STATUS ErrorResponse3(POOL_CONNECTION *frontend,
2239 						   POOL_CONNECTION_POOL *backend)
2240 {
2241 	POOL_STATUS ret;
2242 
2243 	ret = SimpleForwardToFrontend('E', frontend, backend);
2244 	if (ret != POOL_CONTINUE)
2245 		return ret;
2246 
2247 	if (!STREAM)
2248 		raise_intentional_error_if_need(backend);
2249 
2250 	return POOL_CONTINUE;
2251 }
2252 
FunctionCall(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2253 POOL_STATUS FunctionCall(POOL_CONNECTION *frontend,
2254 								POOL_CONNECTION_POOL *backend)
2255 {
2256 	char dummy[2];
2257 	int oid;
2258 	int argn;
2259 	int i;
2260 
2261 	for (i=0;i<NUM_BACKENDS;i++)
2262 	{
2263 		if (VALID_BACKEND(i))
2264 		{
2265 			pool_write(CONNECTION(backend, i), "F", 1);
2266 		}
2267 	}
2268 
2269 	/* dummy */
2270 	pool_read(frontend, dummy, sizeof(dummy));
2271 
2272 	for (i=0;i<NUM_BACKENDS;i++)
2273 	{
2274 		if (VALID_BACKEND(i))
2275 		{
2276 			pool_write(CONNECTION(backend, i), dummy, sizeof(dummy));
2277 		}
2278 	}
2279 
2280 	/* function object id */
2281 	pool_read(frontend, &oid, sizeof(oid));
2282 
2283 	for (i=0;i<NUM_BACKENDS;i++)
2284 	{
2285 		if (VALID_BACKEND(i))
2286 		{
2287 			pool_write(CONNECTION(backend, i), &oid, sizeof(oid));
2288 		}
2289 	}
2290 
2291 	/* number of arguments */
2292 	pool_read(frontend, &argn, sizeof(argn));
2293 
2294 	for (i=0;i<NUM_BACKENDS;i++)
2295 	{
2296 		if (VALID_BACKEND(i))
2297 		{
2298 			pool_write(CONNECTION(backend, i), &argn, sizeof(argn));
2299 		}
2300 	}
2301 
2302 	argn = ntohl(argn);
2303 
2304 	for (i=0;i<argn;i++)
2305 	{
2306 		int len;
2307 		char *arg;
2308 
2309 		/* length of each argument in bytes */
2310 		pool_read(frontend, &len, sizeof(len));
2311 
2312 		for (i=0;i<NUM_BACKENDS;i++)
2313 		{
2314 			if (VALID_BACKEND(i))
2315 			{
2316 				pool_write(CONNECTION(backend, i), &len, sizeof(len));
2317 			}
2318 		}
2319 
2320 		len = ntohl(len);
2321 
2322 		/* argument value itself */
2323 		if ((arg = pool_read2(frontend, len)) == NULL)
2324             ereport(FATAL,
2325                 (return_code(2),
2326                     errmsg("failed to process function call"),
2327                         errdetail("read from frontend failed")));
2328 
2329 		for (i=0;i<NUM_BACKENDS;i++)
2330 		{
2331 			if (VALID_BACKEND(i))
2332 			{
2333 				pool_write(CONNECTION(backend, i), arg, len);
2334 			}
2335 		}
2336 	}
2337 
2338 	for (i=0;i<NUM_BACKENDS;i++)
2339 	{
2340 		if (VALID_BACKEND(i))
2341 		{
2342 			pool_flush(CONNECTION(backend, i));
2343 		}
2344 	}
2345 	return POOL_CONTINUE;
2346 }
2347 
/*
 * Read one message from the frontend and dispatch it to the handler for its
 * message kind.
 *
 * The kind byte is read first.  With protocol major version 3 a 4-byte
 * length and (if non-empty) the message body follow; with other protocol
 * versions a string is read unless the kind is 'F', whose contents are read
 * later by FunctionCall().  The body is copied into a palloc'd buffer before
 * the handler runs and freed afterwards.
 *
 * Returns:
 *   POOL_CONTINUE  - message handled, or nothing to do (empty read buffer
 *                    while frontend->no_forward is set, or message skipped
 *                    because the ignore-till-sync flag is set)
 *   POOL_END       - the frontend sent Terminate ('X')
 *
 * Raises ERROR on a malformed or unreadable message, and FATAL on an unknown
 * message kind or when the handler returns anything but POOL_CONTINUE.
 */
POOL_STATUS ProcessFrontendResponse(POOL_CONNECTION *frontend,
									POOL_CONNECTION_POOL *backend)
{
	char fkind;
	char *bufp = NULL;
	char *contents;
	POOL_STATUS status;
	int len = 0;

	/* Get session context (the returned pointer itself is not used here) */
	pool_get_session_context(false);

	/* Nothing buffered from the frontend and no_forward is set: nothing to do */
	if (pool_read_buffer_is_empty(frontend) && frontend->no_forward != 0)
		return POOL_CONTINUE;

	pool_read(frontend, &fkind, 1);

	ereport(DEBUG1,
		(errmsg("processing frontend response"),
			 errdetail("received kind '%c'(%02x) from frontend",fkind,fkind)));


	if (MAJOR(backend) == PROTO_MAJOR_V3)
	{
		if (pool_read(frontend, &len, sizeof(len)) < 0)
            ereport(ERROR,
                (errmsg("unable to process frontend response"),
                     errdetail("failed to read message length from frontend. frontend abnormally exited")));

		/* the length on the wire counts its own 4 bytes; keep only the body size */
		len = ntohl(len) - 4;
		if (len > 0)
			bufp = pool_read2(frontend, len);
		else if (len < 0)
			ereport(ERROR,
					(errmsg("frontend message length is less than 4 (kind: %c)", fkind)));
	}
	else
	{
		/* for 'F' nothing more is read here; FunctionCall() reads the rest */
		if (fkind != 'F')
			bufp = pool_read_string(frontend, &len, 0);
	}

	if (len > 0 && bufp == NULL)
        ereport(ERROR,
            (errmsg("unable to process frontend response"),
                 errdetail("failed to read message from frontend. frontend abnormally exited")));

	/*
	 * While the ignore-till-sync flag is set, every message except Sync is
	 * dropped here (it has already been consumed from the frontend above).
	 */
	if (fkind != 'S' && pool_is_ignore_till_sync())
	{
		/*
		 * Flag setting for calling ProcessBackendResponse()
		 * in pool_process_query().
		 */
		if (!pool_is_query_in_progress())
			pool_set_query_in_progress();

		return POOL_CONTINUE;
	}

	/* cleared for every message; the extended-query cases below set it again */
	pool_unset_doing_extended_query_message();

	/*
	 * Allocate buffer and copy the packet contents.  Because inside
	 * these protocol modules, pool_read2 maybe called and modify its
	 * buffer contents.
	 */
	if (len > 0)
	{
		contents = palloc(len);
		memcpy(contents, bufp, len);
	}
	else
	{
		/*
		 * Set dummy content (a single NUL byte) if len <= 0, i.e. when no
		 * message body was read above -- e.g. a protocol V3 message with an
		 * empty body, or an 'F' message with an older protocol version.
		 */
		contents = palloc(1);
		memcpy(contents, "", 1);
	}

	switch (fkind)
	{
		/* locals used only by the 'F' (FunctionCall) case */
		POOL_QUERY_CONTEXT *query_context;
		char *query;
		Node *node;
		List *parse_tree_list;
		bool error;

		case 'X':	/* Terminate */
			if(contents)
				pfree(contents);
            ereport(DEBUG1,
                (errmsg("Frontend terminated"),
                     errdetail("received message kind 'X' from frontend")));
            return POOL_END;

		case 'Q':	/* Query */
			allow_close_transaction = 1;
			status = SimpleQuery(frontend, backend, len, contents);
			break;

		case 'E':	/* Execute */
			allow_close_transaction = 1;
			pool_set_doing_extended_query_message();
			if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
				pool_set_query_in_progress();
			status = Execute(frontend, backend, len, contents);
			break;

		case 'P':	/* Parse */
			allow_close_transaction = 0;
			pool_set_doing_extended_query_message();
			status = Parse(frontend, backend, len, contents);
			break;

		case 'B':	/* Bind */
			pool_set_doing_extended_query_message();
			status = Bind(frontend, backend, len, contents);
			break;

		case 'C':	/* Close */
			pool_set_doing_extended_query_message();
			if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
				pool_set_query_in_progress();
			status = Close(frontend, backend, len, contents);
			break;

		case 'D':	/* Describe */
			pool_set_doing_extended_query_message();
			status = Describe(frontend, backend, len, contents);
			break;

		case 'S':  /* Sync */
			pool_set_doing_extended_query_message();
			/* Sync ends the ignore-till-sync period */
			if (pool_is_ignore_till_sync())
				pool_unset_ignore_till_sync();

			if (STREAM)
			{
				POOL_PENDING_MESSAGE *msg;

				/* record the Sync as a pending message before sending it */
				pool_unset_query_in_progress();
				msg = pool_pending_message_create('S', 0, NULL);
				pool_pending_message_add(msg);
				pool_pending_message_free_pending_message(msg);
			}
			else if (!pool_is_query_in_progress())
				pool_set_query_in_progress();
			status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);

			if (STREAM)
			{
				/* Wait till Ready for query received */
				pool_wait_till_ready_for_query(backend);
			}
			break;

		case 'F':	/* FunctionCall */
			/*
			 * Create dummy query context as if it were an INSERT.
			 */
			query_context = pool_init_query_context();
			query = "INSERT INTO foo VALUES(1)";
			/* parse and register the dummy query inside the query context's memory context */
			MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);

			parse_tree_list = raw_parser(query, &error);
			node = (Node *) lfirst(list_head(parse_tree_list));
			pool_start_query(query_context, query, strlen(query) + 1, node);

			MemoryContextSwitchTo(old_context);

			pool_where_to_send(query_context, query_context->original_query,
							   query_context->parse_tree);

			if (MAJOR(backend) == PROTO_MAJOR_V3)
				status = FunctionCall3(frontend, backend, len, contents);
			else
				status = FunctionCall(frontend, backend);

			break;

		case 'c':	/* CopyDone */
		case 'd':	/* CopyData */
		case 'f':	/* CopyFail */
		case 'H':	/* Flush */
			if (MAJOR(backend) == PROTO_MAJOR_V3)
			{
				if (fkind == 'H')
				{
					pool_set_doing_extended_query_message();
				}
				status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);

				/*
				 * After flush message received, extended query mode should be
				 * continued.
				 */
				if (fkind != 'H' && pool_is_doing_extended_query_message())
				{
					pool_unset_doing_extended_query_message();
				}

				break;
			}
			/*
			 * Not protocol V3: these kinds are not handled, so control falls
			 * through to the unknown-message error below.
			 */

		default:
            ereport(FATAL,
                (return_code(2),
                 errmsg("unable to process frontend response"),
                     errdetail("unknown message type %c(%02x)", fkind, fkind)));

	}
	if(contents)
		pfree(contents);

	/* any handler status other than POOL_CONTINUE is fatal */
	if (status != POOL_CONTINUE)
        ereport(FATAL,
            (return_code(2),
                errmsg("unable to process frontend response")));

	return status;
}
2571 
/*
 * Read the kind of the next backend message and dispatch it to the handler
 * for that kind.  Separate dispatch tables are used for protocol major
 * version 3 and for other protocol versions.
 *
 * Parameters:
 *   frontend, backend - the frontend connection and the backend pool
 *   state             - in/out; reset from 1 to 0 when ReadyForQuery ('Z')
 *                       is received while frontend->no_forward is set
 *   num_fields        - in/out; passed to the RowDescription, AsciiRow and
 *                       BinaryRow handlers of the non-V3 dispatch table
 *
 * Returns POOL_CONTINUE.  That is also returned early, without reading from
 * the backend, when the ignore-till-sync flag is set and the backend cache
 * is empty, or when the skip-reading-from-backends flag is set (the flag is
 * cleared in that case).
 *
 * Raises FATAL when the message kind is 0, when the kind is unknown in the
 * non-V3 table, or when a handler returns anything but POOL_CONTINUE.
 */
POOL_STATUS ProcessBackendResponse(POOL_CONNECTION *frontend,
								   POOL_CONNECTION_POOL *backend,
								   int *state, short *num_fields)
{
	int status = POOL_CONTINUE;
	char kind;

	/* Get session context (the returned pointer itself is not used here) */
	pool_get_session_context(false);

	if (pool_is_ignore_till_sync())
	{
		if (pool_is_query_in_progress())
			pool_unset_query_in_progress();

		/*
		 * Check if we have pending data in backend connection cache. If we
		 * do, it is likely that a sync message has been sent to backend and
		 * the backend replied back to us. So we need to process it.
		 */
		if (is_backend_cache_empty(backend))
		{
			return POOL_CONTINUE;
		}
	}

	/* one-shot flag: skip this round of reading and clear the flag */
	if (pool_is_skip_reading_from_backends())
	{
		pool_unset_skip_reading_from_backends();
		return POOL_CONTINUE;
	}

    read_kind_from_backend(frontend, backend, &kind);

	/*
	 * Sanity check
	 */
	if (kind == 0)
	{
        ereport(FATAL,
            (return_code(2),
                errmsg("unable to process backend response"),
                 errdetail("invalid message kind sent by backend connection")));
	}
	ereport(DEBUG1,
		(errmsg("processing backend response"),
			 errdetail("received kind '%c'(%02x) from backend",kind,kind)));


	if (MAJOR(backend) == PROTO_MAJOR_V3)
	{
		switch (kind)
		{
			case 'G':	/* CopyInResponse */
				status = CopyInResponse(frontend, backend);
				break;

			case 'S':	/* ParameterStatus */
				status = ParameterStatus(frontend, backend);
				break;

			case 'Z':	/* ReadyForQuery */
				ereport(DEBUG1,
					(errmsg("processing backend response"),
						 errdetail("Ready For Query received")));
				status = ReadyForQuery(frontend, backend, true, true);
#ifdef DEBUG
				/* debug builds only: exit the process once stop_now is set */
				extern bool stop_now;
				if (stop_now)
					exit(0);
#endif
				break;

			case '1':	/* ParseComplete */
				if (STREAM)
				{
					POOL_PENDING_MESSAGE *pmsg;
					pmsg = pool_pending_message_get_previous_message();
					if (pmsg && pmsg->not_forward_to_frontend)
					{
						/* parse_before_bind() was called. Do not forward the
						 * parse complete message to frontend. */
						ereport(DEBUG1,
								(errmsg("processing backend response"),
								 errdetail("do not forward parse complete message to frontend")));
						pool_discard_packet_contents(backend);
						pool_unset_query_in_progress();
						pool_set_command_success();
						status = POOL_CONTINUE;
						break;
					}
				}
				status = ParseComplete(frontend, backend);
				pool_set_command_success();
				pool_unset_query_in_progress();
				break;

			case '2':	/* BindComplete */
				status = BindComplete(frontend, backend);
				pool_set_command_success();
				pool_unset_query_in_progress();
				break;

			case '3':	/* CloseComplete */
				if (STREAM)
				{
					POOL_PENDING_MESSAGE *pmsg;
					pmsg = pool_pending_message_get_previous_message();
					if (pmsg && pmsg->not_forward_to_frontend)
					{
						/* parse_before_bind() was called. Do not forward the
						 * close complete message to frontend. */
						ereport(DEBUG1,
								(errmsg("processing backend response"),
								 errdetail("do not forward close complete message to frontend")));
						pool_discard_packet_contents(backend);

						/* Remove pending message here because
						 * read_kind_from_backend() did not do it.
						 */
						pmsg = pool_pending_message_pull_out();
						pool_pending_message_free_pending_message(pmsg);

						if (pool_is_doing_extended_query_message())
							pool_unset_query_in_progress();
						pool_set_command_success();
						status = POOL_CONTINUE;
						break;
					}
				}

				status = CloseComplete(frontend, backend);
				pool_set_command_success();
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 'E':	/* ErrorResponse */
				status = ErrorResponse3(frontend, backend);
				pool_unset_command_success();
				/* an error outside transaction state 'I' marks the transaction as failed */
				if (TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID :
						   REAL_MASTER_NODE_ID) != 'I')
					pool_set_failed_transaction();
				if (pool_is_doing_extended_query_message())
				{
					/* from now on, frontend messages are skipped until Sync arrives */
					pool_set_ignore_till_sync();
					pool_unset_query_in_progress();
					if (STREAM)
						pool_discard_except_sync_and_ready_for_query(frontend, backend);
				}
				break;

			case 'C':	/* CommandComplete */
				status = CommandComplete(frontend, backend, true);
				pool_set_command_success();
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 't':	/* ParameterDescription */
				status = ParameterDescription(frontend, backend);
				break;

			case 'I':	/* EmptyQueryResponse */
				status = CommandComplete(frontend, backend, false);
				/* Empty query response message should be treated same as
				 * Command complete message. When we receive the Command
				 * complete message, we unset the query in progress flag if
				 * operated in streaming replication mode. So we unset the
				 * flag as well. See bug 190 for more details.
				 */
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 'T':	/* RowDescription */
				status = SimpleForwardToFrontend(kind, frontend, backend);
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 'n':	/* NoData */
				status = SimpleForwardToFrontend(kind, frontend, backend);
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			case 's':	/* PortalSuspended */
				status = SimpleForwardToFrontend(kind, frontend, backend);
				if (pool_is_doing_extended_query_message())
					pool_unset_query_in_progress();
				break;

			default:
				/* any other kind is relayed to the frontend unchanged */
				status = SimpleForwardToFrontend(kind, frontend, backend);
				break;
		}

		if (STREAM && pool_is_doing_extended_query_message())
			pool_reset_preferred_master_node_id();
	}
	else
	{
		/* dispatch table for protocol versions other than V3 */
		switch (kind)
		{
			case 'A':	/* NotificationResponse */
				status = NotificationResponse(frontend, backend);
				break;

			case 'B':	/* BinaryRow */
				status = BinaryRow(frontend, backend, *num_fields);
				break;

			case 'C':	/* CompletedResponse */
				status = CompletedResponse(frontend, backend);
				break;

			case 'D':	/* AsciiRow */
				status = AsciiRow(frontend, backend, *num_fields);
				break;

			case 'E':	/* ErrorResponse */
				status = ErrorResponse(frontend, backend);
				if (TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID :
						   REAL_MASTER_NODE_ID) != 'I')
					pool_set_failed_transaction();
				break;

			case 'G':	/* CopyInResponse */
				status = CopyInResponse(frontend, backend);
				break;

			case 'H':	/* CopyOutResponse */
				status = CopyOutResponse(frontend, backend);
				break;

			case 'I':	/* EmptyQueryResponse */
				/* the handler's return value is not checked; status stays POOL_CONTINUE */
				EmptyQueryResponse(frontend, backend);
				break;

			case 'N':	/* NoticeResponse */
				/* the handler's return value is not checked; status stays POOL_CONTINUE */
				NoticeResponse(frontend, backend);
				break;

			case 'P':	/* CursorResponse */
				status = CursorResponse(frontend, backend);
				break;

			case 'T':	/* RowDescription */
				status = RowDescription(frontend, backend, num_fields);
				break;

			case 'V':	/* FunctionResultResponse and FunctionVoidResponse */
				status = FunctionResultResponse(frontend, backend);
				break;

			case 'Z':	/* ReadyForQuery */
				status = ReadyForQuery(frontend, backend, true, true);
				break;

			default:
                ereport(FATAL,
                        (return_code(1),
                         errmsg("Unknown message type %c(%02x)", kind, kind)));
		}
	}

	/* Do we receive ready for query while processing reset
	 * request?
	 */
	if (kind == 'Z' && frontend->no_forward && *state == 1)
	{
		*state = 0;
	}

	/* any handler status other than POOL_CONTINUE is fatal */
	if (status != POOL_CONTINUE)
        ereport(FATAL,
            (return_code(2),
                 errmsg("unable to process backend response for message kind '%c'",kind)));

	return status;
}
2854 
CopyInResponse(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2855 POOL_STATUS CopyInResponse(POOL_CONNECTION *frontend,
2856 								  POOL_CONNECTION_POOL *backend)
2857 {
2858 	POOL_STATUS status;
2859 
2860 	/* forward to the frontend */
2861 	if (MAJOR(backend) == PROTO_MAJOR_V3)
2862 	{
2863 		SimpleForwardToFrontend('G', frontend, backend);
2864 		pool_flush(frontend);
2865 	}
2866 	else
2867 		pool_write_and_flush(frontend, "G", 1);
2868 
2869 	status = CopyDataRows(frontend, backend, 1);
2870 	return status;
2871 }
2872 
CopyOutResponse(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2873 POOL_STATUS CopyOutResponse(POOL_CONNECTION *frontend,
2874 								   POOL_CONNECTION_POOL *backend)
2875 {
2876 	POOL_STATUS status;
2877 
2878 	/* forward to the frontend */
2879 	if (MAJOR(backend) == PROTO_MAJOR_V3)
2880 	{
2881 		SimpleForwardToFrontend('H', frontend, backend);
2882 		pool_flush(frontend);
2883 	}
2884 	else
2885 		pool_write_and_flush(frontend, "H", 1);
2886 
2887 	status = CopyDataRows(frontend, backend, 0);
2888 	return status;
2889 }
2890 
CopyDataRows(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int copyin)2891 POOL_STATUS CopyDataRows(POOL_CONNECTION *frontend,
2892 								POOL_CONNECTION_POOL *backend, int copyin)
2893 {
2894 	char *string = NULL;
2895 	int len;
2896 	int i;
2897 
2898 #ifdef DEBUG
2899 	int j = 0;
2900 	char buf[1024];
2901 #endif
2902 
2903 	for (;;)
2904 	{
2905 		if (copyin)
2906 		{
2907 			if (MAJOR(backend) == PROTO_MAJOR_V3)
2908 			{
2909 				char kind;
2910 				char *contents = NULL;
2911 
2912 				pool_read(frontend, &kind, 1);
2913 
2914 				ereport(DEBUG1,
2915 					(errmsg("copy data rows"),
2916 						 errdetail("read kind from frontend %c(%02x)", kind, kind)));
2917 
2918 				pool_read(frontend, &len, sizeof(len));
2919 				len = ntohl(len) - 4;
2920 				if (len > 0)
2921 					contents = pool_read2(frontend, len);
2922 
2923 				SimpleForwardToBackend(kind, frontend, backend, len, contents);
2924 
2925 				/* CopyData? */
2926 				if (kind == 'd')
2927 					continue;
2928 				else
2929 				{
2930 					ereport(DEBUG1,
2931 						(errmsg("copy data rows"),
2932 							 errdetail("invalid copyin kind. expected 'd' got '%c'", kind)));
2933 					break;
2934 				}
2935 			}
2936 			else
2937 				string = pool_read_string(frontend, &len, 1);
2938 		}
2939 		else
2940 		{
2941 			/* CopyOut */
2942 			if (MAJOR(backend) == PROTO_MAJOR_V3)
2943 			{
2944 				signed char kind;
2945 
2946 				kind = pool_read_kind(backend);
2947 
2948 				SimpleForwardToFrontend(kind, frontend, backend);
2949 
2950 				/* CopyData? */
2951 				if (kind == 'd')
2952 					continue;
2953 				else
2954 					break;
2955 			}
2956 			else
2957 			{
2958 				for (i=0;i<NUM_BACKENDS;i++)
2959 				{
2960 					if (VALID_BACKEND(i))
2961 					{
2962 						string = pool_read_string(CONNECTION(backend, i), &len, 1);
2963 					}
2964 				}
2965 			}
2966 		}
2967 
2968 		if (string == NULL)
2969             ereport(FATAL,
2970                 (errmsg("unable to copy data rows"),
2971                      errdetail("cannot read string message from backend")));
2972 
2973 #ifdef DEBUG
2974 		strlcpy(buf, string, sizeof(buf));
2975 		ereport(DEBUG1,
2976 			(errmsg("copy data rows"),
2977 				 errdetail("copy line %d %d bytes :%s:", j++, len, buf)));
2978 #endif
2979 
2980 		if (copyin)
2981 		{
2982 			for (i=0;i<NUM_BACKENDS;i++)
2983 			{
2984 				if (VALID_BACKEND(i))
2985 				{
2986 					pool_write(CONNECTION(backend, i), string, len);
2987 				}
2988 			}
2989 		}
2990 		else
2991 			pool_write(frontend, string, len);
2992 
2993 		if (len == PROTO_MAJOR_V3)
2994 		{
2995 			/* end of copy? */
2996 			if (string[0] == '\\' &&
2997 				string[1] == '.' &&
2998 				string[2] == '\n')
2999 			{
3000 				break;
3001 			}
3002 		}
3003 	}
3004 
3005 	/*
3006 	 * Wait till backend responds
3007 	 */
3008 	if (copyin)
3009 	{
3010 		for (i=0;i<NUM_BACKENDS;i++)
3011 		{
3012 			if (VALID_BACKEND(i))
3013 			{
3014 				pool_flush(CONNECTION(backend, i));
3015 
3016 				/*
3017 				 * Check response from the backend.  First check SSL and read
3018 				 * buffer of the backend. It is possible that there's an error
3019 				 * message in the buffer if the COPY command went wrong.
3020 				 * Otherwise wait for data arrival to the backend socket.
3021 				 */
3022 				if (!pool_ssl_pending(CONNECTION(backend, i)) &&
3023 					pool_read_buffer_is_empty(CONNECTION(backend, i)) &&
3024 					synchronize(CONNECTION(backend, i)))
3025 					ereport(FATAL,
3026 							(return_code(2),
3027 							 errmsg("unable to copy data rows"),
3028 							 errdetail("failed to synchronize")));
3029 			}
3030 		}
3031 	}
3032 	else
3033 		pool_flush(frontend);
3034 
3035 	return POOL_CONTINUE;
3036 }
3037 
3038 /*
3039  * This function raises intentional error to make backends the same
3040  * transaction state.
3041  */
raise_intentional_error_if_need(POOL_CONNECTION_POOL * backend)3042 void raise_intentional_error_if_need(POOL_CONNECTION_POOL *backend)
3043 {
3044 	int i;
3045 	POOL_SESSION_CONTEXT *session_context;
3046 	POOL_QUERY_CONTEXT *query_context;
3047 
3048 	/* Get session context */
3049 	session_context = pool_get_session_context(false);
3050 
3051 	query_context = session_context->query_context;
3052 
3053 	if (MASTER_SLAVE &&
3054 		TSTATE(backend, PRIMARY_NODE_ID) == 'T' &&
3055 		PRIMARY_NODE_ID != MASTER_NODE_ID &&
3056 		query_context &&
3057 		is_select_query(query_context->parse_tree, query_context->original_query))
3058 	{
3059 		pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
3060 		if (pool_is_doing_extended_query_message())
3061 		{
3062 			do_error_execute_command(backend, PRIMARY_NODE_ID, PROTO_MAJOR_V3);
3063 		}
3064 		else
3065 		{
3066 			do_error_command(CONNECTION(backend, PRIMARY_NODE_ID), MAJOR(backend));
3067 		}
3068 		ereport(DEBUG1,
3069 			(errmsg("raising intentional error"),
3070 				errdetail("generating intentional error to sync backends transaction states")));
3071 	}
3072 
3073 	if (REPLICATION &&
3074 		TSTATE(backend, REAL_MASTER_NODE_ID) == 'T' &&
3075 		!pool_config->replicate_select &&
3076 		query_context &&
3077 		is_select_query(query_context->parse_tree, query_context->original_query))
3078 	{
3079 		for (i = 0; i < NUM_BACKENDS; i++)
3080 		{
3081 			/*
3082 			 * Send a syntax error query to all backends except the node
3083 			 * which the original query was sent.
3084 			 */
3085 			if (pool_is_node_to_be_sent(query_context, i))
3086 				continue;
3087 			else
3088 				pool_set_node_to_be_sent(query_context, i);
3089 
3090 			if (VALID_BACKEND(i))
3091 			{
3092 				/*
3093 				 * We must abort transaction to sync transaction state.
3094 				 * If the error was caused by an Execute message,
3095 				 * we must send invalid Execute message to abort
3096 				 * transaction.
3097 				 *
3098 				 * Because extended query protocol ignores all
3099 				 * messages before receiving Sync message inside error state.
3100 				 */
3101 				if (pool_is_doing_extended_query_message())
3102 				{
3103 					do_error_execute_command(backend, i, PROTO_MAJOR_V3);
3104 				}
3105 				else
3106 				{
3107 					do_error_command(CONNECTION(backend, i), MAJOR(backend));
3108 				}
3109 			}
3110 		}
3111 	}
3112 }
3113 
3114 /*
3115  * Check various errors from backend.  return values:
3116  *  0: no error
3117  *  1: deadlock detected
3118  *  2: serialization error detected
3119  *  3: query cancel
3120  * detected: 4
3121  */
check_errors(POOL_CONNECTION_POOL * backend,int backend_id)3122 static int check_errors(POOL_CONNECTION_POOL *backend, int backend_id)
3123 {
3124 
3125 	/*
3126 	 * Check dead lock error on the master node and abort
3127 	 * transactions on all nodes if so.
3128 	 */
3129 	if (detect_deadlock_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
3130 		return 1;
3131 
3132 	/*
3133 	 * Check serialization failure error and abort
3134 	 * transactions on all nodes if so. Otherwise we allow
3135 	 * data inconsistency among DB nodes. See following
3136 	 * scenario: (M:master, S:slave)
3137 	 *
3138 	 * M:S1:BEGIN;
3139 	 * M:S2:BEGIN;
3140 	 * S:S1:BEGIN;
3141 	 * S:S2:BEGIN;
3142 	 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3143 	 * M:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3144 	 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3145 	 * S:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3146 	 * M:S1:UPDATE t1 SET i = i + 1;
3147 	 * S:S1:UPDATE t1 SET i = i + 1;
3148 	 * M:S2:UPDATE t1 SET i = i + 1; <-- blocked
3149 	 * S:S1:COMMIT;
3150 	 * M:S1:COMMIT;
3151 	 * M:S2:ERROR:  could not serialize access due to concurrent update
3152 	 * S:S2:UPDATE t1 SET i = i + 1; <-- success in UPDATE and data becomes inconsistent!
3153 	 */
3154 	if (detect_serialization_error(CONNECTION(backend, backend_id), MAJOR(backend), true) == SPECIFIED_ERROR)
3155 		return 2;
3156 
3157 	/*
3158 	 * check "SET TRANSACTION ISOLATION LEVEL must be called before any query" error.
3159 	 * This happens in following scenario:
3160 	 *
3161 	 * M:S1:BEGIN;
3162 	 * S:S1:BEGIN;
3163 	 * M:S1:SELECT 1; <-- only sent to MASTER
3164 	 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3165 	 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3166 	 * M: <-- error
3167 	 * S: <-- ok since no previous SELECT is sent. kind mismatch error occurs!
3168 	 */
3169 	if (detect_active_sql_transaction_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
3170 		return 3;
3171 
3172 	/* check query cancel error */
3173 	if (detect_query_cancel_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
3174 		return 4;
3175 
3176 	return 0;
3177 }
3178 
generate_error_message(char * prefix,int specific_error,char * query)3179 static void generate_error_message(char *prefix, int specific_error, char *query)
3180 {
3181 	POOL_SESSION_CONTEXT *session_context;
3182 
3183 	session_context = pool_get_session_context(true);
3184 	if (!session_context)
3185 		return;
3186 
3187 	static char *error_messages[] = {
3188 		"received deadlock error message from master node. query: %s",
3189 		"received serialization failure error message from master node. query: %s",
3190 		"received SET TRANSACTION ISOLATION LEVEL must be called before any query error. query: %s",
3191 		"received query cancel error message from master node. query: %s"
3192 	};
3193 
3194 	String *msg;
3195 
3196 	if (specific_error < 1 || specific_error > sizeof(error_messages)/sizeof(char *))
3197 	{
3198 		ereport(LOG,
3199 				(errmsg("generate_error_message: invalid specific_error: %d", specific_error)));
3200 		return;
3201 	}
3202 
3203 	specific_error--;
3204 
3205 	msg = init_string(prefix);
3206 	string_append_char(msg, error_messages[specific_error]);
3207 	ereport(LOG,
3208 			(errmsg(msg->data, query)));
3209 	free_string(msg);
3210 }
3211 
3212 /*
3213  * Make per DB node statement log
3214  */
per_node_statement_log(POOL_CONNECTION_POOL * backend,int node_id,char * query)3215 void per_node_statement_log(POOL_CONNECTION_POOL *backend, int node_id, char *query)
3216 {
3217 	POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id];
3218 
3219 	if (pool_config->log_per_node_statement)
3220 		ereport(LOG,
3221 			(errmsg("DB node id: %d backend pid: %d statement: %s", node_id, ntohl(slot->pid), query)));
3222 }
3223 
3224 /*
3225  * Check kind and produce error message
3226  * All data read in this function is returned to stream.
3227  */
per_node_error_log(POOL_CONNECTION_POOL * backend,int node_id,char * query,char * prefix,bool unread)3228 void per_node_error_log(POOL_CONNECTION_POOL *backend, int node_id, char *query, char *prefix, bool unread)
3229 {
3230 	POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id];
3231 	char *message;
3232 
3233 	if (pool_extract_error_message(true, CONNECTION(backend, node_id), MAJOR(backend), unread, &message) == 1)
3234 	{
3235 		ereport(LOG,
3236 			(errmsg("%s: DB node id: %d backend pid: %d statement: \"%s\" message: \"%s\"",
3237 				 prefix, node_id, ntohl(slot->pid), query, message)));
3238 		pfree(message);
3239 	}
3240 }
3241 
3242 /*
3243  * Send parse message to primary/master node and wait for reply if particular
3244  * message is not yet parsed on the primary/master node but parsed on other
3245  * node. Caller must provide the parse message data as "message".
3246  */
parse_before_bind(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,POOL_SENT_MESSAGE * message,POOL_SENT_MESSAGE * bind_message)3247 static POOL_STATUS parse_before_bind(POOL_CONNECTION *frontend,
3248 									 POOL_CONNECTION_POOL *backend,
3249 									 POOL_SENT_MESSAGE *message,
3250 									 POOL_SENT_MESSAGE *bind_message)
3251 {
3252 	int i;
3253 	int len = message->len;
3254 	char kind = '\0';
3255 	char *contents = message->contents;
3256 	bool parse_was_sent = false;
3257 	bool backup[MAX_NUM_BACKENDS];
3258 	POOL_QUERY_CONTEXT *qc = message->query_context;
3259 
3260 	memcpy(backup, qc->where_to_send, sizeof(qc->where_to_send));
3261 
3262 	if (STREAM)
3263 	{
3264 		if (message->kind == 'P' && qc->where_to_send[PRIMARY_NODE_ID] == 0)
3265 		{
3266 			POOL_PENDING_MESSAGE *pmsg;
3267 			POOL_QUERY_CONTEXT *new_qc;
3268 			char message_body[1024];
3269 			int offset;
3270 			int message_len;
3271 
3272 			/* we are in streaming replication mode and the parse message has not
3273 			 * been sent to primary yet */
3274 
3275 			/* Prepare modified query context */
3276 			new_qc = pool_query_context_shallow_copy(qc);
3277 			memset(new_qc->where_to_send, 0, sizeof(new_qc->where_to_send));
3278 			new_qc->where_to_send[PRIMARY_NODE_ID] = 1;
3279 			new_qc->virtual_master_node_id = PRIMARY_NODE_ID;
3280 
3281 			/* Before sending the parse message to the primary, we need to
3282 			 * close the named statement. Otherwise we will get an error from
3283 			 * backend if the named statement already exists. This could
3284 			 * happend if parse_before_bind is called with a bind message
3285 			 * using same named statement. If the named statement does not
3286 			 * exist, it's fine. PostgreSQL just ignores a request trying to
3287 			 * close a non-existing statement. If the statement is unnamed
3288 			 * one, we do not need it because unnamed statement can be
3289 			 * overwritten anytime.
3290 			 */
3291 			message_body[0] = 'S';
3292 			offset = strlen(bind_message->contents)+1;
3293 
3294 			ereport(DEBUG1,
3295 					(errmsg("parse before bind"),
3296 					 errdetail("close statement: %s", bind_message->contents+offset)));
3297 
3298 			if (bind_message->contents[offset] != '\0')
3299 			{
3300 				message_len = 1 + strlen(bind_message->contents+offset) + 1;
3301 				StrNCpy(message_body+1, bind_message->contents+offset, sizeof(message_body)-1);
3302 				pool_extended_send_and_wait(qc, "C", message_len, message_body, 1, PRIMARY_NODE_ID, false);
3303 				/* Add pending message */
3304 				pmsg = pool_pending_message_create('C', message_len, message_body);
3305 				pmsg->not_forward_to_frontend = true;
3306 				pool_pending_message_dest_set(pmsg, new_qc);
3307 				pool_pending_message_add(pmsg);
3308 				pool_pending_message_free_pending_message(pmsg);
3309 			}
3310 
3311 			/* Send parse message to primary node */
3312 			ereport(DEBUG1,
3313 					(errmsg("parse before bind"),
3314 					 errdetail("waiting for primary completing parse")));
3315 
3316 			pool_extended_send_and_wait(qc, "P", len, contents, 1, PRIMARY_NODE_ID, false);
3317 
3318 			/* Add pending message */
3319 			pmsg = pool_pending_message_create('P', len, contents);
3320 			pmsg->not_forward_to_frontend = true;
3321 			pool_pending_message_dest_set(pmsg, new_qc);
3322 			pool_pending_message_add(pmsg);
3323 			pool_pending_message_free_pending_message(pmsg);
3324 
3325 			/* Replace the query context of bind message */
3326 			bind_message->query_context = new_qc;
3327 
3328 #ifdef NOT_USED
3329 			/*
3330 			 * XXX 	pool_remove_sent_message() will pfree memory allocated by "contents".
3331 			 */
3332 
3333 			/* Remove old sent message */
3334 			pool_remove_sent_message('P', contents);
3335 			/* Create and add sent message of this parse message */
3336 			msg = pool_create_sent_message('P', len, contents, 0, contents, new_qc);
3337 			pool_add_sent_message(msg);
3338 #endif
3339 			/* Replace the query context of parse message */
3340 			message->query_context = new_qc;
3341 
3342 			return POOL_CONTINUE;
3343 		}
3344 		else
3345 		{
3346 			ereport(DEBUG1,
3347 					(errmsg("parse before bind"),
3348 					 errdetail("no need to re-send parse")));
3349 			return POOL_CONTINUE;
3350 		}
3351 	}
3352 	else
3353 	{
3354 		/* expect to send to master node only */
3355 		for (i = 0; i < NUM_BACKENDS; i++)
3356 		{
3357 			if (qc->where_to_send[i] && statecmp(qc->query_state[i], POOL_PARSE_COMPLETE) < 0)
3358 			{
3359 				ereport(DEBUG1,
3360 						(errmsg("parse before bind"),
3361 						 errdetail("waiting for backend %d completing parse", i)));
3362 
3363 				pool_extended_send_and_wait(qc, "P", len, contents, 1, i, false);
3364 			}
3365 			else
3366 			{
3367 				qc->where_to_send[i] = 0;
3368 			}
3369 		}
3370 	}
3371 
3372 	for (i = 0; i < NUM_BACKENDS; i++)
3373 	{
3374 		if (qc->where_to_send[i])
3375 		{
3376 			parse_was_sent = true;
3377 			break;
3378 		}
3379 	}
3380 
3381 	if (!STREAM && parse_was_sent)
3382 	{
3383 		pool_set_query_in_progress();
3384 
3385 		while (kind != '1')
3386 		{
3387 			PG_TRY();
3388 			{
3389 				read_kind_from_backend(frontend, backend, &kind);
3390 				pool_discard_packet_contents(backend);
3391 			}
3392 			PG_CATCH();
3393 			{
3394 				memcpy(qc->where_to_send, backup, sizeof(backup));
3395 				PG_RE_THROW();
3396 			}
3397 			PG_END_TRY();
3398 		}
3399 	}
3400 
3401 	memcpy(qc->where_to_send, backup, sizeof(backup));
3402 	return POOL_CONTINUE;
3403 }
3404 
/*
 * Find victim nodes by "decide by majority" rule and return an array of
 * victim node ids.
 *
 * Arguments:
 * ntuples: Array of number of affected tuples. A negative value represents
 *          a down node; such nodes neither vote nor become victims.
 * nmembers: Number of elements in ntuples.
 * master_node: The master node id. A value outside 0..nmembers-1 (in
 *              particular any negative value) means "not specified".
 * number_of_nodes: Output. Number of elements stored in the returned array;
 *                  set to 0 when NULL is returned.
 *
 * A value wins only if strictly more than half of the healthy nodes report
 * it, so equally sized groups (for example 2 nodes against 2 nodes) do not
 * produce a winner.  If there is no winner and master_node is specified,
 * the master's value wins; if there is no winner and master_node is not
 * specified, NULL is returned.
 *
 * Every healthy node whose value differs from the winning value is a
 * victim.  Note that when a winner exists but no node differs from it, the
 * (non-NULL) array is returned with *number_of_nodes == 0.
 *
 * Caution: Returned victim node array is allocated in static memory
 * of this function. Subsequent calls to this function will overwrite
 * the memory.
 */
static int* find_victim_nodes(int *ntuples, int nmembers, int master_node, int *number_of_nodes)
{
	static int victim_nodes[MAX_NUM_BACKENDS];
	int healthy_nodes = 0;
	int maxvotes = 0;			/* size of the largest group of equal values */
	int majority_ntuples = 0;	/* value shared by that group */
	int i, j;

	*number_of_nodes = 0;

	for (i = 0; i < nmembers; i++)
	{
		int votes = 0;

		/* Down node? */
		if (ntuples[i] < 0)
			continue;

		healthy_nodes++;

		/* Count the nodes reporting the same value, this node included. */
		for (j = 0; j < nmembers; j++)
		{
			if (ntuples[j] == ntuples[i])
				votes++;
		}

		/* Strict comparison: on equal sizes the earlier group is kept. */
		if (votes > maxvotes)
		{
			maxvotes = votes;
			majority_ntuples = ntuples[i];
		}
	}

	/*
	 * No winner if everyone is different (largest group has at most one
	 * member) or if the largest group is not more than half of the healthy
	 * nodes.
	 */
	if (maxvotes <= 1 || maxvotes <= healthy_nodes / 2.0)
	{
		/* Master node is specified? */
		if (master_node < 0 || master_node >= nmembers)
			return NULL;

		/*
		 * If master node is specified, let it and others who has same
		 * ntuples win.
		 */
		majority_ntuples = ntuples[master_node];
	}

	/* Make victim nodes list */
	for (i = 0; i < nmembers; i++)
	{
		if (ntuples[i] >= 0 && ntuples[i] != majority_ntuples)
			victim_nodes[(*number_of_nodes)++] = i;
	}

	return victim_nodes;
}
3519 
3520 
3521 /*
3522  * flatten_set_variable_args
3523  *		Given a parsenode List as emitted by the grammar for SET,
3524  *		convert to the flat string representation used by GUC.
3525  *
3526  * We need to be told the name of the variable the args are for, because
3527  * the flattening rules vary (ugh).
3528  *
3529  * The result is NULL if args is NIL (ie, SET ... TO DEFAULT), otherwise
3530  * a palloc'd string.
3531  */
3532 static char *
flatten_set_variable_args(const char * name,List * args)3533 flatten_set_variable_args(const char *name, List *args)
3534 {
3535 	StringInfoData buf;
3536 	ListCell   *l;
3537 
3538 	/* Fast path if just DEFAULT */
3539 	if (args == NIL)
3540 		return NULL;
3541 
3542 	initStringInfo(&buf);
3543 
3544 	/*
3545 	 * Each list member may be a plain A_Const node, or an A_Const within a
3546 	 * TypeCast; the latter case is supported only for ConstInterval arguments
3547 	 * (for SET TIME ZONE).
3548 	 */
3549 	foreach(l, args)
3550 	{
3551 		Node	   *arg = (Node *) lfirst(l);
3552 		char	   *val;
3553 		A_Const    *con;
3554 
3555 		if (l != list_head(args))
3556 			appendStringInfoString(&buf, ", ");
3557 
3558 		if (IsA(arg, TypeCast))
3559 		{
3560 			TypeCast   *tc = (TypeCast *) arg;
3561 			arg = tc->arg;
3562 		}
3563 
3564 		if (!IsA(arg, A_Const))
3565 			elog(ERROR, "unrecognized node type: %d", (int) nodeTag(arg));
3566 		con = (A_Const *) arg;
3567 
3568 		switch (nodeTag(&con->val))
3569 		{
3570 			case T_Integer:
3571 				appendStringInfo(&buf, "%ld", intVal(&con->val));
3572 				break;
3573 			case T_Float:
3574 				/* represented as a string, so just copy it */
3575 				appendStringInfoString(&buf, strVal(&con->val));
3576 				break;
3577 			case T_String:
3578 				val = strVal(&con->val);
3579 				appendStringInfoString(&buf, val);
3580 				break;
3581 			default:
3582 				ereport(ERROR, (errmsg("unrecognized node type: %d",
3583 					 (int) nodeTag(&con->val))));
3584 				break;
3585 		}
3586 	}
3587 
3588 	return buf.data;
3589 }
3590 
3591 /* Called when sync message is received.
3592  * Wait till ready for query received.
3593  */
pool_wait_till_ready_for_query(POOL_CONNECTION_POOL * backend)3594 static void pool_wait_till_ready_for_query(POOL_CONNECTION_POOL *backend)
3595 {
3596 	char kind;
3597 	int len;
3598 	int poplen;
3599 	char *buf;
3600 	int i;
3601 
3602 	for (i=0;i<NUM_BACKENDS;i++)
3603 	{
3604 		if (VALID_BACKEND(i))
3605 		{
3606 			for (;;)
3607 			{
3608 				pool_read(CONNECTION(backend, i), &kind, sizeof(kind));
3609 				ereport(DEBUG1,
3610 						(errmsg("pool_wait_till_ready_for_query: kind: %c", kind)));
3611 				pool_push(CONNECTION(backend, i), &kind, sizeof(kind));
3612 				pool_read(CONNECTION(backend, i), &len, sizeof(len));
3613 				pool_push(CONNECTION(backend, i), &len, sizeof(len));
3614 				if ((ntohl(len)-sizeof(len)) > 0)
3615 				{
3616 					buf = pool_read2(CONNECTION(backend, i), ntohl(len)-sizeof(len));
3617 					pool_push(CONNECTION(backend, i), buf, ntohl(len)-sizeof(len));
3618 				}
3619 
3620 				if (kind == 'Z')	/* Ready for query? */
3621 				{
3622 					pool_pop(CONNECTION(backend, i), &poplen);
3623 					ereport(DEBUG1,
3624 							(errmsg("pool_wait_till_ready_for_query: backend:%d ready for query found. buffer length:%d",
3625 									i, CONNECTION(backend, i)->len)));
3626 					break;
3627 				}
3628 			}
3629 		}
3630 	}
3631 }
3632 
3633 /*
3634  * Called when error response received in streaming replication mode and doing
3635  * extended query. Remove all pending messages and backend message buffer data
3636  * except POOL_SYNC pending message and ready for query.  If sync message is
3637  * not received yet, continue to read data from frontend until a sync message
3638  * is read.
3639  */
pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)3640 static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION *frontend,
3641 														 POOL_CONNECTION_POOL *backend)
3642 {
3643 	POOL_PENDING_MESSAGE *pmsg;
3644 	int i;
3645 
3646 	if (!pool_is_doing_extended_query_message() || !STREAM)
3647 		return;
3648 
3649 	/*
3650 	 * Check to see if we aready received a sync
3651 	 * message. If not, call ProcessFrontendResponse() to
3652 	 * get the sync message from client.
3653 	 */
3654 	pmsg = pool_pending_message_get(POOL_SYNC);
3655 	if (pmsg == NULL)
3656 	{
3657 		char kind;
3658 		int len;
3659 		POOL_PENDING_MESSAGE *msg;
3660 		char *contents;
3661 
3662 		for(;;)
3663 		{
3664 			pool_read(frontend, &kind, sizeof(kind));
3665 			pool_read(frontend, &len, sizeof(len));
3666 			len = ntohl(len) - sizeof(len);
3667 			if (len > 0)
3668 				contents = pool_read2(frontend, len);
3669 			if (kind == 'S')
3670 			{
3671 				msg = pool_pending_message_create('S', 0, NULL);
3672 				pool_pending_message_add(msg);
3673 				pool_pending_message_free_pending_message(msg);
3674 				SimpleForwardToBackend(kind, frontend, backend, len, contents);
3675 				break;
3676 			}
3677 		}
3678 	}
3679 	else
3680 		pool_pending_message_free_pending_message(pmsg);
3681 
3682 	/* Remove all pending messages except sync message */
3683 	do
3684 	{
3685 		pmsg = pool_pending_message_head_message();
3686 		if (pmsg && pmsg->type == POOL_SYNC)
3687 		{
3688 			ereport(DEBUG1,
3689 					(errmsg("Process backend response: sync pending message found after receiving error response")));
3690 			pool_unset_ignore_till_sync();
3691 			pool_pending_message_free_pending_message(pmsg);
3692 			break;
3693 		}
3694 		pool_pending_message_free_pending_message(pmsg);
3695 		pmsg = pool_pending_message_pull_out();
3696 		pool_pending_message_free_pending_message(pmsg);
3697 	}
3698 	while (pmsg);
3699 
3700 	pool_pending_message_reset_previous_message();
3701 
3702 	/* Discard read buffer execpt "Ready for query" */
3703 	for (i=0;i<NUM_BACKENDS;i++)
3704 	{
3705 		if (VALID_BACKEND(i))
3706 		{
3707 			char kind;
3708 			int len;
3709 			int sts;
3710 
3711 			while (!pool_read_buffer_is_empty(CONNECTION(backend, i)))
3712 			{
3713 				sts = pool_read(CONNECTION(backend, i), &kind, sizeof(kind));
3714 				if (sts < 0 || kind == '\0')
3715 				{
3716 					ereport(DEBUG1,
3717 							(errmsg("pool_discard_except_sync_and_ready_for_query: EOF detected while reading from backend: %d buffer length: %d sts: %d",
3718 									i, CONNECTION(backend, i)->len, sts)));
3719 					pool_unread(CONNECTION(backend, i), &kind, sizeof(kind));
3720 					break;
3721 				}
3722 
3723 				if (kind == 'Z')	/* Ready for query? */
3724 				{
3725 					pool_unread(CONNECTION(backend, i), &kind, sizeof(kind));
3726 					ereport(DEBUG1,
3727 							(errmsg("pool_discard_except_sync_and_ready_for_query: Ready for query found. backend:%d",
3728 									i)));
3729 					break;
3730 				}
3731 				else
3732 				{
3733 					/* Read and discard packet */
3734 					pool_read(CONNECTION(backend, i), &len, sizeof(len));
3735 					if ((ntohl(len)-sizeof(len)) > 0)
3736 					{
3737 						pool_read2(CONNECTION(backend, i), ntohl(len)-sizeof(len));
3738 					}
3739 					ereport(DEBUG1,
3740 							(errmsg("pool_discard_except_sync_and_ready_for_query: discarding packet %c (len:%lu) of backend:%d", kind, ntohl(len)-sizeof(len), i)));
3741 				}
3742 			}
3743 		}
3744 	}
3745 }
3746 
3747 /*
3748  * Handle misc treatment when a command successfully completed.
3749  * Preconditions: query is in progress. The command is succeeded.
3750  */
pool_at_command_success(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)3751 void pool_at_command_success(POOL_CONNECTION *frontend, POOL_CONNECTION_POOL *backend)
3752 {
3753 	Node *node;
3754 	char *query;
3755 
3756 	/* Sanity checks */
3757 	if (!pool_is_query_in_progress())
3758 	{
3759 		ereport(ERROR,
3760 				(errmsg("pool_at_command_success: query is not in progress")));
3761 	}
3762 
3763 	if (!pool_is_command_success())
3764 	{
3765 		ereport(ERROR,
3766 				(errmsg("pool_at_command_success: command did not succeed")));
3767 	}
3768 
3769 	node = pool_get_parse_tree();
3770 
3771 	if (!node)
3772 	{
3773 		ereport(ERROR,
3774 				(errmsg("pool_at_command_success: no parse tree found")));
3775 	}
3776 
3777 	query = pool_get_query_string();
3778 
3779 	if (query == NULL)
3780 	{
3781 		ereport(ERROR,
3782 				(errmsg("pool_at_command_success: no query found")));
3783 	}
3784 
3785 	/*
3786 	 * If the query was BEGIN/START TRANSACTION, clear the
3787 	 * history that we had a writing command in the transaction
3788 	 * and forget the transaction isolation level.
3789 	 *
3790 	 * XXX If BEGIN is received while we are already in an
3791 	 * explicit transaction, the command *successes*
3792 	 * (just with a NOTICE message). In this case we lose
3793 	 * "writing_transaction" etc. info.
3794 	 */
3795 	if (is_start_transaction_query(node))
3796 	{
3797 		pool_unset_writing_transaction();
3798 		pool_unset_failed_transaction();
3799 		pool_unset_transaction_isolation();
3800 	}
3801 
3802 	/*
3803 	 * If the query was COMMIT/ABORT, clear the history
3804 	 * that we had a writing command in the transaction
3805 	 * and forget the transaction isolation level.  This
3806 	 * is necessary if succeeding transaction is not an
3807 	 * explicit one.
3808 	 */
3809 	else if (is_commit_or_rollback_query(node))
3810 	{
3811 		pool_unset_writing_transaction();
3812 		pool_unset_failed_transaction();
3813 		pool_unset_transaction_isolation();
3814 	}
3815 
3816 	/*
3817 	 * SET TRANSACTION ISOLATION LEVEL SERIALIZABLE or SET
3818 	 * SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL
3819 	 * SERIALIZABLE, remember it.
3820 	 */
3821 	else if (is_set_transaction_serializable(node))
3822 	{
3823 		pool_set_transaction_isolation(POOL_SERIALIZABLE);
3824 	}
3825 
3826 	/*
3827 	 * If 2PC commands has been executed, automatically close
3828 	 * transactions on standbys if there is any open
3829 	 * transaction since 2PC commands close transaction on
3830 	 * primary.
3831 	 */
3832 	else if (is_2pc_transaction_query(node))
3833 	{
3834 		close_standby_transactions(frontend, backend);
3835 	}
3836 
3837 	else if (!is_select_query(node, query))
3838 	{
3839 		/*
3840 		 * If the query was not READ SELECT, and we are in an
3841 		 * explicit transaction, remember that we had a write
3842 		 * query in this transaction.
3843 		 */
3844 		if (TSTATE(backend, MASTER_SLAVE ? PRIMARY_NODE_ID : REAL_MASTER_NODE_ID) == 'T')
3845 		{
3846 			/* However, if the query is "SET TRANSACTION READ ONLY" or its variant,
3847 			 * don't set it.
3848 			 */
3849 			if (!pool_is_transaction_read_only(node))
3850 			{
3851 				ereport(DEBUG1,
3852 						(errmsg("not SET TRANSACTION READ ONLY")));
3853 
3854 				pool_set_writing_transaction();
3855 			}
3856 		}
3857 
3858 		/*
3859 		 * If the query was CREATE TEMP TABLE, discard
3860 		 * temp table relcache because we might have had
3861 		 * persistent table relation cache which has table
3862 		 * name as the temp table.
3863 		 */
3864 		if (IsA(node, CreateStmt))
3865 		{
3866 			CreateStmt *create_table_stmt = (CreateStmt *)node;
3867 			if (create_table_stmt->relation->relpersistence == 't')
3868 				discard_temp_table_relcache();
3869 		}
3870 	}
3871 }
3872