1 /* -*-pgsql-c-*- */
2 /*
3  * pgpool: a language independent connection pool server for PostgreSQL
4  * written by Tatsuo Ishii
5  *
6  * Copyright (c) 2003-2021	PgPool Global Development Group
7  *
8  * Permission to use, copy, modify, and distribute this software and
9  * its documentation for any purpose and without fee is hereby
10  * granted, provided that the above copyright notice appear in all
11  * copies and that both that copyright notice and this permission
12  * notice appear in supporting documentation, and that the name of the
13  * author not be used in advertising or publicity pertaining to
14  * distribution of the software without specific, written prior
15  * permission. The author makes no representations about the
16  * suitability of this software for any purpose.  It is provided "as
17  * is" without express or implied warranty.
18  *
19  *---------------------------------------------------------------------
20  * pool_proto_modules.c: modules corresponding to message protocols.
21  * used by pool_process_query()
22  *---------------------------------------------------------------------
23  */
24 #include "config.h"
25 #include <errno.h>
26 
27 #ifdef HAVE_SYS_TYPES_H
28 #include <sys/types.h>
29 #endif
30 #ifdef HAVE_SYS_TIME_H
31 #include <sys/time.h>
32 #endif
33 #ifdef HAVE_SYS_SELECT_H
34 #include <sys/select.h>
35 #endif
36 
37 
38 #include <stdlib.h>
39 #include <unistd.h>
40 #include <string.h>
41 #include <netinet/in.h>
42 #include <ctype.h>
43 
44 #include "pool.h"
45 #include "rewrite/pool_timestamp.h"
46 #include "rewrite/pool_lobj.h"
47 #include "protocol/pool_proto_modules.h"
48 #include "protocol/pool_process_query.h"
49 #include "protocol/pool_pg_utils.h"
50 #include "pool_config.h"
51 #include "parser/pool_string.h"
52 #include "context/pool_session_context.h"
53 #include "context/pool_query_context.h"
54 #include "utils/elog.h"
55 #include "utils/pool_select_walker.h"
56 #include "utils/pool_relcache.h"
57 #include "utils/pool_stream.h"
58 #include "utils/ps_status.h"
59 #include "utils/pool_signal.h"
60 #include "utils/pool_ssl.h"
61 #include "utils/palloc.h"
62 #include "utils/memutils.h"
63 #include "query_cache/pool_memqcache.h"
64 #include "main/pool_internal_comms.h"
65 #include "pool_config_variables.h"
66 
67 char	   *copy_table = NULL;	/* copy table name */
char	   *copy_schema = NULL; /* copy schema name */
69 char		copy_delimiter;		/* copy delimiter char */
70 char	   *copy_null = NULL;	/* copy null string */
71 
72 /*
73  * Non 0 if allow to close internal transaction.  This variable was
74  * introduced on 2008/4/3 not to close an internal transaction when
75  * Sync message is received after receiving Parse message. This hack
76  * is for PHP-PDO.
77  */
78 static int	allow_close_transaction = 1;
79 
80 int			is_select_pgcatalog = 0;
81 int			is_select_for_update = 0;	/* 1 if SELECT INTO or SELECT FOR
82 										 * UPDATE */
83 
84 /*
85  * last query string sent to simpleQuery()
86  */
87 char		query_string_buffer[QUERY_STRING_BUFFER_LEN];
88 
89 static int	check_errors(POOL_CONNECTION_POOL * backend, int backend_id);
90 static void generate_error_message(char *prefix, int specific_error, char *query);
91 static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend,
92 									 POOL_CONNECTION_POOL * backend,
93 									 POOL_SENT_MESSAGE * message,
94 									 POOL_SENT_MESSAGE * bind_message);
95 static int *find_victim_nodes(int *ntuples, int nmembers, int main_node, int *number_of_nodes);
96 static POOL_STATUS close_standby_transactions(POOL_CONNECTION * frontend,
97 											  POOL_CONNECTION_POOL * backend);
98 
99 static char *flatten_set_variable_args(const char *name, List *args);
100 static bool
101 			process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context);
102 static void pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend,
103 											 POOL_CONNECTION_POOL * backend);
104 static void si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node *node, bool tstate_check);
105 
106 /*
107  * This is the workhorse of processing the pg_terminate_backend function to
108  * make sure that the use of function should not trigger the backend node failover.
109  *
110  * The function searches for the pg_terminate_backend() function call in the
111  * query parse tree and if the search comes out to be successful,
112  * the next step is to locate the pgpool-II child process and a backend node
113  * of that connection whose PID is specified in pg_terminate_backend argument.
114  *
115  * Once the connection is identified, we set the swallow_termination flag of
116  * that connection (in shared memory) and also sets the query destination to
117  * the same backend that hosts the connection.
118  *
119  * The function returns true on success, i.e. when the query contains the
120  * pg_terminate_backend call and that call refers to the backend
121  * connection that belongs to pgpool-II.
122  *
123  * Note:  Since upon successful return this function has already
124  * set the destination backend node for the current query,
125  * so for that case pool_where_to_send() should not be called.
126  *
127  */
128 static bool
process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context)129 process_pg_terminate_backend_func(POOL_QUERY_CONTEXT * query_context)
130 {
131 	/*
132 	 * locate pg_terminate_backend and get the pid argument, if
133 	 * pg_terminate_backend is present in the query
134 	 */
135 	int			backend_pid = pool_get_terminate_backend_pid(query_context->parse_tree);
136 
137 	if (backend_pid > 0)
138 	{
139 		int			backend_node = 0;
140 		ConnectionInfo *conn = pool_coninfo_backend_pid(backend_pid, &backend_node);
141 
142 		if (conn == NULL)
143 		{
144 			ereport(LOG,
145 					(errmsg("found the pg_terminate_backend request for backend pid:%d, but the backend connection does not belong to pgpool-II", backend_pid)));
146 
147 			/*
148 			 * we are not able to find the backend connection with the pid so
149 			 * there is not much left for us to do here
150 			 */
151 			return false;
152 		}
153 		ereport(LOG,
154 				(errmsg("found the pg_terminate_backend request for backend pid:%d on backend node:%d", backend_pid, backend_node),
155 				 errdetail("setting the connection flag")));
156 
157 		pool_set_connection_will_be_terminated(conn);
158 
159 		/*
160 		 * It was the pg_terminate_backend call so send the query to
161 		 * appropriate backend
162 		 */
163 		query_context->pg_terminate_backend_conn = conn;
164 		pool_force_query_node_to_backend(query_context, backend_node);
165 		return true;
166 	}
167 	return false;
168 }
169 
170 /*
171  * Process Query('Q') message
172  * Query messages include an SQL string.
173  */
POOL_STATUS
SimpleQuery(POOL_CONNECTION * frontend,
			POOL_CONNECTION_POOL * backend, int len, char *contents)
{
	/* Names of pgpool-II specific SHOW commands that are answered locally */
	static char *sq_config = "pool_status";
	static char *sq_pools = "pool_pools";
	static char *sq_processes = "pool_processes";
	static char *sq_nodes = "pool_nodes";
	static char *sq_version = "pool_version";
	static char *sq_cache = "pool_cache";
	static char *sq_health_check_stats = "pool_health_check_stats";
	static char *sq_backend_stats = "pool_backend_stats";
	int			commit;
	List	   *parse_tree_list;
	Node	   *node = NULL;
	POOL_STATUS status;
	int			lock_kind;
	bool		is_likely_select = false;
	int			specific_error = 0;

	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;

	/* out parameter of raw_parser(): true if the query failed to parse */
	bool		error;

	/* Get session context */
	session_context = pool_get_session_context(false);

	/* save last query string for logging purpose */
	strlcpy(query_string_buffer, contents, sizeof(query_string_buffer));

	/* show ps status */
	query_ps_status(contents, backend);

	/* log query to log file if necessary */
	if (pool_config->log_statement)
		ereport(LOG, (errmsg("statement: %s", contents)));

	/*
	 * Fetch memory cache if possible
	 */
	if (pool_config->memory_cache_enabled)
	    is_likely_select = pool_is_likely_select(contents);

	/*
	 * If memory query cache enabled and the query seems to be a SELECT use
	 * query cache if possible. However if we are in an explicit transaction
	 * and we had writing query before, we should not use query cache. This
	 * means that even the writing query is not anything related to the table
	 * which is used the SELECT, we do not use cache. Of course we could
	 * analyze the SELECT to see if it uses the table modified in the
	 * transaction, but it will need parsing query and accessing to system
	 * catalog, which will add significant overhead. Moreover if we are in
	 * aborted transaction, commands should be ignored, so we should not use
	 * query cache.
	 */
	if (pool_config->memory_cache_enabled && is_likely_select &&
		!pool_is_writing_transaction() &&
		TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) != 'E')
	{
		bool		foundp;

		/*
		 * If the query is SELECT from table to cache, try to fetch cached
		 * result.
		 */
		status = pool_fetch_from_memory_cache(frontend, backend, contents, &foundp);

		if (status != POOL_CONTINUE)
			return status;

		if (foundp)
		{
			/* cache hit: answer came from the cache, skip the backends */
			pool_ps_idle_display(backend);
			pool_set_skip_reading_from_backends();
			pool_stats_count_up_num_cache_hits();
			return POOL_CONTINUE;
		}
	}

	/* Create query context */
	query_context = pool_init_query_context();
	/* parse into the query context's memory context so the tree lives with it */
	MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);

	/* parse SQL string */
	parse_tree_list = raw_parser(contents, RAW_PARSE_DEFAULT, len, &error, !REPLICATION);

	if (parse_tree_list == NIL)
	{
		/* is the query empty? */
		if (*contents == '\0' || *contents == ';' || error == false)
		{
			/*
			 * JBoss sends empty queries for checking connections. We replace
			 * the empty query with SELECT command not to affect load balance.
			 * [Pgpool-general] Confused about JDBC and load balancing
			 */
			parse_tree_list = get_dummy_read_query_tree();
		}
		else
		{
			/*
			 * Unable to parse the query. Probably syntax error or the query
			 * is too new and our parser cannot understand. Treat as if it
			 * were an DELETE command. Note that the DELETE command does not
			 * execute, instead the original query will be sent to backends,
			 * which may or may not cause an actual syntax errors. The command
			 * will be sent to all backends in replication mode or
			 * primary in native replication mode.
			 */
			if (!strcmp(remote_host, "[local]"))
			{
				ereport(LOG,
						(errmsg("Unable to parse the query: \"%s\" from local client", contents)));
			}
			else
			{
				ereport(LOG,
						(errmsg("Unable to parse the query: \"%s\" from client %s(%s)", contents, remote_host, remote_port)));
			}
			parse_tree_list = get_dummy_write_query_tree();
			query_context->is_parse_error = true;
		}
	}
	MemoryContextSwitchTo(old_context);

	if (parse_tree_list != NIL)
	{
		/* pick the (first) parse tree node to drive routing decisions */
		node = raw_parser2(parse_tree_list);

		/*
		 * Start query context
		 */
		pool_start_query(query_context, contents, len, node);

		/*
		 * Create PostgreSQL version cache.  Since the provided query might
		 * cause a syntax error, we want to issue "SELECT version()" which is
		 * called inside Pgversion() here.
		 */
		Pgversion(backend);

		/*
		 * If the query is DROP DATABASE, after executing it, cache files
		 * directory must be discarded. So we have to get the DB's oid before
		 * it will be DROPped.
		 */
		if (pool_config->memory_cache_enabled && is_drop_database(node))
		{
			DropdbStmt *stmt = (DropdbStmt *) node;

			query_context->dboid = pool_get_database_oid_from_dbname(stmt->dbname);
			if (query_context->dboid != 0)
			{
				ereport(DEBUG1,
						(errmsg("DB's oid to discard its cache directory: dboid = %d", query_context->dboid)));
			}
		}

		/*
		 * Check if multi statement query
		 */
		if (parse_tree_list && list_length(parse_tree_list) > 1)
		{
			query_context->is_multi_statement = true;
		}
		else
		{
			query_context->is_multi_statement = false;
		}

		/*
		 * check COPY FROM STDIN if true, set copy_* variable
		 */
		check_copy_from_stdin(node);

		/* pgpool's own SHOW variant: answered locally, never hits a backend */
		if (IsA(node, PgpoolVariableShowStmt))
		{
			VariableShowStmt *vnode = (VariableShowStmt *) node;

			report_config_variable(frontend, backend, vnode->name);

			pool_ps_idle_display(backend);
			pool_query_context_destroy(query_context);
			pool_set_skip_reading_from_backends();
			return POOL_CONTINUE;
		}

		/* pgpool's own SET variant: applied to the session, not the backends */
		if (IsA(node, PgpoolVariableSetStmt))
		{
			VariableSetStmt *vnode = (VariableSetStmt *) node;
			const char *value = NULL;

			/*
			 * NOTE: the two if statements below are intentionally not
			 * chained: VAR_SET_VALUE first flattens its argument list, then
			 * falls into the else branch to apply it.
			 */
			if (vnode->kind == VAR_SET_VALUE)
			{
				value = flatten_set_variable_args("name", vnode->args);
			}
			if (vnode->kind == VAR_RESET_ALL)
			{
				reset_all_variables(frontend, backend);
			}
			else
			{
				set_config_option_for_session(frontend, backend, vnode->name, value);
			}

			pool_ps_idle_display(backend);
			pool_query_context_destroy(query_context);
			pool_set_skip_reading_from_backends();
			return POOL_CONTINUE;
		}

		/* status reporting? */
		if (IsA(node, VariableShowStmt))
		{
			bool		is_valid_show_command = false;
			VariableShowStmt *vnode = (VariableShowStmt *) node;

			if (!strcmp(sq_config, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("config reporting")));
				config_reporting(frontend, backend);
			}
			else if (!strcmp(sq_pools, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("pools reporting")));

				pools_reporting(frontend, backend);
			}
			else if (!strcmp(sq_processes, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("processes reporting")));
				processes_reporting(frontend, backend);
			}
			else if (!strcmp(sq_nodes, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("nodes reporting")));
				nodes_reporting(frontend, backend);
			}
			else if (!strcmp(sq_version, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("version reporting")));
				version_reporting(frontend, backend);
			}
			else if (!strcmp(sq_cache, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("cache reporting")));
				cache_reporting(frontend, backend);
			}
			else if (!strcmp(sq_health_check_stats, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("health check stats")));
				show_health_check_stats(frontend, backend);
			}

			else if (!strcmp(sq_backend_stats, vnode->name))
			{
				is_valid_show_command = true;
				ereport(DEBUG1,
						(errmsg("SimpleQuery"),
						 errdetail("backend stats")));
				show_backend_stats(frontend, backend);
			}

			/* a recognized pgpool SHOW was fully answered above */
			if (is_valid_show_command)
			{
				pool_ps_idle_display(backend);
				pool_query_context_destroy(query_context);
				pool_set_skip_reading_from_backends();
				return POOL_CONTINUE;
			}
		}

		/*
		 * If the table is to be cached, set is_cache_safe TRUE and register
		 * table oids.
		 */
		if (pool_config->memory_cache_enabled && query_context->is_multi_statement == false)
		{
			bool		is_select_query = false;
			int			num_oids;
			int		   *oids;
			int			i;

			/* Check if the query is actually SELECT */
			if (is_likely_select && IsA(node, SelectStmt))
			{
				is_select_query = true;
			}

			/* Switch the flag of is_cache_safe in session_context */
			if (is_select_query && !query_context->is_parse_error &&
				pool_is_allow_to_cache(query_context->parse_tree,
									   query_context->original_query))
			{
				pool_set_cache_safe();
			}
			else
			{
				pool_unset_cache_safe();
			}

			/*
			 * If table is to be cached and the query is DML, save the table
			 * oid
			 */
			if (!query_context->is_parse_error)
			{
				num_oids = pool_extract_table_oids(node, &oids);

				if (num_oids > 0)
				{
					/* Save to oid buffer */
					for (i = 0; i < num_oids; i++)
					{
						pool_add_dml_table_oid(oids[i]);
					}
				}
			}
		}

		/*
		 * If we are in SI mode, start an internal transaction if needed.
		 */
		if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
		{
			start_internal_transaction(frontend, backend, node);
		}

		/*
		 * From now on it is possible that query is actually sent to backend.
		 * So we need to acquire snapshot while there's no committing backend
		 * in snapshot isolation mode except while processing reset queries.
		 * For this purpose, we send a query to know whether the transaction
		 * is READ ONLY or not.  Sending actual user's query is not possible
		 * because it might cause rw-conflict, which in turn causes a
		 * deadlock.
		 */
		si_get_snapshot(frontend, backend, node,
						!INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID));

		/*
		 * pg_terminate function needs special handling, process it if the
		 * query contains one, otherwise use pool_where_to_send() to decide
		 * destination backend node for the query
		 */
		if (process_pg_terminate_backend_func(query_context) == false)
		{
			/*
			 * Decide where to send query
			 */
			pool_where_to_send(query_context, query_context->original_query,
							   query_context->parse_tree);
		}

		/*
		 * if this is DROP DATABASE command, send USR1 signal to parent and
		 * ask it to close all idle connections. XXX This is overkill. It
		 * would be better to close the idle connection for the database which
		 * DROP DATABASE command tries to drop. This is impossible at this
		 * point, since we have no way to pass such info to other processes.
		 */
		if (is_drop_database(node))
		{
			struct timeval stime;

			stime.tv_usec = 0;
			stime.tv_sec = 5;	/* XXX give arbitrary time to allow
									 * closing idle connections */

			ereport(DEBUG1,
					(errmsg("Query: sending SIGUSR1 signal to parent")));

			ignore_sigusr1 = 1;	/* disable SIGUSR1 handler */
			close_idle_connections();

			/*
			 * We need to loop over here since we might get some signals while
			 * sleeping
			 */
			for (;;)
			{
				int sts;

				errno = 0;
				/*
				 * select() used purely as a sleep; on Linux the remaining
				 * time is written back into stime, which the loop relies on
				 * to know when the full interval has elapsed.
				 */
				sts = select(0, NULL, NULL, NULL, &stime);
				if (stime.tv_usec == 0 && stime.tv_sec == 0)
					break;
				if (sts != 0 && errno != EINTR)
				{
					elog(DEBUG1, "select(2) returns error: %s", strerror(errno));
					break;
				}
			}

			ignore_sigusr1 = 0;
		}

		/*---------------------------------------------------------------------------
		 * determine if we need to lock the table
		 * to keep SERIAL data consistency among servers
		 * conditions:
		 * - replication is enabled
		 * - protocol is V3
		 * - statement is INSERT
		 * - either "INSERT LOCK" comment exists or insert_lock directive specified
		 *---------------------------------------------------------------------------
		 */
		if (!RAW_MODE && !SL_MODE)
		{
			/*
			 * If there's only one node to send the command, there's no point
			 * to start a transaction.
			 */
			if (pool_multi_node_to_be_sent(query_context))
			{
				/* start a transaction if needed */
				start_internal_transaction(frontend, backend, (Node *) node);

				/* check if need lock */
				lock_kind = need_insert_lock(backend, contents, node);
				if (lock_kind)
				{
					/* if so, issue lock command */
					status = insert_lock(frontend, backend, contents, (InsertStmt *) node, lock_kind);
					if (status != POOL_CONTINUE)
					{
						pool_query_context_destroy(query_context);
						return status;
					}
				}
			}
		}
	}

	/*
	 * V2 protocol has no ReadyForQuery transaction status, so track the
	 * transaction state ourselves when a BEGIN-style query is seen.
	 */
	if (MAJOR(backend) == PROTO_MAJOR_V2 && is_start_transaction_query(node))
	{
		int			i;

		for (i = 0; i < NUM_BACKENDS; i++)
		{
			if (VALID_BACKEND(i))
				TSTATE(backend, i) = 'T';
		}
	}

	if (node)
	{
		POOL_SENT_MESSAGE *msg = NULL;

		/*
		 * PREPARE is remembered as an uncompleted sent message so the later
		 * CommandComplete processing can register it under its statement name.
		 */
		if (IsA(node, PrepareStmt))
		{
			msg = pool_create_sent_message('Q', len, contents, 0,
										   ((PrepareStmt *) node)->name,
										   query_context);
			session_context->uncompleted_message = msg;
		}
		else
			session_context->uncompleted_message = NULL;
	}


	if (!RAW_MODE)
	{
		/* check if query is "COMMIT" or "ROLLBACK" */
		commit = is_commit_or_rollback_query(node);

		/*
		 * Query is not commit/rollback
		 */
		if (!commit)
		{
			char	   *rewrite_query = NULL;

			if (node)
			{
				POOL_SENT_MESSAGE *msg = NULL;

				if (IsA(node, PrepareStmt))
				{
					msg = session_context->uncompleted_message;
				}
				else if (IsA(node, ExecuteStmt))
				{
					msg = pool_get_sent_message('Q', ((ExecuteStmt *) node)->name, POOL_SENT_MESSAGE_CREATED);
					if (!msg)
						msg = pool_get_sent_message('P', ((ExecuteStmt *) node)->name, POOL_SENT_MESSAGE_CREATED);
				}

				/* rewrite `now()' to timestamp literal */
				if (!is_select_query(node, query_context->original_query) ||
					pool_has_function_call(node) || pool_config->replicate_select)
					rewrite_query = rewrite_timestamp(backend, query_context->parse_tree, false, msg);

				/*
				 * If the query is BEGIN READ WRITE or BEGIN ... SERIALIZABLE
				 * in native replication mode, we send BEGIN to standbys
				 * instead. original_query which is BEGIN READ WRITE is sent
				 * to primary. rewritten_query which is BEGIN is sent to
				 * standbys.
				 */
				if (pool_need_to_treat_as_if_default_transaction(query_context))
				{
					rewrite_query = pstrdup("BEGIN");
				}

				if (rewrite_query != NULL)
				{
					query_context->rewritten_query = rewrite_query;
					query_context->rewritten_length = strlen(rewrite_query) + 1;
				}
			}

			/*
			 * Optimization effort: If there's only one session, we do not
			 * need to wait for the main node's response, and could execute
			 * the query concurrently.  In snapshot isolation mode we cannot
			 * do this optimization because we need to wait for main node's
			 * response first.
			 */
			if (pool_config->num_init_children == 1 &&
				pool_config->backend_clustering_mode != CM_SNAPSHOT_ISOLATION)
			{
				/* Send query to all DB nodes at once */
				status = pool_send_and_wait(query_context, 0, 0);
				/* free_parser(); */
				return status;
			}

			/* Send the query to main node */
			pool_send_and_wait(query_context, 1, MAIN_NODE_ID);

			/* Check specific errors */
			specific_error = check_errors(backend, MAIN_NODE_ID);
			if (specific_error)
			{
				/* log error message */
				generate_error_message("SimpleQuery: ", specific_error, contents);
			}
		}

		if (specific_error)
		{
			/*
			 * Main node failed with a node-specific error (e.g. deadlock);
			 * send a harmless substitute query to the remaining nodes so
			 * they stay in sync instead of repeating the failing query.
			 */
			char		msg[1024] = POOL_ERROR_QUERY;	/* large enough */
			/* NOTE: this len intentionally shadows the function parameter */
			int			len = strlen(msg) + 1;

			memset(msg + len, 0, sizeof(int));

			/* send query to other nodes */
			query_context->rewritten_query = msg;
			query_context->rewritten_length = len;
			pool_send_and_wait(query_context, -1, MAIN_NODE_ID);
		}
		else
		{
			/*
			 * If commit command and Snapshot Isolation mode, wait for until
			 * snapshot prepared.
			 */
			if (commit && pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
			{
				si_commit_request();
			}

			/*
			 * Send the query to other than main node.
			 */
			pool_send_and_wait(query_context, -1, MAIN_NODE_ID);

		}

		/*
		 * Send "COMMIT" or "ROLLBACK" to only main node if query is
		 * "COMMIT" or "ROLLBACK"
		 */
		if (commit)
		{
			pool_send_and_wait(query_context, 1, MAIN_NODE_ID);

			/*
			 * If we are in the snapshot isolation mode, we need to declare
			 * that commit has been done. This would wake up other children
			 * waiting for acquiring snapshot.
			 */
			if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
			{
				si_commit_done();
				session_context->transaction_read_only = false;
			}
		}

	}
	else
	{
		/* RAW_MODE: single backend, just send to the main node */
		pool_send_and_wait(query_context, 1, MAIN_NODE_ID);
	}

	return POOL_CONTINUE;
}
794 
795 /*
796  * process EXECUTE (V3 only)
797  */
798 POOL_STATUS
Execute(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)799 Execute(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
800 		int len, char *contents)
801 {
802 	int			commit = 0;
803 	char	   *query = NULL;
804 	Node	   *node;
805 	int			specific_error = 0;
806 	POOL_SESSION_CONTEXT *session_context;
807 	POOL_QUERY_CONTEXT *query_context;
808 	POOL_SENT_MESSAGE *bind_msg;
809 	bool		foundp = false;
810 
811 	/* Get session context */
812 	session_context = pool_get_session_context(false);
813 
814 	if (pool_config->log_client_messages)
815 		ereport(LOG,
816 				(errmsg("Execute message from frontend."),
817 				 errdetail("portal: \"%s\"", contents)));
818 	ereport(DEBUG2,
819 			(errmsg("Execute: portal name <%s>", contents)));
820 
821 	bind_msg = pool_get_sent_message('B', contents, POOL_SENT_MESSAGE_CREATED);
822 	if (!bind_msg)
823 		ereport(FATAL,
824 				(return_code(2),
825 				 errmsg("unable to Execute"),
826 				 errdetail("unable to get bind message")));
827 
828 	if (!bind_msg->query_context)
829 		ereport(FATAL,
830 				(return_code(2),
831 				 errmsg("unable to Execute"),
832 				 errdetail("unable to get query context")));
833 
834 	if (!bind_msg->query_context->original_query)
835 		ereport(FATAL,
836 				(return_code(2),
837 				 errmsg("unable to Execute"),
838 				 errdetail("unable to get original query")));
839 
840 	if (!bind_msg->query_context->parse_tree)
841 		ereport(FATAL,
842 				(return_code(2),
843 				 errmsg("unable to Execute"),
844 				 errdetail("unable to get parse tree")));
845 
846 	query_context = bind_msg->query_context;
847 	node = bind_msg->query_context->parse_tree;
848 	query = bind_msg->query_context->original_query;
849 
850 	strlcpy(query_string_buffer, query, sizeof(query_string_buffer));
851 
852 	ereport(DEBUG2, (errmsg("Execute: query string = <%s>", query)));
853 
854 	ereport(DEBUG1, (errmsg("Execute: pool_is_writing_transaction: %d TSTATE: %c",
855 							pool_is_writing_transaction(),
856 							TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID))));
857 
858 	/* log query to log file if necessary */
859 	if (pool_config->log_statement)
860 		ereport(LOG, (errmsg("statement: %s", query)));
861 
862 	/*
863 	 * Fetch memory cache if possible
864 	 */
865 	if (pool_config->memory_cache_enabled && !pool_is_writing_transaction() &&
866 		(TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) != 'E')
867 		&& pool_is_likely_select(query))
868 	{
869 		POOL_STATUS status;
870 		char	   *search_query = NULL;
871 		int			len;
872 
873 #define STR_ALLOC_SIZE 1024
874 		ereport(DEBUG1, (errmsg("Execute: pool_is_likely_select: true pool_is_writing_transaction: %d TSTATE: %c",
875 								pool_is_writing_transaction(),
876 								TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID))));
877 
878 		len = strlen(query) + 1;
879 		search_query = MemoryContextStrdup(query_context->memory_context, query);
880 
881 		ereport(DEBUG1, (errmsg("Execute: checking cache fetch condition")));
882 
883 		/*
884 		 * Add bind message's info to query to search.
885 		 */
886 		if (query_context->is_cache_safe && bind_msg->param_offset && bind_msg->contents)
887 		{
888 			/* Extract binary contents from bind message */
889 			char	   *query_in_bind_msg = bind_msg->contents + bind_msg->param_offset;
890 			char		hex_str[4]; /* 02X chars + white space + null end */
891 			int			i;
892 			int			alloc_len;
893 
894 			alloc_len = (len / STR_ALLOC_SIZE + 1) * STR_ALLOC_SIZE;
895 			search_query = repalloc(search_query, alloc_len);
896 
897 			for (i = 0; i < bind_msg->len - bind_msg->param_offset; i++)
898 			{
899 				int			hexlen;
900 
901 				snprintf(hex_str, sizeof(hex_str), (i == 0) ? " %02X" : "%02X", 0xff & query_in_bind_msg[i]);
902 				hexlen = strlen(hex_str);
903 
904 				if ((len + hexlen) >= alloc_len)
905 				{
906 					alloc_len += STR_ALLOC_SIZE;
907 					search_query = repalloc(search_query, alloc_len);
908 				}
909 				strcat(search_query, hex_str);
910 				len += hexlen;
911 			}
912 
913 			/*
914 			 * If bind message is sent again to an existing prepared statement,
915 			 * it is possible that query_w_hex remains. Before setting newly
916 			 * allocated query_w_hex's pointer to the query context, free the
917 			 * previously allocated memory.
918 			 */
919 			if (query_context->query_w_hex)
920 			{
921 				pfree(query_context->query_w_hex);
922 			}
923 			query_context->query_w_hex = search_query;
924 
925 			/*
926 			 * When a transaction is committed,
927 			 * query_context->temp_cache->query is used to create md5 hash to
928 			 * search for query cache. So overwrite the query text in temp
929 			 * cache to the one with the hex of bind message. If not, md5 hash
930 			 * will be created by the query text without bind message, and it
931 			 * will happen to find cache never or to get a wrong result.
932 			 *
933 			 * However, It is possible that temp_cache does not exist.
934 			 * Consider following scenario: - In the previous execute cache is
935 			 * overflowed, and temp_cache discarded. - In the subsequent
936 			 * bind/execute uses the same portal
937 			 */
938 			if (query_context->temp_cache)
939 			{
940 				pfree(query_context->temp_cache->query);
941 				query_context->temp_cache->query = MemoryContextStrdup(session_context->memory_context, search_query);
942 			}
943 		}
944 
945 		/*
946 		 * If the query is SELECT from table to cache, try to fetch cached
947 		 * result.
948 		 */
949 		status = pool_fetch_from_memory_cache(frontend, backend, search_query, &foundp);
950 
951 		if (status != POOL_CONTINUE)
952 			return status;
953 
954 		if (foundp)
955 		{
956 #ifdef DEBUG
957 			extern bool stop_now;
958 #endif
959 			pool_stats_count_up_num_cache_hits();
960 			query_context->skip_cache_commit = true;
961 #ifdef DEBUG
962 			stop_now = true;
963 #endif
964 
965 			if (!SL_MODE || !pool_is_doing_extended_query_message())
966 			{
967 				pool_set_skip_reading_from_backends();
968 				pool_stats_count_up_num_cache_hits();
969 				pool_unset_query_in_progress();
970 				return POOL_CONTINUE;
971 			}
972 		}
973 		else
974 			query_context->skip_cache_commit = false;
975 	}
976 
977 	/* show ps status */
978 	query_ps_status(query, backend);
979 
980 	session_context->query_context = query_context;
981 
982 	/*
983 	 * Calling pool_where_to_send here is dangerous because the node
984 	 * parse/bind has been sent could be change by pool_where_to_send() and it
985 	 * leads to "portal not found" etc. errors.
986 	 */
987 
988 	/* check if query is "COMMIT" or "ROLLBACK" */
989 	commit = is_commit_or_rollback_query(node);
990 
991 	if (!SL_MODE)
992 	{
993 		/*
994 		 * Query is not commit/rollback
995 		 */
996 		if (!commit)
997 		{
998 			/* Send the query to main node */
999 			pool_extended_send_and_wait(query_context, "E", len, contents, 1, MAIN_NODE_ID, false);
1000 
1001 			/* Check specific errors */
1002 			specific_error = check_errors(backend, MAIN_NODE_ID);
1003 			if (specific_error)
1004 			{
1005 				/* log error message */
1006 				generate_error_message("Execute: ", specific_error, contents);
1007 			}
1008 		}
1009 
1010 		if (specific_error)
1011 		{
1012 			char		msg[1024] = "pgpool_error_portal";	/* large enough */
1013 			int			len = strlen(msg);
1014 
1015 			memset(msg + len, 0, sizeof(int));
1016 
1017 			/* send query to other nodes */
1018 			pool_extended_send_and_wait(query_context, "E", len, msg, -1, MAIN_NODE_ID, false);
1019 		}
1020 		else
1021 		{
1022 			/*
1023 			 * If commit command and Snapshot Isolation mode, wait for until
1024 			 * snapshot prepared.
1025 			 */
1026 			if (commit && pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
1027 			{
1028 				si_commit_request();
1029 			}
1030 
1031 			pool_extended_send_and_wait(query_context, "E", len, contents, -1, MAIN_NODE_ID, false);
1032 		}
1033 
1034 		/*
1035 		 * send "COMMIT" or "ROLLBACK" to only main node if query is
1036 		 * "COMMIT" or "ROLLBACK"
1037 		 */
1038 		if (commit)
1039 		{
1040 			pool_extended_send_and_wait(query_context, "E", len, contents, 1, MAIN_NODE_ID, false);
1041 
1042 			/*
1043 			 * If we are in the snapshot isolation mode, we need to declare
1044 			 * that commit has been done. This would wake up other children
1045 			 * waiting for acquiring snapshot.
1046 			 */
1047 			if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
1048 			{
1049 				si_commit_done();
1050 				session_context->transaction_read_only = false;
1051 			}
1052 		}
1053 	}
1054 	else						/* streaming replication mode */
1055 	{
1056 		POOL_PENDING_MESSAGE *pmsg;
1057 
1058 		if (!foundp)
1059 		{
1060 			pool_extended_send_and_wait(query_context, "E", len, contents, 1, MAIN_NODE_ID, true);
1061 			pool_extended_send_and_wait(query_context, "E", len, contents, -1, MAIN_NODE_ID, true);
1062 		}
1063 
1064 		/* Add pending message */
1065 		pmsg = pool_pending_message_create('E', len, contents);
1066 		pool_pending_message_dest_set(pmsg, query_context);
1067 		pool_pending_message_query_set(pmsg, query_context);
1068 		pool_pending_message_add(pmsg);
1069 		pool_pending_message_free_pending_message(pmsg);
1070 
1071 		/* Various take care at the transaction start */
1072 		handle_query_context(backend);
1073 
1074 		/*
1075 		 * Take care of "writing transaction" flag.
1076 		 */
1077 		if ((!is_select_query(node, query) || pool_has_function_call(node)) &&
1078 			 !is_start_transaction_query(node) &&
1079 			 !is_commit_or_rollback_query(node))
1080 		{
1081 			ereport(DEBUG1,
1082 					(errmsg("Execute: TSTATE:%c",
1083 							TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID))));
1084 
1085 			/*
1086 			 * If the query was not READ SELECT, and we are in an explicit
1087 			 * transaction, remember that we had a write query in this
1088 			 * transaction.
1089 			 */
1090 			if (TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T' ||
1091 				pool_config->disable_load_balance_on_write == DLBOW_ALWAYS)
1092 			{
1093 				/*
1094 				 * However, if the query is "SET TRANSACTION READ ONLY" or its
1095 				 * variant, don't set it.
1096 				 */
1097 				if (!pool_is_transaction_read_only(node))
1098 				{
1099 					pool_set_writing_transaction();
1100 				}
1101 			}
1102 		}
1103 		pool_unset_query_in_progress();
1104 	}
1105 
1106 	return POOL_CONTINUE;
1107 }
1108 
/*
 * process Parse (V3 only)
 *
 * Handle an extended query protocol Parse ('P') message from the frontend.
 * "contents" is the message body: the prepared statement name followed by
 * the query string, both NUL terminated (any parameter type OIDs follow);
 * "len" is the body length.  The query is parsed locally so pgpool can
 * decide where to send it, then the Parse message is forwarded to the
 * backend(s) according to the clustering mode.
 *
 * Returns POOL_CONTINUE normally, POOL_END on a fatal session error.
 */
POOL_STATUS
Parse(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
	  int len, char *contents)
{
	int			deadlock_detected = 0;
	int			insert_stmt_with_lock = 0;
	char	   *name;			/* prepared statement name (may be "") */
	char	   *stmt;			/* SQL text */
	List	   *parse_tree_list;
	Node	   *node = NULL;
	POOL_SENT_MESSAGE *msg;
	POOL_STATUS status;
	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;

	bool		error;

	/* Get session context */
	session_context = pool_get_session_context(false);

	/* Create query context */
	query_context = pool_init_query_context();

	ereport(DEBUG1,
			(errmsg("Parse: statement name <%s>", contents)));

	/* Split the message body: statement name, then the query text */
	name = contents;
	stmt = contents + strlen(contents) + 1;

	if (pool_config->log_client_messages)
		ereport(LOG,
				(errmsg("Parse message from frontend."),
				 errdetail("statement: \"%s\", query: \"%s\"", name, stmt)));

	/* parse SQL string; allocate the parse tree in the query context */
	MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);

	parse_tree_list = raw_parser(stmt, RAW_PARSE_DEFAULT, strlen(stmt),&error,!REPLICATION);

	if (parse_tree_list == NIL)
	{
		/* is the query empty? */
		if (*stmt == '\0' || *stmt == ';' || error == false)
		{
			/*
			 * JBoss sends empty queries for checking connections. We replace
			 * the empty query with SELECT command not to affect load balance.
			 * [Pgpool-general] Confused about JDBC and load balancing
			 */
			parse_tree_list = get_dummy_read_query_tree();
		}
		else
		{
			/*
			 * Unable to parse the query. Probably syntax error or the query
			 * is too new and our parser cannot understand. Treat as if it
			 * were an DELETE command. Note that the DELETE command does not
			 * execute, instead the original query will be sent to backends,
			 * which may or may not cause an actual syntax errors. The command
			 * will be sent to all backends in replication mode or
			 * primary in native replication mode.
			 */
			if (!strcmp(remote_host, "[local]"))
			{
				ereport(LOG,
						(errmsg("Unable to parse the query: \"%s\" from local client", stmt)));
			}
			else
			{
				ereport(LOG,
						(errmsg("Unable to parse the query: \"%s\" from client %s(%s)", stmt, remote_host, remote_port)));
			}
			parse_tree_list = get_dummy_write_query_tree();
			query_context->is_parse_error = true;
		}
	}
	MemoryContextSwitchTo(old_context);

	if (parse_tree_list != NIL)
	{
		/* Save last query string for logging purpose */
		snprintf(query_string_buffer, sizeof(query_string_buffer), "Parse: %s", stmt);

		node = raw_parser2(parse_tree_list);

		/*
		 * If replication mode, check to see what kind of insert lock is
		 * necessary.
		 */
		if (REPLICATION)
			insert_stmt_with_lock = need_insert_lock(backend, stmt, node);

		/*
		 * Start query context
		 */
		pool_start_query(query_context, stmt, strlen(stmt) + 1, node);

		/*
		 * Create PostgreSQL version cache.  Since the provided query might
		 * cause a syntax error, we want to issue "SELECT version()" which is
		 * called inside Pgversion() here.
		 */
		Pgversion(backend);

		/* Remember this Parse message so Bind/Describe/Close can find it */
		msg = pool_create_sent_message('P', len, contents, 0, name, query_context);

		session_context->uncompleted_message = msg;

		/*
		 * If the table is to be cached, set is_cache_safe TRUE and register
		 * table oids.
		 */
		if (pool_config->memory_cache_enabled)
		{
			bool		is_likely_select = false;
			bool		is_select_query = false;
			int			num_oids;
			int		   *oids;
			int			i;

			/* Check if the query is actually SELECT */
			is_likely_select = pool_is_likely_select(query_context->original_query);
			if (is_likely_select && IsA(node, SelectStmt))
			{
				is_select_query = true;
			}

			/* Switch the flag of is_cache_safe in session_context */
			if (is_select_query && !query_context->is_parse_error &&
				pool_is_allow_to_cache(query_context->parse_tree,
									   query_context->original_query))
			{
				pool_set_cache_safe();
			}
			else
			{
				pool_unset_cache_safe();
			}

			/*
			 * If table is to be cached and the query is DML, save the table
			 * oid
			 */
			if (!is_select_query && !query_context->is_parse_error)
			{
				num_oids = pool_extract_table_oids(node, &oids);

				if (num_oids > 0)
				{
					/* Save to oid buffer */
					for (i = 0; i < num_oids; i++)
					{
						pool_add_dml_table_oid(oids[i]);
					}
				}
			}
		}

		/*
		 * If we are in SI mode, start an internal transaction if needed.
		 */
		if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION)
		{
			start_internal_transaction(frontend, backend, node);
		}

		/*
		 * Get snapshot if needed.
		 */
		si_get_snapshot(frontend, backend, node,
						!INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID));

		/*
		 * Decide where to send query
		 */
		pool_where_to_send(query_context, query_context->original_query,
						   query_context->parse_tree);

		/*
		 * In DML-adaptive mode a named prepared statement may later be
		 * executed as a write, so send it everywhere to be safe.
		 */
		if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE && strlen(name) != 0)
			pool_setall_node_to_be_sent(query_context);

		if (REPLICATION)
		{
			char	   *rewrite_query;
			bool		rewrite_to_params = true;

			/*
			 * rewrite `now()'. if stmt is unnamed, we rewrite `now()' to
			 * timestamp constant. else we rewrite `now()' to params and
			 * expand that at Bind message.
			 */
			if (*name == '\0')
				rewrite_to_params = false;
			msg->num_tsparams = 0;
			rewrite_query = rewrite_timestamp(backend, node, rewrite_to_params, msg);
			if (rewrite_query != NULL)
			{
				/* new body size = old size minus old query plus new query */
				int			alloc_len = len - strlen(stmt) + strlen(rewrite_query);

				/* switch memory context */
				MemoryContext oldcxt = MemoryContextSwitchTo(session_context->memory_context);

				contents = repalloc(msg->contents, alloc_len);
				MemoryContextSwitchTo(oldcxt);

				/* rebuild the message body: name, rewritten query, then the
				 * remaining original payload (parameter type OIDs etc.) */
				strcpy(contents, name);
				strcpy(contents + strlen(name) + 1, rewrite_query);
				memcpy(contents + strlen(name) + strlen(rewrite_query) + 2,
					   stmt + strlen(stmt) + 1,
					   len - (strlen(name) + strlen(stmt) + 2));

				len = alloc_len;
				name = contents;
				stmt = contents + strlen(name) + 1;
				ereport(DEBUG1,
						(errmsg("Parse: rewrite query name:\"%s\" statement:\"%s\" len=%d", name, stmt, len)));

				msg->len = len;
				msg->contents = contents;

				query_context->rewritten_query = rewrite_query;
			}
		}

		/*
		 * If the query is BEGIN READ WRITE in main replica mode, we send
		 * BEGIN instead of it to standbys. original_query which is
		 * BEGIN READ WRITE is sent to primary. rewritten_query which is BEGIN
		 * is sent to standbys.
		 */
		if (is_start_transaction_query(query_context->parse_tree) &&
			is_read_write((TransactionStmt *) query_context->parse_tree) &&
			MAIN_REPLICA)
		{
			query_context->rewritten_query = pstrdup("BEGIN");
		}
	}

	/*
	 * If not in streaming or logical replication mode, send "SYNC" message if not in a transaction.
	 */
	if (!SL_MODE)
	{
		char		kind;

		if (TSTATE(backend, MAIN_NODE_ID) != 'T')
		{
			int			i;

			/* synchronize transaction state */
			for (i = 0; i < NUM_BACKENDS; i++)
			{
				if (!VALID_BACKEND(i))
					continue;
				/* send sync message */
				send_extended_protocol_message(backend, i, "S", 0, "");
			}

			/* Sync must be answered by ReadyForQuery ('Z') */
			kind = pool_read_kind(backend);
			if (kind != 'Z')
				ereport(ERROR,
						(errmsg("unable to parse the query"),
						 errdetail("invalid read kind")));

			/*
			 * SYNC message returns "Ready for Query" message.
			 */
			if (ReadyForQuery(frontend, backend, 0, false) != POOL_CONTINUE)
			{
				pool_query_context_destroy(query_context);
				return POOL_END;
			}

			/*
			 * set in_progress flag, because ReadyForQuery unset it.
			 * in_progress flag influences VALID_BACKEND.
			 */
			if (!pool_is_query_in_progress())
				pool_set_query_in_progress();
		}

		if (is_strict_query(query_context->parse_tree))
		{
			start_internal_transaction(frontend, backend, query_context->parse_tree);
			allow_close_transaction = 1;
		}

		if (insert_stmt_with_lock)
		{
			/* start a transaction if needed and lock the table */
			status = insert_lock(frontend, backend, stmt, (InsertStmt *) query_context->parse_tree, insert_stmt_with_lock);
			if (status != POOL_CONTINUE)
				ereport(ERROR,
						(errmsg("unable to parse the query"),
						 errdetail("unable to get insert lock")));

		}
	}

	if (REPLICATION || SLONY)
	{
		/*
		 * We must synchronize because Parse message acquires table locks.
		 */
		ereport(DEBUG1,
				(errmsg("Parse: waiting for main node completing the query")));
		pool_extended_send_and_wait(query_context, "P", len, contents, 1, MAIN_NODE_ID, false);


		/*
		 * We must check deadlock error because a aborted transaction by
		 * detecting deadlock isn't same on all nodes. If a transaction is
		 * aborted on main node, pgpool send a error query to another nodes.
		 */
		deadlock_detected = detect_deadlock_error(MAIN(backend), MAJOR(backend));

		/*
		 * Check if other than deadlock error detected.  If so, emit log. This
		 * is useful when invalid encoding error occurs. In this case,
		 * PostgreSQL does not report what statement caused that error and
		 * make users confused.
		 */
		per_node_error_log(backend, MAIN_NODE_ID, stmt, "Parse: Error or notice message from backend: ", true);

		if (deadlock_detected)
		{
			POOL_QUERY_CONTEXT *error_qc;

			error_qc = pool_init_query_context();


			/* Abort the transaction on the remaining nodes by sending an
			 * error query to the same destinations as the original. */
			pool_start_query(error_qc, POOL_ERROR_QUERY, strlen(POOL_ERROR_QUERY) + 1, node);
			pool_copy_prep_where(query_context->where_to_send, error_qc->where_to_send);

			ereport(LOG,
					(errmsg("Parse: received deadlock error message from main node")));

			pool_send_and_wait(error_qc, -1, MAIN_NODE_ID);

			pool_query_context_destroy(error_qc);


			pool_set_query_in_progress();
			session_context->query_context = query_context;
		}
		else
		{
			/* No deadlock: forward Parse to the remaining nodes */
			pool_extended_send_and_wait(query_context, "P", len, contents, -1, MAIN_NODE_ID, false);
		}
	}
	else if (SL_MODE)
	{
		POOL_PENDING_MESSAGE *pmsg;

		/*
		 * XXX fix me:even with streaming replication mode, couldn't we have a
		 * deadlock
		 */
		pool_set_query_in_progress();
#ifdef NOT_USED
		pool_clear_sync_map();
#endif
		/* In SL mode do not wait for backend replies (nowait = true) */
		pool_extended_send_and_wait(query_context, "P", len, contents, 1, MAIN_NODE_ID, true);
		pool_extended_send_and_wait(query_context, "P", len, contents, -1, MAIN_NODE_ID, true);
		pool_add_sent_message(session_context->uncompleted_message);

		/* Add pending message */
		pmsg = pool_pending_message_create('P', len, contents);
		pool_pending_message_dest_set(pmsg, query_context);
		pool_pending_message_add(pmsg);
		pool_pending_message_free_pending_message(pmsg);

		pool_unset_query_in_progress();
	}
	else
	{
		/* Other modes: main node only */
		pool_extended_send_and_wait(query_context, "P", len, contents, 1, MAIN_NODE_ID, false);
	}

	return POOL_CONTINUE;

}
1494 
/*
 * process Bind (V3 only)
 *
 * Handle an extended query protocol Bind ('B') message from the frontend.
 * "contents" is the message body: the destination portal name followed by
 * the source prepared statement name, both NUL terminated (parameter
 * formats/values follow); "len" is the body length.  The query context
 * created by the corresponding Parse message is looked up and the Bind
 * message is forwarded to the same backend(s).
 *
 * Returns POOL_CONTINUE normally, POOL_END on a fatal session error.
 */
POOL_STATUS
Bind(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
	 int len, char *contents)
{
	char	   *pstmt_name;		/* source prepared statement name */
	char	   *portal_name;	/* destination portal name */
	char	   *rewrite_msg = NULL;
	POOL_SENT_MESSAGE *parse_msg;
	POOL_SENT_MESSAGE *bind_msg;
	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;
	int			insert_stmt_with_lock = 0;
	bool		nowait;

	/* Get session context */
	session_context = pool_get_session_context(false);

	/*
	 * Rewrite message
	 */
	portal_name = contents;
	pstmt_name = contents + strlen(portal_name) + 1;

	if (pool_config->log_client_messages)
		ereport(LOG,
				(errmsg("Bind message from frontend."),
				 errdetail("portal: \"%s\", statement: \"%s\"", portal_name, pstmt_name)));
	/* The statement may have been prepared via simple ('Q': PREPARE) or
	 * extended ('P': Parse) protocol; try both. */
	parse_msg = pool_get_sent_message('Q', pstmt_name, POOL_SENT_MESSAGE_CREATED);
	if (!parse_msg)
		parse_msg = pool_get_sent_message('P', pstmt_name, POOL_SENT_MESSAGE_CREATED);
	if (!parse_msg)
	{
		ereport(FATAL,
				(errmsg("unable to bind"),
				 errdetail("cannot get parse message \"%s\"", pstmt_name)));
	}

	bind_msg = pool_create_sent_message('B', len, contents,
										parse_msg->num_tsparams, portal_name,
										parse_msg->query_context);

	query_context = parse_msg->query_context;
	if (!query_context)
	{
		ereport(FATAL,
				(errmsg("unable to bind"),
				 errdetail("cannot get the query context")));
	}

	/*
	 * If the query can be cached, save its offset of query text in bind
	 * message's content.
	 */
	if (query_context->is_cache_safe)
	{
		bind_msg->param_offset = sizeof(char) * (strlen(portal_name) + strlen(pstmt_name) + 2);
	}

	session_context->uncompleted_message = bind_msg;

	/* rewrite bind message to expand `now()' timestamp parameters */
	if (REPLICATION && bind_msg->num_tsparams > 0)
	{
		rewrite_msg = bind_rewrite_timestamp(backend, bind_msg, contents, &len);
		if (rewrite_msg != NULL)
			contents = rewrite_msg;
	}

	session_context->query_context = query_context;

	/*
	 * Take care the case when the previous parse message has been sent to
	 * other than primary node. In this case, we send a parse message to the
	 * primary node.
	 */
	if (pool_config->load_balance_mode && pool_is_writing_transaction() &&
		TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T' &&
		pool_config->disable_load_balance_on_write != DLBOW_OFF)
	{
		if (!SL_MODE)
		{
			pool_where_to_send(query_context, query_context->original_query,
							   query_context->parse_tree);
		}

		if (parse_before_bind(frontend, backend, parse_msg, bind_msg) != POOL_CONTINUE)
			return POOL_END;
	}

	/* DML-adaptive mode: re-evaluate the target node inside a transaction */
	if (pool_config->disable_load_balance_on_write == DLBOW_DML_ADAPTIVE &&
		TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T')
	{
		pool_where_to_send(query_context, query_context->original_query,
							query_context->parse_tree);
	}

	/*
	 * Start a transaction if necessary in replication mode
	 */
	if (REPLICATION)
	{
		ereport(DEBUG1,
				(errmsg("Bind: checking strict query")));
		if (is_strict_query(query_context->parse_tree))
		{
			ereport(DEBUG1,
					(errmsg("Bind: strict query")));
			start_internal_transaction(frontend, backend, query_context->parse_tree);
			allow_close_transaction = 1;
		}

		ereport(DEBUG1,
				(errmsg("Bind: checking insert lock")));
		insert_stmt_with_lock = need_insert_lock(backend, query_context->original_query, query_context->parse_tree);
		if (insert_stmt_with_lock)
		{
			ereport(DEBUG1,
					(errmsg("Bind: issuing insert lock")));
			/* issue a LOCK command to keep consistency */
			if (insert_lock(frontend, backend, query_context->original_query, (InsertStmt *) query_context->parse_tree, insert_stmt_with_lock) != POOL_CONTINUE)
			{
				pool_query_context_destroy(query_context);
				return POOL_END;
			}
		}
	}

	ereport(DEBUG1,
			(errmsg("Bind: waiting for main node completing the query")));

	pool_set_query_in_progress();

	if (SL_MODE)
	{
		/* In SL mode do not wait for backend replies */
		nowait = true;
		session_context->query_context = query_context = bind_msg->query_context;
	}
	else
		nowait = false;

	/* Forward Bind: first to the main node, then to the remaining nodes */
	pool_extended_send_and_wait(query_context, "B", len, contents, 1, MAIN_NODE_ID, nowait);
	pool_extended_send_and_wait(query_context, "B", len, contents, -1, MAIN_NODE_ID, nowait);

	if (SL_MODE)
	{
		POOL_PENDING_MESSAGE *pmsg;

		pool_unset_query_in_progress();
		pool_add_sent_message(session_context->uncompleted_message);

		/* Add pending message */
		pmsg = pool_pending_message_create('B', len, contents);
		pool_pending_message_dest_set(pmsg, query_context);
		pool_pending_message_query_set(pmsg, query_context);
		pool_pending_message_add(pmsg);
		pool_pending_message_free_pending_message(pmsg);
	}

	/* bind_rewrite_timestamp() allocated a new body; free it (free(NULL)
	 * style guard kept as-is) */
	if (rewrite_msg)
		pfree(rewrite_msg);
	return POOL_CONTINUE;
}
1657 
/*
 * process Describe (V3 only)
 *
 * Handle an extended query protocol Describe ('D') message from the
 * frontend.  "contents" is the message body: 'S' or 'P' (statement or
 * portal) followed by the NUL terminated name.  The message is forwarded
 * to the same backend(s) the corresponding Parse/Bind was sent to.
 *
 * Returns POOL_CONTINUE; errors are raised via ereport(FATAL).
 */
POOL_STATUS
Describe(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
		 int len, char *contents)
{
	POOL_SENT_MESSAGE *msg;
	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;

	bool		nowait;

	/* Get session context */
	session_context = pool_get_session_context(false);

	/* Prepared Statement */
	if (*contents == 'S')
	{
		if (pool_config->log_client_messages)
			ereport(LOG,
					(errmsg("Describe message from frontend."),
					 errdetail("statement: \"%s\"", contents + 1)));
		/* The statement may have been prepared via simple ('Q') or
		 * extended ('P') protocol; try both. */
		msg = pool_get_sent_message('Q', contents + 1, POOL_SENT_MESSAGE_CREATED);
		if (!msg)
			msg = pool_get_sent_message('P', contents + 1, POOL_SENT_MESSAGE_CREATED);
		if (!msg)
			ereport(FATAL,
					(return_code(2),
					 errmsg("unable to execute Describe"),
					 errdetail("unable to get the parse message")));
	}
	/* Portal */
	else
	{
		if (pool_config->log_client_messages)
			ereport(LOG,
					(errmsg("Describe message from frontend."),
					 errdetail("portal: \"%s\"", contents + 1)));
		msg = pool_get_sent_message('B', contents + 1, POOL_SENT_MESSAGE_CREATED);
		if (!msg)
			ereport(FATAL,
					(return_code(2),
					 errmsg("unable to execute Describe"),
					 errdetail("unable to get the bind message")));
	}

	query_context = msg->query_context;

	if (query_context == NULL)
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to execute Describe"),
				 errdetail("unable to get the query context")));

	session_context->query_context = query_context;

	/*
	 * Calling pool_where_to_send here is dangerous because the node
	 * parse/bind has been sent could be change by pool_where_to_send() and it
	 * leads to "portal not found" etc. errors.
	 */
	ereport(DEBUG1,
			(errmsg("Describe: waiting for main node completing the query")));

	/* In SL mode do not wait for backend replies */
	nowait = SL_MODE;

	pool_set_query_in_progress();
	/* Forward Describe: main node first, then remaining nodes */
	pool_extended_send_and_wait(query_context, "D", len, contents, 1, MAIN_NODE_ID, nowait);
	pool_extended_send_and_wait(query_context, "D", len, contents, -1, MAIN_NODE_ID, nowait);

	if (SL_MODE)
	{
		POOL_PENDING_MESSAGE *pmsg;

		/* Add pending message */
		pmsg = pool_pending_message_create('D', len, contents);
		pool_pending_message_dest_set(pmsg, query_context);
		pool_pending_message_query_set(pmsg, query_context);
		pool_pending_message_add(pmsg);
		pool_pending_message_free_pending_message(pmsg);

		pool_unset_query_in_progress();
	}

	return POOL_CONTINUE;
}
1742 
1743 
/*
 * process Close (V3 only)
 *
 * Handle an extended query protocol Close ('C') message from the frontend.
 * "contents" is the message body: 'S' or 'P' (statement or portal)
 * followed by the NUL terminated name.  Closing a non-existent statement
 * or portal is not an error (mirrors PostgreSQL behavior): in that case a
 * CloseComplete ('3') is returned to the frontend directly without
 * contacting the backends.
 *
 * Returns POOL_CONTINUE; errors are raised via ereport(FATAL).
 */
POOL_STATUS
Close(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
	  int len, char *contents)
{
	POOL_SENT_MESSAGE *msg;
	POOL_SESSION_CONTEXT *session_context;
	POOL_QUERY_CONTEXT *query_context;

	/* Get session context */
	session_context = pool_get_session_context(false);

	/* Prepared Statement */
	if (*contents == 'S')
	{
		/* The statement may have been prepared via simple ('Q') or
		 * extended ('P') protocol; try both. */
		msg = pool_get_sent_message('Q', contents + 1, POOL_SENT_MESSAGE_CREATED);
		if (pool_config->log_client_messages)
			ereport(LOG,
					(errmsg("Close message from frontend."),
					 errdetail("statement: \"%s\"", contents + 1)));
		if (!msg)
			msg = pool_get_sent_message('P', contents + 1, POOL_SENT_MESSAGE_CREATED);
	}
	/* Portal */
	else if (*contents == 'P')
	{
		if (pool_config->log_client_messages)
			ereport(LOG,
					(errmsg("Close message from frontend."),
					 errdetail("portal: \"%s\"", contents + 1)));
		msg = pool_get_sent_message('B', contents + 1, POOL_SENT_MESSAGE_CREATED);
	}
	else
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to execute close, invalid message")));

	/*
	 * As per the postgresql, calling close on non existing portals or
	 * statements is not an error. So on the same footings we will ignore all
	 * such calls and return the close complete message to clients with out
	 * going to backend
	 */
	if (!msg)
	{
		/* NOTE: this local "len" shadows the parameter; it is the wire
		 * format message length (4, network byte order) */
		int			len = htonl(4);

		pool_set_command_success();
		pool_unset_query_in_progress();

		/* Reply CloseComplete ('3') to the frontend ourselves */
		pool_write(frontend, "3", 1);
		pool_write_and_flush(frontend, &len, sizeof(len));

		return POOL_CONTINUE;
	}

	session_context->uncompleted_message = msg;
	query_context = msg->query_context;

	if (!query_context)
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to execute close"),
				 errdetail("unable to get the query context")));

	session_context->query_context = query_context;

	/*
	 * pool_where_to_send(query_context, query_context->original_query,
	 * query_context->parse_tree);
	 */

	ereport(DEBUG1,
			(errmsg("Close: waiting for main node completing the query")));

	pool_set_query_in_progress();

	if (!SL_MODE)
	{
		pool_extended_send_and_wait(query_context, "C", len, contents, 1, MAIN_NODE_ID, false);
		pool_extended_send_and_wait(query_context, "C", len, contents, -1, MAIN_NODE_ID, false);
	}
	else
	{
		POOL_PENDING_MESSAGE *pmsg;
		bool		where_to_send_save[MAX_NUM_BACKENDS];

		/*
		 * Parse_before_bind() may have sent a bind message to the primary
		 * node id. So send the close message to the primary node as well.
		 * Even if not, sending a close message for non existing
		 * statement/portal is harmless. No error will happen.
		 */
		if (session_context->load_balance_node_id != PRIMARY_NODE_ID)
		{
			/* save where_to_send map */
			memcpy(where_to_send_save, query_context->where_to_send, sizeof(where_to_send_save));

			/* temporarily widen the destination set to cover both nodes */
			query_context->where_to_send[PRIMARY_NODE_ID] = true;
			query_context->where_to_send[session_context->load_balance_node_id] = true;
		}

		pool_extended_send_and_wait(query_context, "C", len, contents, 1, MAIN_NODE_ID, true);
		pool_extended_send_and_wait(query_context, "C", len, contents, -1, MAIN_NODE_ID, true);

		/* Add pending message */
		pmsg = pool_pending_message_create('C', len, contents);
		pool_pending_message_dest_set(pmsg, query_context);
		pool_pending_message_query_set(pmsg, query_context);
		pool_pending_message_add(pmsg);
		pool_pending_message_free_pending_message(pmsg);

		if (session_context->load_balance_node_id != PRIMARY_NODE_ID)
		{
			/* Restore where_to_send map */
			memcpy(query_context->where_to_send, where_to_send_save, sizeof(where_to_send_save));
		}

#ifdef NOT_USED
		dump_pending_message();
#endif
		pool_unset_query_in_progress();

		/*
		 * Remove sent message
		 */
		ereport(DEBUG1,
				(errmsg("Close: removing sent message %c %s", *contents, contents + 1)));
		pool_set_sent_message_state(msg);
	}

	return POOL_CONTINUE;
}
1876 
1877 
1878 POOL_STATUS
FunctionCall3(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int len,char * contents)1879 FunctionCall3(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend,
1880 			  int len, char *contents)
1881 {
1882 	/*
1883 	 * If Function call message for lo_creat, rewrite it
1884 	 */
1885 	char	   *rewrite_lo;
1886 	int			rewrite_len;
1887 
1888 	rewrite_lo = pool_rewrite_lo_creat('F', contents, len, frontend,
1889 									   backend, &rewrite_len);
1890 
1891 	if (rewrite_lo != NULL)
1892 	{
1893 		contents = rewrite_lo;
1894 		len = rewrite_len;
1895 	}
1896 	return SimpleForwardToBackend('F', frontend, backend, len, contents);
1897 }
1898 
1899 /*
1900  * Process ReadyForQuery('Z') message.
1901  * If send_ready is true, send 'Z' message to frontend.
1902  * If cache_commit is true, commit or discard query cache according to
1903  * transaction state.
1904  *
1905  * - if the error status "mismatch_ntuples" is set, send an error query
1906  *	 to all DB nodes to abort transaction or do failover.
1907  * - internal transaction is closed
1908  */
POOL_STATUS
ReadyForQuery(POOL_CONNECTION * frontend,
			  POOL_CONNECTION_POOL * backend, bool send_ready, bool cache_commit)
{
	int			i;
	int			len;			/* V3 message length read from backend */
	signed char kind;
	signed char state = 0;		/* transaction state byte ('I'/'T'/'E') sent to frontend */
	POOL_SESSION_CONTEXT *session_context;
	Node	   *node = NULL;
	char	   *query = NULL;
	bool		got_estate = false;	/* true if any backend reported state 'E' */

	/*
	 * It is possible that the "ignore until sync is received" flag was set if
	 * we send sync to backend and the backend returns error. Let's reset the
	 * flag unconditionally because we apparently have received a "ready for
	 * query" message from backend.
	 */
	pool_unset_ignore_till_sync();

	/* Reset previous message */
	pool_pending_message_reset_previous_message();

	/* Get session context */
	session_context = pool_get_session_context(false);

	/*
	 * If the numbers of update tuples are differ and
	 * failover_if_affected_tuples_mismatch is false, we abort transactions by
	 * using do_error_command.  If failover_if_affected_tuples_mismatch is
	 * true, trigger failover. This only works with PROTO_MAJOR_V3.
	 */
	if (session_context->mismatch_ntuples && MAJOR(backend) == PROTO_MAJOR_V3)
	{
		int			i;		/* shadows outer i intentionally; scoped to this branch */
		char		kind;

		/*
		 * If failover_if_affected_tuples_mismatch, is true, then decide
		 * victim nodes by using find_victim_nodes and degenerate them.
		 */
		if (pool_config->failover_if_affected_tuples_mismatch)
		{
			int		   *victim_nodes;
			int			number_of_nodes;
			char		msgbuf[128];

			victim_nodes = find_victim_nodes(session_context->ntuples, NUM_BACKENDS,
											 MAIN_NODE_ID, &number_of_nodes);
			if (victim_nodes)
			{
				int			i;
				String	   *msg;

				/* Log the victim node ids before degenerating them */
				msg = init_string("ReadyForQuery: Degenerate backends:");

				for (i = 0; i < number_of_nodes; i++)
				{
					snprintf(msgbuf, sizeof(msgbuf), " %d", victim_nodes[i]);
					string_append_char(msg, msgbuf);
				}
				ereport(LOG,
						(errmsg("processing ready for query message"),
						 errdetail("%s", msg->data)));

				free_string(msg);

				/* Log the per-node affected tuple counts that mismatched */
				msg = init_string("ReadyForQuery: Number of affected tuples are:");

				for (i = 0; i < NUM_BACKENDS; i++)
				{
					snprintf(msgbuf, sizeof(msgbuf), " %d", session_context->ntuples[i]);
					string_append_char(msg, msgbuf);
				}
				ereport(LOG,
						(errmsg("processing ready for query message"),
						 errdetail("%s", msg->data)));

				free_string(msg);

				/* Degenerate the victims and restart this child process */
				degenerate_backend_set(victim_nodes, number_of_nodes, REQ_DETAIL_CONFIRMED | REQ_DETAIL_SWITCHOVER);
				child_exit(POOL_EXIT_AND_RESTART);
			}
			else
			{
				ereport(LOG,
						(errmsg("processing ready for query message"),
						 errdetail("find_victim_nodes returned no victim node")));
			}
		}

		/*
		 * XXX: discard rest of ReadyForQuery packet
		 */

		if (pool_read_message_length(backend) < 0)
			return POOL_END;

		state = pool_read_kind(backend);
		if (state < 0)
			return POOL_END;

		for (i = 0; i < NUM_BACKENDS; i++)
		{
			if (VALID_BACKEND(i))
			{
				/* abort transaction on all nodes. */
				do_error_command(CONNECTION(backend, i), PROTO_MAJOR_V3);
			}
		}

		/* loop through until we get ReadyForQuery */
		for (;;)
		{
			kind = pool_read_kind(backend);
			if (kind < 0)
				return POOL_END;

			if (kind == 'Z')
				break;


			/* put the message back to read buffer */
			for (i = 0; i < NUM_BACKENDS; i++)
			{
				if (VALID_BACKEND(i))
				{
					pool_unread(CONNECTION(backend, i), &kind, 1);
				}
			}

			/* discard rest of the packet */
			if (pool_discard_packet(backend) != POOL_CONTINUE)
				return POOL_END;
		}
		session_context->mismatch_ntuples = false;
	}

	/*
	 * if a transaction is started for insert lock, we need to close the
	 * transaction.
	 */
	/* if (pool_is_query_in_progress() && allow_close_transaction) */
	if (allow_close_transaction)
	{
		bool internal_transaction_started = INTERNAL_TRANSACTION_STARTED(backend, MAIN_NODE_ID);

		/*
		 * If we are running in snapshot isolation mode and started an
		 * internal transaction, wait until snapshot is prepared.
		 */
		if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION &&
			internal_transaction_started)
		{
			si_commit_request();
		}

		/* close an internal transaction */
		if (end_internal_transaction(frontend, backend) != POOL_CONTINUE)
			return POOL_END;

		/*
		 * If we are running in snapshot isolation mode and started an
		 * internal transaction, notice that commit is done.
		 */
		if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION &&
			internal_transaction_started)
		{
			si_commit_done();
			/* reset transaction readonly flag */
			session_context->transaction_read_only = false;
		}
	}

	if (MAJOR(backend) == PROTO_MAJOR_V3)
	{
		if ((len = pool_read_message_length(backend)) < 0)
			return POOL_END;

		/*
		 * Set transaction state for each node
		 */
		state = TSTATE(backend,
					   MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID);

		for (i = 0; i < NUM_BACKENDS; i++)
		{
			if (!VALID_BACKEND(i))
				continue;

			/* read the 1-byte transaction state trailing each 'Z' message */
			if (pool_read(CONNECTION(backend, i), &kind, sizeof(kind)))
				return POOL_END;

			TSTATE(backend, i) = kind;
			ereport(DEBUG5,
					(errmsg("processing ReadyForQuery"),
					 errdetail("transaction state '%c'(%02x)", state, state)));

			/*
			 * The transaction state to be returned to frontend is main node's.
			 */
			if (i == (MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID))
			{
				state = kind;
			}

			/*
			 * However, if the state is 'E', then the frontend should have
			 * already been sent an ERROR. To stay consistent with that,
			 * remember it so 'E' is the state returned to the frontend.
			 */
			if (kind == 'E')
				got_estate = true;
		}
	}

	/*
	 * Make sure that no message remains in the backend buffer.  If something
	 * remains, it could be an "out of band" ERROR or FATAL error, or a NOTICE
	 * message, which was generated by backend itself for some reasons like
	 * recovery conflict or SIGTERM received. If so, let's consume it and emit
	 * a log message so that next read_kind_from_backend() will not hang in
	 * trying to read from backend which may have not produced such a message.
	 */
	if (pool_is_query_in_progress())
	{
		for (i = 0; i < NUM_BACKENDS; i++)
		{
			if (!VALID_BACKEND(i))
				continue;
			if (!pool_read_buffer_is_empty(CONNECTION(backend, i)))
				per_node_error_log(backend, i,
								   "(out of band message)",
								   "ReadyForQuery: Error or notice message from backend: ", false);
		}
	}

	/* Forward ReadyForQuery ('Z') to the frontend if requested */
	if (send_ready)
	{
		pool_write(frontend, "Z", 1);

		if (MAJOR(backend) == PROTO_MAJOR_V3)
		{
			len = htonl(len);
			pool_write(frontend, &len, sizeof(len));
			if (got_estate)
				state = 'E';
			pool_write(frontend, &state, 1);
		}
		pool_flush(frontend);
	}

	if (pool_is_query_in_progress())
	{
		node = pool_get_parse_tree();
		query = pool_get_query_string();

		if (pool_is_command_success())
		{
			if (node)
				pool_at_command_success(frontend, backend);

			/* Memory cache enabled? */
			if (cache_commit && pool_config->memory_cache_enabled)
			{

				/*
				 * If we are doing extended query and the state is after
				 * EXECUTE, then we can commit cache. We check latter
				 * condition by looking at query_context->query_w_hex. This
				 * check is necessary for certain frame work such as PHP PDO.
				 * It sends Sync message right after PARSE and it produces
				 * "Ready for query" message from backend.
				 */
				if (pool_is_doing_extended_query_message())
				{
					if (session_context->query_context &&
						session_context->query_context->query_state[MAIN_NODE_ID] == POOL_EXECUTE_COMPLETE)
					{
						pool_handle_query_cache(backend, session_context->query_context->query_w_hex, node, state);
						if (session_context->query_context->query_w_hex)
							pfree(session_context->query_context->query_w_hex);
						session_context->query_context->query_w_hex = NULL;
					}
				}
				else
				{
					if (MAJOR(backend) != PROTO_MAJOR_V3)
					{
						state = 'I';	/* XXX I don't think query cache works
										 * with PROTO2 protocol */
					}
					pool_handle_query_cache(backend, query, node, state);
				}
			}
		}

		/*
		 * If PREPARE or extended query protocol commands caused error, remove
		 * the temporary saved message. (except when ReadyForQuery() is called
		 * during Parse() of extended queries)
		 */
		else
		{
			if ((pool_is_doing_extended_query_message() &&
				 session_context->query_context &&
				 session_context->query_context->query_state[MAIN_NODE_ID] != POOL_UNPARSED &&
				 session_context->uncompleted_message) ||
				(!pool_is_doing_extended_query_message() && session_context->uncompleted_message &&
				 session_context->uncompleted_message->kind != 0))
			{
				/* add then remove: presumably releases the saved message via
				 * the sent-message machinery — confirm against
				 * pool_remove_sent_message() semantics */
				pool_add_sent_message(session_context->uncompleted_message);
				pool_remove_sent_message(session_context->uncompleted_message->kind,
										 session_context->uncompleted_message->name);
				session_context->uncompleted_message = NULL;
			}
		}

		pool_unset_query_in_progress();
	}
	if (!pool_is_doing_extended_query_message())
	{
		/* PREPARE's query context must survive until DEALLOCATE/close */
		if (!(node && IsA(node, PrepareStmt)))
		{
			pool_query_context_destroy(pool_get_session_context(false)->query_context);
		}
	}

	/*
	 * Show ps idle status
	 */
	pool_ps_idle_display(backend);

	return POOL_CONTINUE;
}
2245 
2246 /*
2247  * Close running transactions on standbys.
2248  */
close_standby_transactions(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2249 static POOL_STATUS close_standby_transactions(POOL_CONNECTION * frontend,
2250 											  POOL_CONNECTION_POOL * backend)
2251 {
2252 	int			i;
2253 
2254 	for (i = 0; i < NUM_BACKENDS; i++)
2255 	{
2256 		if (CONNECTION_SLOT(backend, i) &&
2257 			TSTATE(backend, i) == 'T' &&
2258 			BACKEND_INFO(i).backend_status == CON_UP &&
2259 			(MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) != i)
2260 		{
2261 			per_node_statement_log(backend, i, "COMMIT");
2262 			if (do_command(frontend, CONNECTION(backend, i), "COMMIT", MAJOR(backend),
2263 						   MAIN_CONNECTION(backend)->pid,
2264 						   MAIN_CONNECTION(backend)->key, 0) != POOL_CONTINUE)
2265 				ereport(ERROR,
2266 						(errmsg("unable to close standby transactions"),
2267 						 errdetail("do_command returned DEADLOCK status")));
2268 		}
2269 	}
2270 	return POOL_CONTINUE;
2271 }
2272 
2273 POOL_STATUS
ParseComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2274 ParseComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
2275 {
2276 	POOL_SESSION_CONTEXT *session_context;
2277 
2278 	/* Get session context */
2279 	session_context = pool_get_session_context(false);
2280 
2281 	if (!SL_MODE && session_context->uncompleted_message)
2282 	{
2283 		POOL_QUERY_CONTEXT *qc;
2284 
2285 		pool_add_sent_message(session_context->uncompleted_message);
2286 
2287 		qc = session_context->uncompleted_message->query_context;
2288 		if (qc)
2289 			pool_set_query_state(qc, POOL_PARSE_COMPLETE);
2290 
2291 		session_context->uncompleted_message = NULL;
2292 	}
2293 
2294 	return SimpleForwardToFrontend('1', frontend, backend);
2295 }
2296 
2297 POOL_STATUS
BindComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2298 BindComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
2299 {
2300 	POOL_SESSION_CONTEXT *session_context;
2301 
2302 	/* Get session context */
2303 	session_context = pool_get_session_context(false);
2304 
2305 	if (!SL_MODE && session_context->uncompleted_message)
2306 	{
2307 		POOL_QUERY_CONTEXT *qc;
2308 
2309 		pool_add_sent_message(session_context->uncompleted_message);
2310 
2311 		qc = session_context->uncompleted_message->query_context;
2312 		if (qc)
2313 			pool_set_query_state(qc, POOL_BIND_COMPLETE);
2314 
2315 		session_context->uncompleted_message = NULL;
2316 	}
2317 
2318 	return SimpleForwardToFrontend('2', frontend, backend);
2319 }
2320 
2321 POOL_STATUS
CloseComplete(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2322 CloseComplete(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
2323 {
2324 	POOL_SESSION_CONTEXT *session_context;
2325 	POOL_STATUS status;
2326 	char		kind = ' ';
2327 	char	   *name = "";
2328 
2329 	/* Get session context */
2330 	session_context = pool_get_session_context(false);
2331 
2332 	/* Send CloseComplete(3) to frontend before removing the target message */
2333 	status = SimpleForwardToFrontend('3', frontend, backend);
2334 
2335 	/* Remove the target message */
2336 	if (SL_MODE)
2337 	{
2338 		POOL_PENDING_MESSAGE *pmsg;
2339 
2340 		pmsg = pool_pending_message_pull_out();
2341 
2342 		if (pmsg)
2343 		{
2344 			/* Sanity check */
2345 			if (pmsg->type != POOL_CLOSE)
2346 			{
2347 				ereport(LOG,
2348 						(errmsg("CloseComplete: pending message was not Close request: %s", pool_pending_message_type_to_string(pmsg->type))));
2349 			}
2350 			else
2351 			{
2352 				kind = pool_get_close_message_spec(pmsg);
2353 				name = pool_get_close_message_name(pmsg);
2354 				kind = kind == 'S' ? 'P' : 'B';
2355 			}
2356 			pool_pending_message_free_pending_message(pmsg);
2357 		}
2358 	}
2359 	else
2360 	{
2361 		if (session_context->uncompleted_message)
2362 		{
2363 			kind = session_context->uncompleted_message->kind;
2364 			name = session_context->uncompleted_message->name;
2365 			session_context->uncompleted_message = NULL;
2366 		}
2367 		else
2368 		{
2369 			ereport(ERROR,
2370 					(errmsg("processing CloseComplete, uncompleted message not found")));
2371 		}
2372 	}
2373 
2374 	if (kind != ' ')
2375 	{
2376 		pool_remove_sent_message(kind, name);
2377 		ereport(DEBUG1,
2378 				(errmsg("CloseComplete: remove sent message. kind:%c, name:%s",
2379 						kind, name)));
2380 		if (pool_config->memory_cache_enabled)
2381 		{
2382 			/*
2383 			 * Discard current temp query cache.  This is necessary for the
2384 			 * case of clustering mode is not either streaming or logical
2385 			 * replication. Because in the cases the cache has been already
2386 			 * discarded upon receiving CommandComplete.
2387 			 */
2388 			pool_discard_current_temp_query_cache();
2389 		}
2390 	}
2391 
2392 	return status;
2393 }
2394 
2395 POOL_STATUS
ParameterDescription(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2396 ParameterDescription(POOL_CONNECTION * frontend,
2397 					 POOL_CONNECTION_POOL * backend)
2398 {
2399 	int			len,
2400 				len1 = 0;
2401 	char	   *p = NULL;
2402 	char	   *p1 = NULL;
2403 	int			sendlen;
2404 	int			i;
2405 
2406 	POOL_SESSION_CONTEXT *session_context;
2407 	int			num_params,
2408 				send_num_params,
2409 				num_dmy;
2410 	char		kind = 't';
2411 
2412 	session_context = pool_get_session_context(false);
2413 
2414 	/* only in replication mode and rewritten query */
2415 	if (!REPLICATION || !session_context->query_context->rewritten_query)
2416 		return SimpleForwardToFrontend('t', frontend, backend);
2417 
2418 	/* get number of parameters in original query */
2419 	num_params = session_context->query_context->num_original_params;
2420 
2421 	pool_read(MAIN(backend), &len, sizeof(len));
2422 
2423 	len = ntohl(len);
2424 	len -= sizeof(int32);
2425 	len1 = len;
2426 
2427 	/* number of parameters in rewritten query is just discarded */
2428 	pool_read(MAIN(backend), &num_dmy, sizeof(int16));
2429 	len -= sizeof(int16);
2430 
2431 	p = pool_read2(MAIN(backend), len);
2432 	if (p == NULL)
2433 		ereport(ERROR,
2434 				(errmsg("ParameterDescription. connection error"),
2435 				 errdetail("read from backend failed")));
2436 
2437 
2438 	p1 = palloc(len);
2439 	memcpy(p1, p, len);
2440 
2441 	for (i = 0; i < NUM_BACKENDS; i++)
2442 	{
2443 		if (VALID_BACKEND(i) && !IS_MAIN_NODE_ID(i))
2444 		{
2445 			pool_read(CONNECTION(backend, i), &len, sizeof(len));
2446 
2447 			len = ntohl(len);
2448 			len -= sizeof(int32);
2449 
2450 			p = pool_read2(CONNECTION(backend, i), len);
2451 			if (p == NULL)
2452 				ereport(ERROR,
2453 						(errmsg("ParameterDescription. connection error"),
2454 						 errdetail("read from backend no %d failed", i)));
2455 
2456 			if (len != len1)
2457 				ereport(DEBUG1,
2458 						(errmsg("ParameterDescription. backends does not match"),
2459 						 errdetail("length does not match between backends main(%d) %d th backend(%d) kind:(%c)", len, i, len1, kind)));
2460 		}
2461 	}
2462 
2463 	pool_write(frontend, &kind, 1);
2464 
2465 	/* send back OIDs of parameters in original query and left are discarded */
2466 	len = sizeof(int16) + num_params * sizeof(int32);
2467 	sendlen = htonl(len + sizeof(int32));
2468 	pool_write(frontend, &sendlen, sizeof(int32));
2469 
2470 	send_num_params = htons(num_params);
2471 	pool_write(frontend, &send_num_params, sizeof(int16));
2472 
2473 	pool_write_and_flush(frontend, p1, num_params * sizeof(int32));
2474 
2475 	pfree(p1);
2476 	return POOL_CONTINUE;
2477 }
2478 
2479 POOL_STATUS
ErrorResponse3(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2480 ErrorResponse3(POOL_CONNECTION * frontend,
2481 			   POOL_CONNECTION_POOL * backend)
2482 {
2483 	POOL_STATUS ret;
2484 
2485 	ret = SimpleForwardToFrontend('E', frontend, backend);
2486 	if (ret != POOL_CONTINUE)
2487 		return ret;
2488 
2489 	if (!SL_MODE)
2490 		raise_intentional_error_if_need(backend);
2491 
2492 	return POOL_CONTINUE;
2493 }
2494 
2495 POOL_STATUS
FunctionCall(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)2496 FunctionCall(POOL_CONNECTION * frontend,
2497 			 POOL_CONNECTION_POOL * backend)
2498 {
2499 	char		dummy[2];
2500 	int			oid;
2501 	int			argn;
2502 	int			i;
2503 
2504 	for (i = 0; i < NUM_BACKENDS; i++)
2505 	{
2506 		if (VALID_BACKEND(i))
2507 		{
2508 			pool_write(CONNECTION(backend, i), "F", 1);
2509 		}
2510 	}
2511 
2512 	/* dummy */
2513 	pool_read(frontend, dummy, sizeof(dummy));
2514 
2515 	for (i = 0; i < NUM_BACKENDS; i++)
2516 	{
2517 		if (VALID_BACKEND(i))
2518 		{
2519 			pool_write(CONNECTION(backend, i), dummy, sizeof(dummy));
2520 		}
2521 	}
2522 
2523 	/* function object id */
2524 	pool_read(frontend, &oid, sizeof(oid));
2525 
2526 	for (i = 0; i < NUM_BACKENDS; i++)
2527 	{
2528 		if (VALID_BACKEND(i))
2529 		{
2530 			pool_write(CONNECTION(backend, i), &oid, sizeof(oid));
2531 		}
2532 	}
2533 
2534 	/* number of arguments */
2535 	pool_read(frontend, &argn, sizeof(argn));
2536 
2537 	for (i = 0; i < NUM_BACKENDS; i++)
2538 	{
2539 		if (VALID_BACKEND(i))
2540 		{
2541 			pool_write(CONNECTION(backend, i), &argn, sizeof(argn));
2542 		}
2543 	}
2544 
2545 	argn = ntohl(argn);
2546 
2547 	for (i = 0; i < argn; i++)
2548 	{
2549 		int			len;
2550 		char	   *arg;
2551 
2552 		/* length of each argument in bytes */
2553 		pool_read(frontend, &len, sizeof(len));
2554 
2555 		for (i = 0; i < NUM_BACKENDS; i++)
2556 		{
2557 			if (VALID_BACKEND(i))
2558 			{
2559 				pool_write(CONNECTION(backend, i), &len, sizeof(len));
2560 			}
2561 		}
2562 
2563 		len = ntohl(len);
2564 
2565 		/* argument value itself */
2566 		if ((arg = pool_read2(frontend, len)) == NULL)
2567 			ereport(FATAL,
2568 					(return_code(2),
2569 					 errmsg("failed to process function call"),
2570 					 errdetail("read from frontend failed")));
2571 
2572 		for (i = 0; i < NUM_BACKENDS; i++)
2573 		{
2574 			if (VALID_BACKEND(i))
2575 			{
2576 				pool_write(CONNECTION(backend, i), arg, len);
2577 			}
2578 		}
2579 	}
2580 
2581 	for (i = 0; i < NUM_BACKENDS; i++)
2582 	{
2583 		if (VALID_BACKEND(i))
2584 		{
2585 			pool_flush(CONNECTION(backend, i));
2586 		}
2587 	}
2588 	return POOL_CONTINUE;
2589 }
2590 
POOL_STATUS
ProcessFrontendResponse(POOL_CONNECTION * frontend,
						POOL_CONNECTION_POOL * backend)
{
	/*
	 * Read one message from the frontend and dispatch it to the appropriate
	 * protocol handler (SimpleQuery, Parse, Bind, Execute, ...).
	 */
	char		fkind;			/* message kind byte from frontend */
	char	   *bufp = NULL;	/* raw message payload (read buffer) */
	char	   *contents;		/* private copy of the payload */
	POOL_STATUS status;
	int			len = 0;

	/* Get session context */
	pool_get_session_context(false);

	if (pool_read_buffer_is_empty(frontend) && frontend->no_forward != 0)
		return POOL_CONTINUE;

	/* Are we suspending reading from frontend? */
	if (pool_is_suspend_reading_from_frontend())
		return POOL_CONTINUE;

	pool_read(frontend, &fkind, 1);

	ereport(DEBUG5,
			(errmsg("processing frontend response"),
			 errdetail("received kind '%c'(%02x) from frontend", fkind, fkind)));


	if (MAJOR(backend) == PROTO_MAJOR_V3)
	{
		/* V3: 4-byte length (includes itself) follows the kind byte */
		if (pool_read(frontend, &len, sizeof(len)) < 0)
			ereport(ERROR,
					(errmsg("unable to process frontend response"),
					 errdetail("failed to read message length from frontend. frontend abnormally exited")));

		len = ntohl(len) - 4;
		if (len > 0)
			bufp = pool_read2(frontend, len);
		else if (len < 0)
			ereport(ERROR,
					(errmsg("frontend message length is less than 4 (kind: %c)", fkind)));
	}
	else
	{
		/* V2: NUL-terminated string payload, except for FunctionCall */
		if (fkind != 'F')
			bufp = pool_read_string(frontend, &len, 0);
	}

	if (len > 0 && bufp == NULL)
		ereport(ERROR,
				(errmsg("unable to process frontend response"),
				 errdetail("failed to read message from frontend. frontend abnormally exited")));

	if (fkind != 'S' && pool_is_ignore_till_sync())
	{
		/*
		 * Flag setting for calling ProcessBackendResponse() in
		 * pool_process_query().
		 */
		if (!pool_is_query_in_progress())
			pool_set_query_in_progress();

		return POOL_CONTINUE;
	}

	pool_unset_doing_extended_query_message();

	/*
	 * Allocate buffer and copy the packet contents.  Because inside these
	 * protocol modules, pool_read2 maybe called and modify its buffer
	 * contents.
	 */
	if (len > 0)
	{
		contents = palloc(len);
		memcpy(contents, bufp, len);
	}
	else
	{
		/*
		 * Set dummy content if len <= 0. this happens only when protocol
		 * version is 2.
		 */
		contents = palloc(1);
		memcpy(contents, "", 1);
	}

	switch (fkind)
	{
			/* declarations shared by the cases below */
			POOL_QUERY_CONTEXT *query_context;
			char	   *query;
			Node	   *node;

		case 'X':				/* Terminate */
			if (contents)
				pfree(contents);
			if (pool_config->log_client_messages)
				ereport(LOG,
						(errmsg("Terminate message from frontend.")));
			ereport(DEBUG5,
					(errmsg("Frontend terminated"),
					 errdetail("received message kind 'X' from frontend")));
			return POOL_END;

		case 'Q':				/* Query */
			allow_close_transaction = 1;
			if (pool_config->log_client_messages)
				ereport(LOG,
						(errmsg("Query message from frontend."),
						 errdetail("query: \"%s\"", contents)));
			status = SimpleQuery(frontend, backend, len, contents);
			break;

		case 'E':				/* Execute */
			allow_close_transaction = 1;
			pool_set_doing_extended_query_message();
			if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
				pool_set_query_in_progress();
			status = Execute(frontend, backend, len, contents);
			break;

		case 'P':				/* Parse */
			allow_close_transaction = 0;
			pool_set_doing_extended_query_message();
			status = Parse(frontend, backend, len, contents);
			break;

		case 'B':				/* Bind */
			pool_set_doing_extended_query_message();
			status = Bind(frontend, backend, len, contents);
			break;

		case 'C':				/* Close */
			pool_set_doing_extended_query_message();
			if (!pool_is_query_in_progress() && !pool_is_ignore_till_sync())
				pool_set_query_in_progress();
			status = Close(frontend, backend, len, contents);
			break;

		case 'D':				/* Describe */
			pool_set_doing_extended_query_message();
			status = Describe(frontend, backend, len, contents);
			break;

		case 'S':				/* Sync */
			if (pool_config->log_client_messages)
				ereport(LOG,
						(errmsg("Sync message from frontend.")));
			pool_set_doing_extended_query_message();
			if (pool_is_ignore_till_sync())
				pool_unset_ignore_till_sync();

			if (SL_MODE)
			{
				POOL_PENDING_MESSAGE *msg;

				/* Track the Sync in the pending-message queue */
				pool_unset_query_in_progress();
				msg = pool_pending_message_create('S', 0, NULL);
				pool_pending_message_add(msg);
				pool_pending_message_free_pending_message(msg);
			}
			else if (!pool_is_query_in_progress())
				pool_set_query_in_progress();
			status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);

			if (SL_MODE)
			{
				/*
				 * From now on suspend to read from frontend until we receive
				 * ready for query message from backend.
				 */
				pool_set_suspend_reading_from_frontend();
			}
			break;

		case 'F':				/* FunctionCall */
			if (pool_config->log_client_messages)
			{
				int			oid;

				memcpy(&oid, contents, sizeof(int));
				ereport(LOG,
						(errmsg("FunctionCall message from frontend."),
						 errdetail("oid: \"%d\"", ntohl(oid))));
			}

			/*
			 * Create dummy query context as if it were an INSERT.
			 */
			query_context = pool_init_query_context();
			query = "INSERT INTO foo VALUES(1)";
			MemoryContext old_context = MemoryContextSwitchTo(query_context->memory_context);

			node = get_dummy_insert_query_node();
			pool_start_query(query_context, query, strlen(query) + 1, node);

			MemoryContextSwitchTo(old_context);

			pool_where_to_send(query_context, query_context->original_query,
							   query_context->parse_tree);

			if (MAJOR(backend) == PROTO_MAJOR_V3)
				status = FunctionCall3(frontend, backend, len, contents);
			else
				status = FunctionCall(frontend, backend);

			break;

		case 'c':				/* CopyDone */
		case 'd':				/* CopyData */
		case 'f':				/* CopyFail */
		case 'H':				/* Flush */
			if (fkind == 'H' && pool_config->log_client_messages)
				ereport(LOG,
						(errmsg("Flush message from frontend.")));
			if (MAJOR(backend) == PROTO_MAJOR_V3)
			{
				if (fkind == 'H')
				{
					pool_set_doing_extended_query_message();
				}
				status = SimpleForwardToBackend(fkind, frontend, backend, len, contents);

				/*
				 * After flush message received, extended query mode should be
				 * continued.
				 */
				if (fkind != 'H' && pool_is_doing_extended_query_message())
				{
					pool_unset_doing_extended_query_message();
				}

				break;
			}
			/* FALLTHROUGH: for protocol V2 these kinds reach the default
			 * (fatal) case below */

		default:
			ereport(FATAL,
					(return_code(2),
					 errmsg("unable to process frontend response"),
					 errdetail("unknown message type %c(%02x)", fkind, fkind)));

	}
	if (contents)
		pfree(contents);

	if (status != POOL_CONTINUE)
		ereport(FATAL,
				(return_code(2),
				 errmsg("unable to process frontend response")));

	return status;
}
2842 
2843 POOL_STATUS
ProcessBackendResponse(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,int * state,short * num_fields)2844 ProcessBackendResponse(POOL_CONNECTION * frontend,
2845 					   POOL_CONNECTION_POOL * backend,
2846 					   int *state, short *num_fields)
2847 {
2848 	int			status = POOL_CONTINUE;
2849 	char		kind;
2850 
2851 	/* Get session context */
2852 	pool_get_session_context(false);
2853 
2854 	if (pool_is_ignore_till_sync())
2855 	{
2856 		if (pool_is_query_in_progress())
2857 			pool_unset_query_in_progress();
2858 
2859 		/*
2860 		 * Check if we have pending data in backend connection cache. If we
2861 		 * do, it is likely that a sync message has been sent to backend and
2862 		 * the backend replied back to us. So we need to process it.
2863 		 */
2864 		if (is_backend_cache_empty(backend))
2865 		{
2866 			return POOL_CONTINUE;
2867 		}
2868 	}
2869 
2870 	if (pool_is_skip_reading_from_backends())
2871 	{
2872 		pool_unset_skip_reading_from_backends();
2873 		return POOL_CONTINUE;
2874 	}
2875 
2876 	read_kind_from_backend(frontend, backend, &kind);
2877 
2878 	/*
2879 	 * Sanity check
2880 	 */
2881 	if (kind == 0)
2882 	{
2883 		ereport(FATAL,
2884 				(return_code(2),
2885 				 errmsg("unable to process backend response"),
2886 				 errdetail("invalid message kind sent by backend connection")));
2887 	}
2888 	ereport(DEBUG5,
2889 			(errmsg("processing backend response"),
2890 			 errdetail("received kind '%c'(%02x) from backend", kind, kind)));
2891 
2892 
2893 	if (MAJOR(backend) == PROTO_MAJOR_V3)
2894 	{
2895 		switch (kind)
2896 		{
2897 			case 'G':			/* CopyInResponse */
2898 				status = CopyInResponse(frontend, backend);
2899 				break;
2900 
2901 			case 'S':			/* ParameterStatus */
2902 				status = ParameterStatus(frontend, backend);
2903 				break;
2904 
2905 			case 'Z':			/* ReadyForQuery */
2906 				ereport(DEBUG5,
2907 						(errmsg("processing backend response"),
2908 						 errdetail("Ready For Query received")));
2909 				pool_unset_suspend_reading_from_frontend();
2910 				status = ReadyForQuery(frontend, backend, true, true);
2911 #ifdef DEBUG
2912 				extern bool stop_now;
2913 
2914 				if (stop_now)
2915 					exit(0);
2916 #endif
2917 				break;
2918 
2919 			case '1':			/* ParseComplete */
2920 				if (SL_MODE)
2921 				{
2922 					POOL_PENDING_MESSAGE *pmsg;
2923 
2924 					pmsg = pool_pending_message_get_previous_message();
2925 					if (pmsg && pmsg->not_forward_to_frontend)
2926 					{
2927 						/*
2928 						 * parse_before_bind() was called. Do not forward the
2929 						 * parse complete message to frontend.
2930 						 */
2931 						ereport(DEBUG5,
2932 								(errmsg("processing backend response"),
2933 								 errdetail("do not forward parse complete message to frontend")));
2934 						pool_discard_packet_contents(backend);
2935 						pool_unset_query_in_progress();
2936 						pool_set_command_success();
2937 						status = POOL_CONTINUE;
2938 						break;
2939 					}
2940 				}
2941 				status = ParseComplete(frontend, backend);
2942 				pool_set_command_success();
2943 				pool_unset_query_in_progress();
2944 				break;
2945 
2946 			case '2':			/* BindComplete */
2947 				status = BindComplete(frontend, backend);
2948 				pool_set_command_success();
2949 				pool_unset_query_in_progress();
2950 				break;
2951 
2952 			case '3':			/* CloseComplete */
2953 				if (SL_MODE)
2954 				{
2955 					POOL_PENDING_MESSAGE *pmsg;
2956 
2957 					pmsg = pool_pending_message_get_previous_message();
2958 					if (pmsg && pmsg->not_forward_to_frontend)
2959 					{
2960 						/*
2961 						 * parse_before_bind() was called. Do not forward the
2962 						 * close complete message to frontend.
2963 						 */
2964 						ereport(DEBUG5,
2965 								(errmsg("processing backend response"),
2966 								 errdetail("do not forward close complete message to frontend")));
2967 						pool_discard_packet_contents(backend);
2968 
2969 						/*
2970 						 * Remove pending message here because
2971 						 * read_kind_from_backend() did not do it.
2972 						 */
2973 						pmsg = pool_pending_message_pull_out();
2974 						pool_pending_message_free_pending_message(pmsg);
2975 
2976 						if (pool_is_doing_extended_query_message())
2977 							pool_unset_query_in_progress();
2978 						pool_set_command_success();
2979 						status = POOL_CONTINUE;
2980 						break;
2981 					}
2982 				}
2983 
2984 				status = CloseComplete(frontend, backend);
2985 				pool_set_command_success();
2986 				if (pool_is_doing_extended_query_message())
2987 					pool_unset_query_in_progress();
2988 				break;
2989 
2990 			case 'E':			/* ErrorResponse */
2991 				if (pool_is_doing_extended_query_message())
2992 				{
2993 					char	*message;
2994 
2995 					/* Log the error message which was possibly missed till
2996 					 * a sync message was sent */
2997 					if (pool_extract_error_message(false, MAIN(backend), PROTO_MAJOR_V3,
2998 												   true, &message) == 1)
2999 					{
3000 						ereport(LOG,
3001 								(errmsg("Error message from backend: DB node id: %d message: \"%s\"",
3002 										MAIN_NODE_ID, message)));
3003 						pfree(message);
3004 					}
3005 				}
3006 
3007 				/* Forward the error message to frontend */
3008 				status = ErrorResponse3(frontend, backend);
3009 				pool_unset_command_success();
3010 				if (TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID :
3011 						   REAL_MAIN_NODE_ID) != 'I')
3012 				{
3013 					pool_set_failed_transaction();
3014 
3015 					/* Remove ongoing CREATE/DROP temp tables */
3016 					pool_temp_tables_remove_pending();
3017 				}
3018 				if (pool_is_doing_extended_query_message())
3019 				{
3020 					pool_set_ignore_till_sync();
3021 					pool_unset_query_in_progress();
3022 					pool_unset_suspend_reading_from_frontend();
3023 					if (SL_MODE)
3024 						pool_discard_except_sync_and_ready_for_query(frontend, backend);
3025 				}
3026 				break;
3027 
3028 			case 'C':			/* CommandComplete */
3029 				status = CommandComplete(frontend, backend, true);
3030 				pool_set_command_success();
3031 				if (pool_is_doing_extended_query_message())
3032 					pool_unset_query_in_progress();
3033 				break;
3034 
3035 			case 't':			/* ParameterDescription */
3036 				status = ParameterDescription(frontend, backend);
3037 				break;
3038 
3039 			case 'I':			/* EmptyQueryResponse */
3040 				status = CommandComplete(frontend, backend, false);
3041 
3042 				/*
3043 				 * Empty query response message should be treated same as
3044 				 * Command complete message. When we receive the Command
3045 				 * complete message, we unset the query in progress flag if
3046 				 * operated in streaming replication mode. So we unset the
3047 				 * flag as well. See bug 190 for more details.
3048 				 */
3049 				if (pool_is_doing_extended_query_message())
3050 					pool_unset_query_in_progress();
3051 				break;
3052 
3053 			case 'T':			/* RowDescription */
3054 				status = SimpleForwardToFrontend(kind, frontend, backend);
3055 				if (pool_is_doing_extended_query_message())
3056 					pool_unset_query_in_progress();
3057 				break;
3058 
3059 			case 'n':			/* NoData */
3060 				status = SimpleForwardToFrontend(kind, frontend, backend);
3061 				if (pool_is_doing_extended_query_message())
3062 					pool_unset_query_in_progress();
3063 				break;
3064 
3065 			case 's':			/* PortalSuspended */
3066 				status = SimpleForwardToFrontend(kind, frontend, backend);
3067 				if (pool_is_doing_extended_query_message())
3068 					pool_unset_query_in_progress();
3069 				break;
3070 
3071 			default:
3072 				status = SimpleForwardToFrontend(kind, frontend, backend);
3073 				break;
3074 		}
3075 	}
3076 	else
3077 	{
3078 		switch (kind)
3079 		{
3080 			case 'A':			/* NotificationResponse */
3081 				status = NotificationResponse(frontend, backend);
3082 				break;
3083 
3084 			case 'B':			/* BinaryRow */
3085 				status = BinaryRow(frontend, backend, *num_fields);
3086 				break;
3087 
3088 			case 'C':			/* CompletedResponse */
3089 				status = CompletedResponse(frontend, backend);
3090 				break;
3091 
3092 			case 'D':			/* AsciiRow */
3093 				status = AsciiRow(frontend, backend, *num_fields);
3094 				break;
3095 
3096 			case 'E':			/* ErrorResponse */
3097 				status = ErrorResponse(frontend, backend);
3098 				if (TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID :
3099 						   REAL_MAIN_NODE_ID) != 'I')
3100 					pool_set_failed_transaction();
3101 				break;
3102 
3103 			case 'G':			/* CopyInResponse */
3104 				status = CopyInResponse(frontend, backend);
3105 				break;
3106 
3107 			case 'H':			/* CopyOutResponse */
3108 				status = CopyOutResponse(frontend, backend);
3109 				break;
3110 
3111 			case 'I':			/* EmptyQueryResponse */
3112 				EmptyQueryResponse(frontend, backend);
3113 				break;
3114 
3115 			case 'N':			/* NoticeResponse */
3116 				NoticeResponse(frontend, backend);
3117 				break;
3118 
3119 			case 'P':			/* CursorResponse */
3120 				status = CursorResponse(frontend, backend);
3121 				break;
3122 
3123 			case 'T':			/* RowDescription */
3124 				status = RowDescription(frontend, backend, num_fields);
3125 				break;
3126 
3127 			case 'V':			/* FunctionResultResponse and
3128 								 * FunctionVoidResponse */
3129 				status = FunctionResultResponse(frontend, backend);
3130 				break;
3131 
3132 			case 'Z':			/* ReadyForQuery */
3133 				status = ReadyForQuery(frontend, backend, true, true);
3134 				break;
3135 
3136 			default:
3137 				ereport(FATAL,
3138 						(return_code(1),
3139 						 errmsg("Unknown message type %c(%02x)", kind, kind)));
3140 		}
3141 	}
3142 
3143 	/*
3144 	 * Do we receive ready for query while processing reset request?
3145 	 */
3146 	if (kind == 'Z' && frontend->no_forward && *state == 1)
3147 	{
3148 		*state = 0;
3149 	}
3150 
3151 	if (status != POOL_CONTINUE)
3152 		ereport(FATAL,
3153 				(return_code(2),
3154 				 errmsg("unable to process backend response for message kind '%c'", kind)));
3155 
3156 	return status;
3157 }
3158 
3159 POOL_STATUS
CopyInResponse(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)3160 CopyInResponse(POOL_CONNECTION * frontend,
3161 			   POOL_CONNECTION_POOL * backend)
3162 {
3163 	POOL_STATUS status;
3164 
3165 	/* forward to the frontend */
3166 	if (MAJOR(backend) == PROTO_MAJOR_V3)
3167 	{
3168 		SimpleForwardToFrontend('G', frontend, backend);
3169 		pool_flush(frontend);
3170 	}
3171 	else
3172 		pool_write_and_flush(frontend, "G", 1);
3173 
3174 	status = CopyDataRows(frontend, backend, 1);
3175 	return status;
3176 }
3177 
3178 POOL_STATUS
CopyOutResponse(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)3179 CopyOutResponse(POOL_CONNECTION * frontend,
3180 				POOL_CONNECTION_POOL * backend)
3181 {
3182 	POOL_STATUS status;
3183 
3184 	/* forward to the frontend */
3185 	if (MAJOR(backend) == PROTO_MAJOR_V3)
3186 	{
3187 		SimpleForwardToFrontend('H', frontend, backend);
3188 		pool_flush(frontend);
3189 	}
3190 	else
3191 		pool_write_and_flush(frontend, "H", 1);
3192 
3193 	status = CopyDataRows(frontend, backend, 0);
3194 	return status;
3195 }
3196 
/*
 * Relay COPY data between frontend and backends until the end of the copy
 * stream is seen.
 *
 * frontend: client connection
 * backend:  backend connection pool
 * copyin:   non-zero = COPY IN (frontend -> backends),
 *           zero     = COPY OUT (backend -> frontend)
 *
 * V3 protocol messages are forwarded whole via SimpleForwardTo{Backend,
 * Frontend}; V2 protocol data is relayed line-by-line as strings.  Raises
 * FATAL via ereport on unrecoverable stream errors.
 */
POOL_STATUS
CopyDataRows(POOL_CONNECTION * frontend,
			 POOL_CONNECTION_POOL * backend, int copyin)
{
	char	   *string = NULL;	/* V2-protocol line buffer */
	int			len;
	int			i;
	int			copy_count;		/* number of V3 CopyData messages relayed */

#ifdef DEBUG
	int			j = 0;
	char		buf[1024];
#endif

	copy_count = 0;
	for (;;)
	{
		if (copyin)
		{
			if (MAJOR(backend) == PROTO_MAJOR_V3)
			{
				/*
				 * V3 copy-in: read one full message (kind byte, length,
				 * payload) from the frontend and forward it to the backends.
				 */
				char		kind;
				char	   *contents = NULL;

				pool_read(frontend, &kind, 1);

				ereport(DEBUG5,
						(errmsg("copy data rows"),
						 errdetail("read kind from frontend %c(%02x)", kind, kind)));

				/* Wire length includes its own 4 bytes; strip them. */
				pool_read(frontend, &len, sizeof(len));
				len = ntohl(len) - 4;
				if (len > 0)
					contents = pool_read2(frontend, len);

				SimpleForwardToBackend(kind, frontend, backend, len, contents);

				/* CopyData? */
				if (kind == 'd')
				{
					copy_count++;
					continue;
				}
				else
				{
					/*
					 * Anything other than CopyData ends the copy-in stream:
					 * 'c' = CopyDone, 'f' = CopyFail.  Log what we saw if
					 * client-message logging is on, then leave the loop.
					 */
					if (pool_config->log_client_messages && copy_count != 0)
						ereport(LOG,
								(errmsg("CopyData message from frontend."),
								 errdetail("count: %d", copy_count)));
					if (kind == 'c' && pool_config->log_client_messages)
						ereport(LOG,
								(errmsg("CopyDone message from frontend.")));
					if (kind == 'f' && pool_config->log_client_messages)
						ereport(LOG,
								(errmsg("CopyFail message from frontend.")));
					ereport(DEBUG1,
							(errmsg("copy data rows"),
							 errdetail("invalid copyin kind. expected 'd' got '%c'", kind)));
					break;
				}
			}
			else
				/* V2 copy-in: read one newline-terminated line. */
				string = pool_read_string(frontend, &len, 1);
		}
		else
		{
			/* CopyOut */
			if (MAJOR(backend) == PROTO_MAJOR_V3)
			{
				/*
				 * V3 copy-out: forward one message from the backends to the
				 * frontend; stop on the first non-CopyData message.
				 */
				signed char kind;

				kind = pool_read_kind(backend);

				SimpleForwardToFrontend(kind, frontend, backend);

				/* CopyData? */
				if (kind == 'd')
					continue;
				else
					break;
			}
			else
			{
				/* V2 copy-out: read one line from every live backend. */
				for (i = 0; i < NUM_BACKENDS; i++)
				{
					if (VALID_BACKEND(i))
					{
						string = pool_read_string(CONNECTION(backend, i), &len, 1);
					}
				}
			}
		}

		/*
		 * Only the V2 paths fall through to here; the V3 branches above
		 * either continue or break.
		 */
		if (string == NULL)
			ereport(FATAL,
					(errmsg("unable to copy data rows"),
					 errdetail("cannot read string message from backend")));

#ifdef DEBUG
		strlcpy(buf, string, sizeof(buf));
		ereport(DEBUG1,
				(errmsg("copy data rows"),
				 errdetail("copy line %d %d bytes :%s:", j++, len, buf)));
#endif

		/* Relay the V2 line in the appropriate direction. */
		if (copyin)
		{
			for (i = 0; i < NUM_BACKENDS; i++)
			{
				if (VALID_BACKEND(i))
				{
					pool_write(CONNECTION(backend, i), string, len);
				}
			}
		}
		else
			pool_write(frontend, string, len);

		/*
		 * NOTE(review): PROTO_MAJOR_V3 happens to equal the length of the V2
		 * end-of-copy marker checked below, so this looks like a misused
		 * protocol constant rather than a deliberate protocol test.  Confirm
		 * whether pool_read_string()'s len includes the terminator before
		 * replacing it with a named length constant.
		 */
		if (len == PROTO_MAJOR_V3)
		{
			/* end of copy? */
			if (string[0] == '\\' &&
				string[1] == '.' &&
				string[2] == '\n')
			{
				break;
			}
		}
	}

	/*
	 * Wait till backend responds
	 */
	if (copyin)
	{
		for (i = 0; i < NUM_BACKENDS; i++)
		{
			if (VALID_BACKEND(i))
			{
				pool_flush(CONNECTION(backend, i));

				/*
				 * Check response from the backend.  First check SSL and read
				 * buffer of the backend. It is possible that there's an error
				 * message in the buffer if the COPY command went wrong.
				 * Otherwise wait for data arrival to the backend socket.
				 */
				if (!pool_ssl_pending(CONNECTION(backend, i)) &&
					pool_read_buffer_is_empty(CONNECTION(backend, i)) &&
					synchronize(CONNECTION(backend, i)))
					ereport(FATAL,
							(return_code(2),
							 errmsg("unable to copy data rows"),
							 errdetail("failed to synchronize")));
			}
		}
	}
	else
		pool_flush(frontend);

	return POOL_CONTINUE;
}
3359 
3360 /*
3361  * This function raises intentional error to make backends the same
3362  * transaction state.
3363  */
3364 void
raise_intentional_error_if_need(POOL_CONNECTION_POOL * backend)3365 raise_intentional_error_if_need(POOL_CONNECTION_POOL * backend)
3366 {
3367 	int			i;
3368 	POOL_SESSION_CONTEXT *session_context;
3369 	POOL_QUERY_CONTEXT *query_context;
3370 
3371 	/* Get session context */
3372 	session_context = pool_get_session_context(false);
3373 
3374 	query_context = session_context->query_context;
3375 
3376 	if (MAIN_REPLICA &&
3377 		TSTATE(backend, PRIMARY_NODE_ID) == 'T' &&
3378 		PRIMARY_NODE_ID != MAIN_NODE_ID &&
3379 		query_context &&
3380 		is_select_query(query_context->parse_tree, query_context->original_query))
3381 	{
3382 		pool_set_node_to_be_sent(query_context, PRIMARY_NODE_ID);
3383 		if (pool_is_doing_extended_query_message())
3384 		{
3385 			do_error_execute_command(backend, PRIMARY_NODE_ID, PROTO_MAJOR_V3);
3386 		}
3387 		else
3388 		{
3389 			do_error_command(CONNECTION(backend, PRIMARY_NODE_ID), MAJOR(backend));
3390 		}
3391 		ereport(DEBUG1,
3392 				(errmsg("raising intentional error"),
3393 				 errdetail("generating intentional error to sync backends transaction states")));
3394 	}
3395 
3396 	if (REPLICATION &&
3397 		TSTATE(backend, REAL_MAIN_NODE_ID) == 'T' &&
3398 		!pool_config->replicate_select &&
3399 		query_context &&
3400 		is_select_query(query_context->parse_tree, query_context->original_query))
3401 	{
3402 		for (i = 0; i < NUM_BACKENDS; i++)
3403 		{
3404 			/*
3405 			 * Send a syntax error query to all backends except the node which
3406 			 * the original query was sent.
3407 			 */
3408 			if (pool_is_node_to_be_sent(query_context, i))
3409 				continue;
3410 			else
3411 				pool_set_node_to_be_sent(query_context, i);
3412 
3413 			if (VALID_BACKEND(i))
3414 			{
3415 				/*
3416 				 * We must abort transaction to sync transaction state. If the
3417 				 * error was caused by an Execute message, we must send
3418 				 * invalid Execute message to abort transaction.
3419 				 *
3420 				 * Because extended query protocol ignores all messages before
3421 				 * receiving Sync message inside error state.
3422 				 */
3423 				if (pool_is_doing_extended_query_message())
3424 				{
3425 					do_error_execute_command(backend, i, PROTO_MAJOR_V3);
3426 				}
3427 				else
3428 				{
3429 					do_error_command(CONNECTION(backend, i), MAJOR(backend));
3430 				}
3431 			}
3432 		}
3433 	}
3434 }
3435 
3436 /*---------------------------------------------------
3437  * Check various errors from backend.  return values:
3438  *  0: no error
3439  *  1: deadlock detected
3440  *  2: serialization error detected
3441  *  3: query cancel
3442  * detected: 4
3443  *---------------------------------------------------
3444  */
3445 static int
check_errors(POOL_CONNECTION_POOL * backend,int backend_id)3446 check_errors(POOL_CONNECTION_POOL * backend, int backend_id)
3447 {
3448 
3449 	/*
3450 	 * Check dead lock error on the main node and abort transactions on all
3451 	 * nodes if so.
3452 	 */
3453 	if (detect_deadlock_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
3454 		return 1;
3455 
3456 	/*-----------------------------------------------------------------------------------
3457 	 * Check serialization failure error and abort
3458 	 * transactions on all nodes if so. Otherwise we allow
3459 	 * data inconsistency among DB nodes. See following
3460 	 * scenario: (M:main, S:replica)
3461 	 *
3462 	 * M:S1:BEGIN;
3463 	 * M:S2:BEGIN;
3464 	 * S:S1:BEGIN;
3465 	 * S:S2:BEGIN;
3466 	 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3467 	 * M:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3468 	 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3469 	 * S:S2:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3470 	 * M:S1:UPDATE t1 SET i = i + 1;
3471 	 * S:S1:UPDATE t1 SET i = i + 1;
3472 	 * M:S2:UPDATE t1 SET i = i + 1; <-- blocked
3473 	 * S:S1:COMMIT;
3474 	 * M:S1:COMMIT;
3475 	 * M:S2:ERROR:  could not serialize access due to concurrent update
3476 	 * S:S2:UPDATE t1 SET i = i + 1; <-- success in UPDATE and data becomes inconsistent!
3477 	 *-----------------------------------------------------------------------------------
3478 	 */
3479 	if (detect_serialization_error(CONNECTION(backend, backend_id), MAJOR(backend), true) == SPECIFIED_ERROR)
3480 		return 2;
3481 
3482 	/*-------------------------------------------------------------------------------
3483 	 * check "SET TRANSACTION ISOLATION LEVEL must be called before any query" error.
3484 	 * This happens in following scenario:
3485 	 *
3486 	 * M:S1:BEGIN;
3487 	 * S:S1:BEGIN;
3488 	 * M:S1:SELECT 1; <-- only sent to MAIN
3489 	 * M:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3490 	 * S:S1:SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
3491 	 * M: <-- error
3492 	 * S: <-- ok since no previous SELECT is sent. kind mismatch error occurs!
3493 	 *-------------------------------------------------------------------------------
3494 	 */
3495 
3496 	if (detect_active_sql_transaction_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
3497 		return 3;
3498 
3499 	/* check query cancel error */
3500 	if (detect_query_cancel_error(CONNECTION(backend, backend_id), MAJOR(backend)) == SPECIFIED_ERROR)
3501 		return 4;
3502 
3503 	return 0;
3504 }
3505 
3506 static void
generate_error_message(char * prefix,int specific_error,char * query)3507 generate_error_message(char *prefix, int specific_error, char *query)
3508 {
3509 	POOL_SESSION_CONTEXT *session_context;
3510 
3511 	session_context = pool_get_session_context(true);
3512 	if (!session_context)
3513 		return;
3514 
3515 	static char *error_messages[] = {
3516 		"received deadlock error message from main node. query: %s",
3517 		"received serialization failure error message from main node. query: %s",
3518 		"received SET TRANSACTION ISOLATION LEVEL must be called before any query error. query: %s",
3519 		"received query cancel error message from main node. query: %s"
3520 	};
3521 
3522 	String	   *msg;
3523 
3524 	if (specific_error < 1 || specific_error > sizeof(error_messages) / sizeof(char *))
3525 	{
3526 		ereport(LOG,
3527 				(errmsg("generate_error_message: invalid specific_error: %d", specific_error)));
3528 		return;
3529 	}
3530 
3531 	specific_error--;
3532 
3533 	msg = init_string(prefix);
3534 	string_append_char(msg, error_messages[specific_error]);
3535 	ereport(LOG,
3536 			(errmsg(msg->data, query)));
3537 	free_string(msg);
3538 }
3539 
3540 /*
3541  * Make per DB node statement log
3542  */
3543 void
per_node_statement_log(POOL_CONNECTION_POOL * backend,int node_id,char * query)3544 per_node_statement_log(POOL_CONNECTION_POOL * backend, int node_id, char *query)
3545 {
3546 	POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id];
3547 
3548 	if (pool_config->log_per_node_statement)
3549 		ereport(LOG,
3550 				(errmsg("DB node id: %d backend pid: %d statement: %s", node_id, ntohl(slot->pid), query)));
3551 }
3552 
3553 /*
3554  * Check kind and produce error message
3555  * All data read in this function is returned to stream.
3556  */
3557 void
per_node_error_log(POOL_CONNECTION_POOL * backend,int node_id,char * query,char * prefix,bool unread)3558 per_node_error_log(POOL_CONNECTION_POOL * backend, int node_id, char *query, char *prefix, bool unread)
3559 {
3560 	POOL_CONNECTION_POOL_SLOT *slot = backend->slots[node_id];
3561 	char	   *message;
3562 	char	   kind;
3563 
3564 	pool_read(CONNECTION(backend, node_id), &kind, sizeof(kind));
3565 	pool_unread(CONNECTION(backend, node_id), &kind, sizeof(kind));
3566 
3567 	if (kind != 'E' && kind != 'N')
3568 	{
3569 		return;
3570 	}
3571 
3572 	if (pool_extract_error_message(true, CONNECTION(backend, node_id), MAJOR(backend), unread, &message) == 1)
3573 	{
3574 		ereport(LOG,
3575 				(errmsg("%s: DB node id: %d backend pid: %d statement: \"%s\" message: \"%s\"",
3576 						prefix, node_id, ntohl(slot->pid), query, message)));
3577 		pfree(message);
3578 	}
3579 }
3580 
/*
 * Send parse message to primary/main node and wait for reply if particular
 * message is not yet parsed on the primary/main node but parsed on other
 * node. Caller must provide the parse message data as "message".
 * "bind_message" is the Bind that triggered this call; its statement name
 * is used to Close any identically named statement on the primary before
 * re-parsing.  Always returns POOL_CONTINUE (errors are raised via
 * ereport/PG_RE_THROW).
 */
static POOL_STATUS parse_before_bind(POOL_CONNECTION * frontend,
									 POOL_CONNECTION_POOL * backend,
									 POOL_SENT_MESSAGE * message,
									 POOL_SENT_MESSAGE * bind_message)
{
	int			i;
	int			len = message->len; /* length of the original parse message */
	char		kind = '\0';
	char	   *contents = message->contents;	/* original parse message body */
	bool		parse_was_sent = false;
	bool		backup[MAX_NUM_BACKENDS];
	POOL_QUERY_CONTEXT *qc = message->query_context;

	/* Save the target-node map; it is restored before returning. */
	memcpy(backup, qc->where_to_send, sizeof(qc->where_to_send));

	if (SL_MODE)
	{
		/* Parse message that has not yet reached the primary? */
		if (message->kind == 'P' && qc->where_to_send[PRIMARY_NODE_ID] == 0)
		{
			POOL_PENDING_MESSAGE *pmsg;
			POOL_QUERY_CONTEXT *new_qc;
			char		message_body[1024];
			int			offset;
			int			message_len;

			/*
			 * we are in streaming replication mode and the parse message has
			 * not been sent to primary yet
			 */

			/* Prepare modified query context targeting the primary only. */
			new_qc = pool_query_context_shallow_copy(qc);
			memset(new_qc->where_to_send, 0, sizeof(new_qc->where_to_send));
			new_qc->where_to_send[PRIMARY_NODE_ID] = 1;
			new_qc->virtual_main_node_id = PRIMARY_NODE_ID;
			new_qc->load_balance_node_id = PRIMARY_NODE_ID;

			/*
			 * Before sending the parse message to the primary, we need to
			 * close the named statement. Otherwise we will get an error from
			 * backend if the named statement already exists. This could
			 * happened if parse_before_bind is called with a bind message
			 * using same named statement. If the named statement does not
			 * exist, it's fine. PostgreSQL just ignores a request trying to
			 * close a non-existing statement. If the statement is unnamed
			 * one, we do not need it because unnamed statement can be
			 * overwritten anytime.
			 */
			/* 'S' selects "statement" (vs portal) in the Close message. */
			message_body[0] = 'S';
			/* Skip the first NUL-terminated field to reach the statement name. */
			offset = strlen(bind_message->contents) + 1;

			ereport(DEBUG1,
					(errmsg("parse before bind"),
					 errdetail("close statement: %s", bind_message->contents + offset)));

			if (bind_message->contents[offset] != '\0')
			{
				/* Named statement: send Close ('C') to the primary first. */
				message_len = 1 + strlen(bind_message->contents + offset) + 1;
				StrNCpy(message_body + 1, bind_message->contents + offset, sizeof(message_body) - 1);
				pool_extended_send_and_wait(qc, "C", message_len, message_body, 1, PRIMARY_NODE_ID, false);
				/* Add pending message (not forwarded to the frontend). */
				pmsg = pool_pending_message_create('C', message_len, message_body);
				pmsg->not_forward_to_frontend = true;
				pool_pending_message_dest_set(pmsg, new_qc);
				pool_pending_message_add(pmsg);
				pool_pending_message_free_pending_message(pmsg);
			}

			/* Send parse message to primary node */
			ereport(DEBUG1,
					(errmsg("parse before bind"),
					 errdetail("waiting for primary completing parse")));

			pool_extended_send_and_wait(qc, "P", len, contents, 1, PRIMARY_NODE_ID, false);

			/* Add pending message (not forwarded to the frontend). */
			pmsg = pool_pending_message_create('P', len, contents);
			pmsg->not_forward_to_frontend = true;
			pool_pending_message_dest_set(pmsg, new_qc);
			pool_pending_message_add(pmsg);
			pool_pending_message_free_pending_message(pmsg);

			/* Replace the query context of bind message */
			bind_message->query_context = new_qc;

#ifdef NOT_USED
			/*
			 * XXX 	pool_remove_sent_message() will pfree memory allocated by "contents".
			 */

			/* Remove old sent message */
			pool_remove_sent_message('P', contents);
			/* Create and add sent message of this parse message */
			msg = pool_create_sent_message('P', len, contents, 0, contents, new_qc);
			pool_add_sent_message(msg);
#endif
			/* Replace the query context of parse message */
			message->query_context = new_qc;

			return POOL_CONTINUE;
		}
		else
		{
			ereport(DEBUG1,
					(errmsg("parse before bind"),
					 errdetail("no need to re-send parse")));
			return POOL_CONTINUE;
		}
	}
	else
	{
		/* expect to send to main node only */
		for (i = 0; i < NUM_BACKENDS; i++)
		{
			/* Re-send parse only to nodes that have not completed it. */
			if (qc->where_to_send[i] && statecmp(qc->query_state[i], POOL_PARSE_COMPLETE) < 0)
			{
				ereport(DEBUG1,
						(errmsg("parse before bind"),
						 errdetail("waiting for backend %d completing parse", i)));

				pool_extended_send_and_wait(qc, "P", len, contents, 1, i, false);
			}
			else
			{
				qc->where_to_send[i] = 0;
			}
		}
	}

	/* Did the parse message actually get (re)sent anywhere? */
	for (i = 0; i < NUM_BACKENDS; i++)
	{
		if (qc->where_to_send[i])
		{
			parse_was_sent = true;
			break;
		}
	}

	if (!SL_MODE && parse_was_sent)
	{
		pool_set_query_in_progress();

		/* Consume backend responses until ParseComplete ('1') arrives. */
		while (kind != '1')
		{
			PG_TRY();
			{
				read_kind_from_backend(frontend, backend, &kind);
				pool_discard_packet_contents(backend);
			}
			PG_CATCH();
			{
				/* Restore the node map even on error before re-throwing. */
				memcpy(qc->where_to_send, backup, sizeof(backup));
				PG_RE_THROW();
			}
			PG_END_TRY();
		}
	}

	memcpy(qc->where_to_send, backup, sizeof(backup));
	return POOL_CONTINUE;
}
3747 
3748 /*
3749  * Find victim nodes by "decide by majority" rule and returns array
3750  * of victim node ids. If no victim is found, return NULL.
3751  *
3752  * Arguments:
3753  * ntuples: Array of number of affected tuples. -1 represents down node.
3754  * nmembers: Number of elements in ntuples.
3755  * main_node: The main node id. Less than 0 means ignore this parameter.
3756  * number_of_nodes: Number of elements in victim nodes array.
3757  *
3758  * Note: If no one wins and main_node >= 0, winner would be the
3759  * main and other nodes who has same number of tuples as the main.
3760  *
3761  * Caution: Returned victim node array is allocated in static memory
3762  * of this function. Subsequent calls to this function will overwrite
3763  * the memory.
3764  */
3765 static int *
find_victim_nodes(int * ntuples,int nmembers,int main_node,int * number_of_nodes)3766 find_victim_nodes(int *ntuples, int nmembers, int main_node, int *number_of_nodes)
3767 {
3768 	static int	victim_nodes[MAX_NUM_BACKENDS];
3769 	static int	votes[MAX_NUM_BACKENDS];
3770 	int			maxvotes;
3771 	int			majority_ntuples;
3772 	int			me;
3773 	int			cnt;
3774 	int			healthy_nodes;
3775 	int			i,
3776 				j;
3777 
3778 	healthy_nodes = 0;
3779 	*number_of_nodes = 0;
3780 	maxvotes = 0;
3781 	majority_ntuples = 0;
3782 
3783 	for (i = 0; i < nmembers; i++)
3784 	{
3785 		me = ntuples[i];
3786 
3787 		/* Health node? */
3788 		if (me < 0)
3789 		{
3790 			votes[i] = -1;
3791 			continue;
3792 		}
3793 
3794 		healthy_nodes++;
3795 		votes[i] = 1;
3796 
3797 		for (j = 0; j < nmembers; j++)
3798 		{
3799 			if (i != j && me == ntuples[j])
3800 			{
3801 				votes[i]++;
3802 
3803 				if (votes[i] > maxvotes)
3804 				{
3805 					maxvotes = votes[i];
3806 					majority_ntuples = me;
3807 				}
3808 			}
3809 		}
3810 	}
3811 
3812 	/* Everyone is different */
3813 	if (maxvotes == 1)
3814 	{
3815 		/* Main node is specified? */
3816 		if (main_node < 0)
3817 			return NULL;
3818 
3819 		/*
3820 		 * If main node is specified, let it and others who has same ntuples
3821 		 * win.
3822 		 */
3823 		majority_ntuples = ntuples[main_node];
3824 	}
3825 	else
3826 	{
3827 		/* Find number of majority */
3828 		cnt = 0;
3829 		for (i = 0; i < nmembers; i++)
3830 		{
3831 			if (votes[i] == maxvotes)
3832 			{
3833 				cnt++;
3834 			}
3835 		}
3836 
3837 		if (cnt <= healthy_nodes / 2.0)
3838 		{
3839 			/* No one wins */
3840 
3841 			/* Main node is specified? */
3842 			if (main_node < 0)
3843 				return NULL;
3844 
3845 			/*
3846 			 * If main node is specified, let it and others who has same
3847 			 * ntuples win.
3848 			 */
3849 			majority_ntuples = ntuples[main_node];
3850 		}
3851 	}
3852 
3853 	/* Make victim nodes list */
3854 	for (i = 0; i < nmembers; i++)
3855 	{
3856 		if (ntuples[i] >= 0 && ntuples[i] != majority_ntuples)
3857 		{
3858 			victim_nodes[(*number_of_nodes)++] = i;
3859 		}
3860 	}
3861 
3862 	return victim_nodes;
3863 }
3864 
3865 
/*
 * flatten_set_variable_args
 *		Given a parsenode List as emitted by the grammar for SET,
 *		convert to the flat string representation used by GUC.
 *
 * We need to be told the name of the variable the args are for, because
 * the flattening rules vary (ugh).
 *
 * The result is NULL if args is NIL (ie, SET ... TO DEFAULT), otherwise
 * a palloc'd string.
 */
static char *
flatten_set_variable_args(const char *name, List *args)
{
	StringInfoData buf;
	ListCell   *l;

	/* Fast path if just DEFAULT */
	if (args == NIL)
		return NULL;

	initStringInfo(&buf);

	/*
	 * Each list member may be a plain A_Const node, or an A_Const within a
	 * TypeCast; the latter case is supported only for ConstInterval arguments
	 * (for SET TIME ZONE).
	 */
	foreach(l, args)
	{
		Node	   *arg = (Node *) lfirst(l);
		char	   *val;
		A_Const    *con;

		/* Separate multiple arguments with commas. */
		if (l != list_head(args))
			appendStringInfoString(&buf, ", ");

		/* Unwrap a TypeCast and flatten its inner argument instead. */
		if (IsA(arg, TypeCast))
		{
			TypeCast   *tc = (TypeCast *) arg;

			arg = tc->arg;
		}

		if (!IsA(arg, A_Const))
			elog(ERROR, "unrecognized node type: %d", (int) nodeTag(arg));
		con = (A_Const *) arg;

		/* Render the constant according to its parser node type. */
		switch (nodeTag(&con->val))
		{
			case T_Integer:
				appendStringInfo(&buf, "%d", intVal(&con->val));
				break;
			case T_Float:
				/* represented as a string, so just copy it */
				appendStringInfoString(&buf, strVal(&con->val));
				break;
			case T_String:
				val = strVal(&con->val);
				appendStringInfoString(&buf, val);
				break;
			default:
				ereport(ERROR, (errmsg("unrecognized node type: %d",
									   (int) nodeTag(&con->val))));
				break;
		}
	}

	/*
	 * NOTE(review): "name" is not referenced in this body; the per-variable
	 * flattening rules mentioned above are not implemented here.  Kept for
	 * signature parity -- confirm against the PostgreSQL original.
	 */
	return buf.data;
}
3936 
3937 #ifdef NOT_USED
/* Called when sync message is received.
 * Wait till ready for query received.
 *
 * For each live backend, read messages one at a time, pushing each back
 * onto the connection's buffer via pool_push(), until a ReadyForQuery
 * ('Z') message is seen; the pushed data is then restored with pool_pop()
 * so later readers see the full stream.  (Currently compiled out under
 * NOT_USED.)
 */
static void
pool_wait_till_ready_for_query(POOL_CONNECTION_POOL * backend)
{
	char		kind;
	int			len;
	int			poplen;
	char	   *buf;
	int			i;

	for (i = 0; i < NUM_BACKENDS; i++)
	{
		if (VALID_BACKEND(i))
		{
			for (;;)
			{
				/* Read kind byte and length, pushing both back for replay. */
				pool_read(CONNECTION(backend, i), &kind, sizeof(kind));
				ereport(DEBUG5,
						(errmsg("pool_wait_till_ready_for_query: kind: %c", kind)));
				pool_push(CONNECTION(backend, i), &kind, sizeof(kind));
				pool_read(CONNECTION(backend, i), &len, sizeof(len));
				pool_push(CONNECTION(backend, i), &len, sizeof(len));
				/* Wire length includes itself; read any remaining payload. */
				if ((ntohl(len) - sizeof(len)) > 0)
				{
					buf = pool_read2(CONNECTION(backend, i), ntohl(len) - sizeof(len));
					pool_push(CONNECTION(backend, i), buf, ntohl(len) - sizeof(len));
				}

				if (kind == 'Z')	/* Ready for query? */
				{
					/* Restore everything pushed so far to the read buffer. */
					pool_pop(CONNECTION(backend, i), &poplen);
					ereport(DEBUG5,
							(errmsg("pool_wait_till_ready_for_query: backend:%d ready for query found. buffer length:%d",
									i, CONNECTION(backend, i)->len)));
					break;
				}
			}
		}
	}
}
3980 #endif
3981 
3982 /*
3983  * Called when error response received in streaming replication mode and doing
3984  * extended query. Remove all pending messages and backend message buffer data
3985  * except POOL_SYNC pending message and ready for query.  If sync message is
3986  * not received yet, continue to read data from frontend until a sync message
3987  * is read.
3988  */
3989 static void
pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)3990 pool_discard_except_sync_and_ready_for_query(POOL_CONNECTION * frontend,
3991 											 POOL_CONNECTION_POOL * backend)
3992 {
3993 	POOL_PENDING_MESSAGE *pmsg;
3994 	int			i;
3995 
3996 	if (!pool_is_doing_extended_query_message() || !SL_MODE)
3997 		return;
3998 
3999 	/*
4000 	 * Check to see if we already received a sync message. If not, call
4001 	 * ProcessFrontendResponse() to get the sync message from client.
4002 	 */
4003 	pmsg = pool_pending_message_get(POOL_SYNC);
4004 	if (pmsg == NULL)
4005 	{
4006 		char		kind;
4007 		int			len;
4008 		POOL_PENDING_MESSAGE *msg;
4009 		char	   *contents = NULL;
4010 
4011 		for (;;)
4012 		{
4013 			pool_read(frontend, &kind, sizeof(kind));
4014 			pool_read(frontend, &len, sizeof(len));
4015 			len = ntohl(len) - sizeof(len);
4016 			if (len > 0)
4017 				contents = pool_read2(frontend, len);
4018 			if (kind == 'S')
4019 			{
4020 				msg = pool_pending_message_create('S', 0, NULL);
4021 				pool_pending_message_add(msg);
4022 				pool_pending_message_free_pending_message(msg);
4023 				SimpleForwardToBackend(kind, frontend, backend, len, contents);
4024 				break;
4025 			}
4026 		}
4027 	}
4028 	else
4029 		pool_pending_message_free_pending_message(pmsg);
4030 
4031 	/* Remove all pending messages except sync message */
4032 	do
4033 	{
4034 		pmsg = pool_pending_message_head_message();
4035 		if (pmsg && pmsg->type == POOL_SYNC)
4036 		{
4037 			ereport(DEBUG1,
4038 					(errmsg("Process backend response: sync pending message found after receiving error response")));
4039 			pool_unset_ignore_till_sync();
4040 			pool_pending_message_free_pending_message(pmsg);
4041 			break;
4042 		}
4043 		pool_pending_message_free_pending_message(pmsg);
4044 		pmsg = pool_pending_message_pull_out();
4045 		pool_pending_message_free_pending_message(pmsg);
4046 	}
4047 	while (pmsg);
4048 
4049 	pool_pending_message_reset_previous_message();
4050 
4051 	/* Discard read buffer except "Ready for query" */
4052 	for (i = 0; i < NUM_BACKENDS; i++)
4053 	{
4054 		if (VALID_BACKEND(i))
4055 		{
4056 			char		kind;
4057 			int			len;
4058 			int			sts;
4059 
4060 			while (!pool_read_buffer_is_empty(CONNECTION(backend, i)))
4061 			{
4062 				sts = pool_read(CONNECTION(backend, i), &kind, sizeof(kind));
4063 				if (sts < 0 || kind == '\0')
4064 				{
4065 					ereport(DEBUG1,
4066 							(errmsg("pool_discard_except_sync_and_ready_for_query: EOF detected while reading from backend: %d buffer length: %d sts: %d",
4067 									i, CONNECTION(backend, i)->len, sts)));
4068 					pool_unread(CONNECTION(backend, i), &kind, sizeof(kind));
4069 					break;
4070 				}
4071 
4072 				if (kind == 'Z')	/* Ready for query? */
4073 				{
4074 					pool_unread(CONNECTION(backend, i), &kind, sizeof(kind));
4075 					ereport(DEBUG1,
4076 							(errmsg("pool_discard_except_sync_and_ready_for_query: Ready for query found. backend:%d",
4077 									i)));
4078 					break;
4079 				}
4080 				else
4081 				{
4082 					/* Read and discard packet */
4083 					pool_read(CONNECTION(backend, i), &len, sizeof(len));
4084 					if ((ntohl(len) - sizeof(len)) > 0)
4085 					{
4086 						pool_read2(CONNECTION(backend, i), ntohl(len) - sizeof(len));
4087 					}
4088 					ereport(DEBUG1,
4089 							(errmsg("pool_discard_except_sync_and_ready_for_query: discarding packet %c (len:%lu) of backend:%d", kind, ntohl(len) - sizeof(len), i)));
4090 				}
4091 			}
4092 		}
4093 	}
4094 }
4095 
4096 /*
4097  * Handle misc treatment when a command successfully completed.
4098  * Preconditions: query is in progress. The command is succeeded.
4099  */
4100 void
pool_at_command_success(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend)4101 pool_at_command_success(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend)
4102 {
4103 	Node	   *node;
4104 	char	   *query;
4105 
4106 	/* Sanity checks */
4107 	if (!pool_is_query_in_progress())
4108 	{
4109 		ereport(ERROR,
4110 				(errmsg("pool_at_command_success: query is not in progress")));
4111 	}
4112 
4113 	if (!pool_is_command_success())
4114 	{
4115 		ereport(ERROR,
4116 				(errmsg("pool_at_command_success: command did not succeed")));
4117 	}
4118 
4119 	node = pool_get_parse_tree();
4120 
4121 	if (!node)
4122 	{
4123 		ereport(ERROR,
4124 				(errmsg("pool_at_command_success: no parse tree found")));
4125 	}
4126 
4127 	query = pool_get_query_string();
4128 
4129 	if (query == NULL)
4130 	{
4131 		ereport(ERROR,
4132 				(errmsg("pool_at_command_success: no query found")));
4133 	}
4134 
4135 	/*
4136 	 * If the query was BEGIN/START TRANSACTION, clear the history that we had
4137 	 * a writing command in the transaction and forget the transaction
4138 	 * isolation level.
4139 	 *
4140 	 * XXX If BEGIN is received while we are already in an explicit
4141 	 * transaction, the command *successes* (just with a NOTICE message). In
4142 	 * this case we lose "writing_transaction" etc. info.
4143 	 */
4144 	if (is_start_transaction_query(node))
4145 	{
4146 		if (pool_config->disable_load_balance_on_write != DLBOW_TRANS_TRANSACTION)
4147 			pool_unset_writing_transaction();
4148 
4149 		pool_unset_failed_transaction();
4150 		pool_unset_transaction_isolation();
4151 	}
4152 
4153 	/*
4154 	 * If the query was COMMIT/ABORT, clear the history that we had a writing
4155 	 * command in the transaction and forget the transaction isolation level.
4156 	 * This is necessary if succeeding transaction is not an explicit one.
4157 	 */
4158 	else if (is_commit_or_rollback_query(node))
4159 	{
4160 		if (pool_config->disable_load_balance_on_write != DLBOW_TRANS_TRANSACTION)
4161 			pool_unset_writing_transaction();
4162 
4163 		pool_unset_failed_transaction();
4164 		pool_unset_transaction_isolation();
4165 	}
4166 
4167 	/*
4168 	 * SET TRANSACTION ISOLATION LEVEL SERIALIZABLE or SET SESSION
4169 	 * CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE, remember
4170 	 * it.
4171 	 */
4172 	else if (is_set_transaction_serializable(node))
4173 	{
4174 		pool_set_transaction_isolation(POOL_SERIALIZABLE);
4175 	}
4176 
4177 	/*
4178 	 * If 2PC commands has been executed, automatically close transactions on
4179 	 * standbys if there is any open transaction since 2PC commands close
4180 	 * transaction on primary.
4181 	 */
4182 	else if (is_2pc_transaction_query(node))
4183 	{
4184 		close_standby_transactions(frontend, backend);
4185 	}
4186 
4187 	else if (!is_select_query(node, query) || pool_has_function_call(node))
4188 	{
4189 		/*
4190 		 * If the query was not READ SELECT, and we are in an explicit
4191 		 * transaction or disable_load_balance_on_write is 'ALWAYS', remember
4192 		 * that we had a write query in this transaction.
4193 		 */
4194 		if (TSTATE(backend, MAIN_REPLICA ? PRIMARY_NODE_ID : REAL_MAIN_NODE_ID) == 'T' ||
4195 			pool_config->disable_load_balance_on_write == DLBOW_ALWAYS)
4196 		{
4197 			/*
4198 			 * However, if the query is "SET TRANSACTION READ ONLY" or its
4199 			 * variant, don't set it.
4200 			 */
4201 			if (!pool_is_transaction_read_only(node))
4202 			{
4203 				ereport(DEBUG1,
4204 						(errmsg("not SET TRANSACTION READ ONLY")));
4205 
4206 				pool_set_writing_transaction();
4207 			}
4208 		}
4209 
4210 		/*
4211 		 * If the query was CREATE TEMP TABLE, discard temp table relcache
4212 		 * because we might have had persistent table relation cache which has
4213 		 * table name as the temp table.
4214 		 */
4215 		if (IsA(node, CreateStmt))
4216 		{
4217 			CreateStmt *create_table_stmt = (CreateStmt *) node;
4218 
4219 			if (create_table_stmt->relation->relpersistence == 't')
4220 				discard_temp_table_relcache();
4221 		}
4222 	}
4223 }
4224 
4225 /*
4226  * read message length (V3 only)
4227  */
4228 int
pool_read_message_length(POOL_CONNECTION_POOL * cp)4229 pool_read_message_length(POOL_CONNECTION_POOL * cp)
4230 {
4231 	int			length,
4232 				length0;
4233 	int			i;
4234 
4235 	/* read message from main node */
4236 	pool_read(CONNECTION(cp, MAIN_NODE_ID), &length0, sizeof(length0));
4237 	length0 = ntohl(length0);
4238 
4239 	ereport(DEBUG5,
4240 			(errmsg("reading message length"),
4241 			 errdetail("slot: %d length: %d", MAIN_NODE_ID, length0)));
4242 
4243 	for (i = 0; i < NUM_BACKENDS; i++)
4244 	{
4245 		if (!VALID_BACKEND(i) || IS_MAIN_NODE_ID(i))
4246 		{
4247 			continue;
4248 		}
4249 
4250 		pool_read(CONNECTION(cp, i), &length, sizeof(length));
4251 
4252 		length = ntohl(length);
4253 		ereport(DEBUG5,
4254 				(errmsg("reading message length"),
4255 				 errdetail("slot: %d length: %d", i, length)));
4256 
4257 		if (length != length0)
4258 		{
4259 			ereport(LOG,
4260 					(errmsg("unable to read message length"),
4261 					 errdetail("message length (%d) in slot %d does not match with slot 0(%d)", length, i, length0)));
4262 			return -1;
4263 		}
4264 
4265 	}
4266 
4267 	if (length0 < 0)
4268 		ereport(ERROR,
4269 				(errmsg("unable to read message length"),
4270 				 errdetail("invalid message length (%d)", length)));
4271 
4272 	return length0;
4273 }
4274 
4275 /*
4276  * read message length2 (V3 only)
4277  * unlike pool_read_message_length, this returns an array of message length.
4278  * The array is in the static storage, thus it will be destroyed by subsequent calls.
4279  */
4280 int *
pool_read_message_length2(POOL_CONNECTION_POOL * cp)4281 pool_read_message_length2(POOL_CONNECTION_POOL * cp)
4282 {
4283 	int			length,
4284 				length0;
4285 	int			i;
4286 	static int	length_array[MAX_CONNECTION_SLOTS];
4287 
4288 	/* read message from main node */
4289 	pool_read(CONNECTION(cp, MAIN_NODE_ID), &length0, sizeof(length0));
4290 
4291 	length0 = ntohl(length0);
4292 	length_array[MAIN_NODE_ID] = length0;
4293 	ereport(DEBUG5,
4294 			(errmsg("reading message length"),
4295 			 errdetail("main slot: %d length: %d", MAIN_NODE_ID, length0)));
4296 
4297 	for (i = 0; i < NUM_BACKENDS; i++)
4298 	{
4299 		if (VALID_BACKEND(i) && !IS_MAIN_NODE_ID(i))
4300 		{
4301 			pool_read(CONNECTION(cp, i), &length, sizeof(length));
4302 
4303 			length = ntohl(length);
4304 			ereport(DEBUG5,
4305 					(errmsg("reading message length"),
4306 					 errdetail("main slot: %d length: %d", i, length)));
4307 
4308 			if (length != length0)
4309 			{
4310 				ereport(LOG,
4311 						(errmsg("reading message length"),
4312 						 errdetail("message length (%d) in slot %d does not match with slot 0(%d)", length, i, length0)));
4313 			}
4314 
4315 			if (length < 0)
4316 			{
4317 				ereport(ERROR,
4318 						(errmsg("unable to read message length"),
4319 						 errdetail("invalid message length (%d)", length)));
4320 			}
4321 
4322 			length_array[i] = length;
4323 		}
4324 
4325 	}
4326 	return &length_array[0];
4327 }
4328 
4329 signed char
pool_read_kind(POOL_CONNECTION_POOL * cp)4330 pool_read_kind(POOL_CONNECTION_POOL * cp)
4331 {
4332 	char		kind0,
4333 				kind;
4334 	int			i;
4335 
4336 	kind = -1;
4337 	kind0 = 0;
4338 
4339 	for (i = 0; i < NUM_BACKENDS; i++)
4340 	{
4341 		if (!VALID_BACKEND(i))
4342 		{
4343 			continue;
4344 		}
4345 
4346 		pool_read(CONNECTION(cp, i), &kind, sizeof(kind));
4347 
4348 		if (IS_MAIN_NODE_ID(i))
4349 		{
4350 			kind0 = kind;
4351 		}
4352 		else
4353 		{
4354 			if (kind != kind0)
4355 			{
4356 				char	   *message;
4357 
4358 				if (kind0 == 'E')
4359 				{
4360 					if (pool_extract_error_message(false, MAIN(cp), MAJOR(cp), true, &message) == 1)
4361 					{
4362 						ereport(LOG,
4363 								(errmsg("pool_read_kind: error message from main backend:%s", message)));
4364 						pfree(message);
4365 					}
4366 				}
4367 				else if (kind == 'E')
4368 				{
4369 					if (pool_extract_error_message(false, CONNECTION(cp, i), MAJOR(cp), true, &message) == 1)
4370 					{
4371 						ereport(LOG,
4372 								(errmsg("pool_read_kind: error message from %d th backend:%s", i, message)));
4373 						pfree(message);
4374 					}
4375 				}
4376 				ereport(ERROR,
4377 						(errmsg("unable to read message kind"),
4378 						 errdetail("kind does not match between main(%x) slot[%d] (%x)", kind0, i, kind)));
4379 			}
4380 		}
4381 	}
4382 
4383 	return kind;
4384 }
4385 
4386 int
pool_read_int(POOL_CONNECTION_POOL * cp)4387 pool_read_int(POOL_CONNECTION_POOL * cp)
4388 {
4389 	int			data0,
4390 				data;
4391 	int			i;
4392 
4393 	data = -1;
4394 	data0 = 0;
4395 
4396 	for (i = 0; i < NUM_BACKENDS; i++)
4397 	{
4398 		if (!VALID_BACKEND(i))
4399 		{
4400 			continue;
4401 		}
4402 		pool_read(CONNECTION(cp, i), &data, sizeof(data));
4403 		if (IS_MAIN_NODE_ID(i))
4404 		{
4405 			data0 = data;
4406 		}
4407 		else
4408 		{
4409 			if (data != data0)
4410 			{
4411 				ereport(ERROR,
4412 						(errmsg("unable to read int value"),
4413 						 errdetail("data does not match between between main(%x) slot[%d] (%x)", data0, i, data)));
4414 
4415 			}
4416 		}
4417 	}
4418 	return data;
4419 }
4420 
4421 /*
4422  * Acquire snapshot in snapshot isolation mode.
4423  * If tstate_check is true, check the transaction state is 'T' (idle in transaction).
4424  * In case of starting an internal transaction, this should be false.
4425  */
4426 static void
si_get_snapshot(POOL_CONNECTION * frontend,POOL_CONNECTION_POOL * backend,Node * node,bool tstate_check)4427 si_get_snapshot(POOL_CONNECTION * frontend, POOL_CONNECTION_POOL * backend, Node * node, bool tstate_check)
4428 {
4429 	POOL_SESSION_CONTEXT *session_context;
4430 
4431 	session_context = pool_get_session_context(true);
4432 	if (!session_context)
4433 		return;
4434 
4435 	/*
4436 	 * From now on it is possible that query is actually sent to backend.
4437 	 * So we need to acquire snapshot while there's no committing backend
4438 	 * in snapshot isolation mode except while processing reset queries.
4439 	 * For this purpose, we send a query to know whether the transaction
4440 	 * is READ ONLY or not.  Sending actual user's query is not possible
4441 	 * because it might cause rw-conflict, which in turn causes a
4442 	 * deadlock.
4443 	 */
4444 	if (pool_config->backend_clustering_mode == CM_SNAPSHOT_ISOLATION &&
4445 		(!tstate_check || (tstate_check && TSTATE(backend, MAIN_NODE_ID) == 'T')) &&
4446 		si_snapshot_acquire_command(node) &&
4447 		!si_snapshot_prepared() &&
4448 		frontend && frontend->no_forward == 0)
4449 	{
4450 		int	i;
4451 
4452 		si_acquire_snapshot();
4453 
4454 		for (i = 0; i < NUM_BACKENDS; i++)
4455 		{
4456 			static	char *si_query = "SELECT current_setting('transaction_read_only')";
4457 			POOL_SELECT_RESULT *res;
4458 
4459 			/* We cannot use VALID_BACKEND macro here because load balance
4460 			 * node has not been decided yet.
4461 			 */
4462 			if (!VALID_BACKEND_RAW(i))
4463 				continue;
4464 
4465 			do_query(CONNECTION(backend, i), si_query, &res, MAJOR(backend));
4466 			if (res)
4467 			{
4468 				if (res->data[0] && !strcmp(res->data[0], "on"))
4469 					session_context->transaction_read_only = true;
4470 				else
4471 					session_context->transaction_read_only = false;
4472 				free_select_result(res);
4473 			}
4474 			per_node_statement_log(backend, i, si_query);
4475 		}
4476 
4477 		si_snapshot_acquired();
4478 	}
4479 }
4480